ForecastFlowをMatillionから使う-予測編
クラウドネイティブのETLツールMatillionを使って、データ加工からForecastFlowの予測までの処理を解説します
クラウドサービスはGCPを利用しております
モデルの作成
ForecastFlowでモデル作成と訓練を行ってください
Matillion Componentの作成
bashコンポーネントを作成します ここには依存パッケージをインストールするコードを書きます
# pipのアップグレード python3 -m pip install --upgrade pip --user # pandas, forecastflow, gcs sdkをインストール python3 -m pip install pandas --user python3 -m pip install forecastflow --user python3 -m pip install google-cloud-storage --user
次にpythonコンポーネントを作成します
ここには推論APIのスクリプトとGCSへのデータ入出力スクリプトを書きます
推論APIスクリプトの取得については下記ページの"スクリプトの用意"を参考してください
ForecastFlow を Tableau Prep から使う方法 - 予測編
先に推論スクリプトをForecastFlowからコピペします
email, password, project_id, model_idは適宜編集してください
# 推論スクリプト # Read the blog (Japanese) to figure out how to use this script. # https://gri-blog.hatenablog.com/entry/2019/12/09/162019 import forecastflow from forecastflow.tabpy_support import make_prediction_schema import datetime # ============================================================================ # [Required] Fill your ForecastFlow parameters email = "email" password = "password" project_id = "project_id" model_id = "model_id" # ============================================================================ # ============================================================================ # [Optional] Change display names data_name = "Test Stock" + str(datetime.datetime.now()) prediction_name = "Stock Prediction " + str(datetime.datetime.now()) # ============================================================================ user = forecastflow.User(email, password) get_output_schema = make_prediction_schema(user, project_id, model_id) def ff_predict(input_data): project = user.get_project(project_id) model = project.get_model(model_id) prediction_data = project.create_data_source( input_data, data_name, forecastflow.DataSourceLabel.PREDICTION ) prediction = model.create_prediction( prediction_data, prediction_name ) result = prediction.get_result() return result
続けてデータ入出力のコードを推論スクリプトの下に書き加えます
入力先のバケットと出力先のバケットをMatillionの変数に登録しておけば
コピペするだけで他のモデルにも使えるので便利です
コードはこちら
# GCS(Google Cloud Storage)連携用 # 推論スクリプトの下部 import pandas as pd import io # Imports the Google Cloud client library from google.cloud import storage def download_blob(bucket_name, source_blob_name): # Instantiates a client storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(source_blob_name) stream = blob.download_as_string() return stream def upload_to_storage(dataframe): # Instantiates a client storage_client = storage.Client() bucket = storage_client.get_bucket(BucketNameTo) # save file time_now = datetime.datetime.now() time_formatted = datetime.datetime.strftime(time_now, "%Y%m%d_%H%M%S") blob = bucket.blob(f"Result-{time_formatted}.csv") blob.upload_from_string(dataframe.to_csv(index=False), content_type='application/octet-stream') # get data from gcs stream = download_blob(BucketNameFrom, PredictionFileName) df = pd.read_csv(io.StringIO(stream.decode("utf-8"))) # predict result = ff_predict(df) # dataframe # upload to gcs upload_to_storage(result)
コンポーネント作成したらフローを作成して実行します
結果が無事gcsに出力されました
MatillionのETLを使えば、データ加工 -> BigQuery -> ForecastFlowで予測するまでを自動化することでできます
下図はForecastFlow連携の一例になります
終わりに
今回はForecastFlowの推論APIを用いたMatillion連携を簡単に紹介致しました
訓練API -> ETL(Matilliion)ツール -> 推論APIといった使い方をすることで、機械学習の自動化も可能になりそうです
訓練APIについてはこちら
PythonからForecastFlowで訓練と推論を行う方法
Higashi