PrefectとBigQueryを使っているので現状やったことを書いてみる

4 min
xainome

xainome

データサイエンティスト風味のSES
自身の話をするよりは人の話を聞く方が好き
普通の人

FOLLOW

副業でのPrefect CloudとBigQueryの使用

副業でPrefect CloudとBigQueryを使っているので触った点と書いてみようと思います。

環境の設定とインストール

私はWindowsを使っています。契約を結んでいる企業の導入手順がMacで書かれていたのでそこもいろいろ調べてやってみました。

Prefectとは?

そもそもprefectとは何ぞや?というところですね。prefectはPythonベースのワークフローエンジンになります。

pythonジョブの実行を自動化したりデバッグしたりする機能をpythonコードで書けるシステムになります。

AWSだとStepfunctionのイメージに近いですね。それに加えてEventBridgeやCloud Watch Logsが足されたような感じです。

おそらくAWSやGoogle Cloudに比べてコストが低く、使いやすいという点で採用されたと考えられます。

Prefectの参照しているドキュメントはこちらです。ちなみに3.0がリリースされたみたいなのでそちらのドキュメントはこちらになります。

インストール手順

一応pythonをインストールしている前提でお話しします。

まずはprefect cloudのインストールを行います。

pip install prefect

次はタスクを実行するためのコマンドが必要になります。ただ、Windowsにはパッケージ管理ツールがありません。

なくても問題ないですが、せっかくなので導入してみます。今後パッケージを入れるときは活用していきたいです。

Scoopの導入

今回使ったパッケージはscoopになります。手順はサイトにあるのでコマンドプロンプトに入力します。

Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser

デフォルトでmainがありますが、もしバケットを別にして管理したい場合は以下のコマンドを使います。

# バケットの追加
scoop bucket add extras
# バケット一覧
scoop bucket list
# インストール一覧
scoop list
scoop bucket 一覧
scoop インストール一覧

今回はtaskだけ必要なのでmainに入れました。これでtaskコマンドが使えるようになります。

Prefectの設定と実行

ただ、taskを実行するにはTaskfile.yamlが必要になります。中身はプロジェクトによりますが私がやったのはこんな感じ。全部ではないですが…

tasks:
  start-local:
    cmds:
      - pip install -r requirements.txt
      - prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
      - prefect server start

  start-local-agent:
    cmds:
      - prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

Taskfile.yamlの準備が出来たらターミナルの用意します。

cd workflow(Taskfile.yamlの場所じゃなくてもok)
task start-local

これでprefectのサーバーを立ち上げることができます。ログインするとこんな感じ

prefect dashboard

Prefect用フローの作成と実行

次にprefect用のフローを書いてみましょう。一応BigQueryも使っているのでテーブルデータを取得するコードを記載してみます。

test.py

from google.cloud import bigquery
from prefect import flow, task
@task()
def create_table() -> None:

    schema = [
        bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
    ]

    table = bigquery.Table(
        table_ref=f"{PROJECT}.{DATABASE}.{TABLE}",
        schema=schema,
    )
    print(table.schema)
    print(table.num_rows)

@flow(name="test")
def result_details():
    create_table()

test.pyという形でテーブルのデータを取るようにしました。PROJECT、DATABASE、TABLEは使用しているデータの値を入れます。schemaも使用するカラムに合わせます。

@flowで書かれた部分の関数を実行として呼び出します。nameを付ければ実行した時のフローの名前になります。

@taskはフロー内で実行するタスクになります。実行したい関数があれば@taskを付け、@flow関数で呼び出しましょう。

ログの確認方法

最後にログですね。pythonのデバッグではなくprefectのGUIのログを見ることになります。

ただ、prefectのログはpythonの標準搭載してあるprintやloggerで見ることはできません。printを使う場合とloggerを使う場合でこんな書き方になります。

print文を使う場合

@task()
def create_table() -> None:
print(“prefectで見れるよ”)

@flow(name=”test”, log_prints=True)
def result_details():
create_table()

loggerを使う場合

from prefect import flow, task, get_run_logger
@task()
def create_table() -> None:
logger = get_run_logger()
logger.info(“prefectで見れるよ”)

@flow(name=”test”)
def result_details():
create_table()

print文のlog_printsはtask()にも使えます。またloggerのほうも同様にflowで使うことができます。ただし、@flowまた@taskの関数内でしか使うことができないので注意が必要です。

まとめ

今のところ私が触ったのはこんな感じです。ドキュメントを見るとまだ使いきれてないですし、BigQueryもまだ使いこなせてないですね。

という感じで少しずつ理解を深めているというお話でした。ではでは。

xainome

xainome

データサイエンティスト風味のSES
自身の話をするよりは人の話を聞く方が好き
普通の人

FOLLOW

カテゴリー:
関連記事

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA


Social Media Auto Publish Powered By : XYZScripts.com