ForecastFlowをMatillionから使う-予測編

クラウドネイティブのETLツールMatillionを使って、データ加工からForecastFlowの予測までの処理を解説します
クラウドサービスはGCPを利用しております

モデルの作成

ForecastFlowでモデル作成と訓練を行ってください

Matillion Componentの作成

bashコンポーネントを作成します

f:id:gri-blog:20200727145030p:plain
bash-component
ここには依存パッケージをインストールするコードを書きます

# pipのアップグレード
python3 -m pip install --upgrade pip --user
# pandas, forecastflow, gcs sdkをインストール
python3 -m pip install pandas --user
python3 -m pip install forecastflow --user
python3 -m pip install google-cloud-storage --user

次にpythonコンポーネントを作成します

f:id:gri-blog:20200727161702p:plain
python-component
ここには推論APIスクリプトとGCSへのデータ入出力スクリプトを書きます
推論APIスクリプトの取得については下記ページの"スクリプトの用意"を参考してください ForecastFlow を Tableau Prep から使う方法 - 予測編

先に推論スクリプトをForecastFlowからコピペします
email, password, project_id, model_idは適宜編集してください

# 推論スクリプト
# Read the blog (Japanese) to figure out how to use this script.
# https://gri-blog.hatenablog.com/entry/2019/12/09/162019
import forecastflow
from forecastflow.tabpy_support import make_prediction_schema
import datetime

# ============================================================================
# [Required] Fill your ForecastFlow parameters
email = "email"
password = "password"
project_id = "project_id"
model_id = "model_id"
# ============================================================================

# ============================================================================
# [Optional] Change display names
data_name = "Test Stock" + str(datetime.datetime.now())
prediction_name = "Stock Prediction " + str(datetime.datetime.now())
# ============================================================================


user = forecastflow.User(email, password)
get_output_schema = make_prediction_schema(user, project_id, model_id)

def ff_predict(input_data):
    project = user.get_project(project_id)
    model = project.get_model(model_id)
    prediction_data = project.create_data_source(
        input_data,
        data_name,
        forecastflow.DataSourceLabel.PREDICTION
    )
    prediction = model.create_prediction(
        prediction_data,
        prediction_name
    )
    result = prediction.get_result()
    return result

続けてデータ入出力のコードを推論スクリプトの下に書き加えます
入力先のバケットと出力先のバケットをMatillionの変数に登録しておけば コピペするだけで他のモデルにも使えるので便利です

f:id:gri-blog:20200727163639p:plain
python-component-var

コードはこちら

# GCS(Google Cloud Storage)連携用
# 推論スクリプトの下部
import pandas as pd
import io
# Imports the Google Cloud client library
from google.cloud import storage


def download_blob(bucket_name, source_blob_name):
    # Instantiates a client
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    stream = blob.download_as_string()
    
    return stream


def upload_to_storage(dataframe):
  # Instantiates a client
  storage_client = storage.Client()

  bucket = storage_client.get_bucket(BucketNameTo)

  # save file
  time_now = datetime.datetime.now()
  time_formatted = datetime.datetime.strftime(time_now, "%Y%m%d_%H%M%S")
  blob = bucket.blob(f"Result-{time_formatted}.csv")
  
  blob.upload_from_string(dataframe.to_csv(index=False), content_type='application/octet-stream')


# get data from gcs
stream = download_blob(BucketNameFrom, PredictionFileName)
df = pd.read_csv(io.StringIO(stream.decode("utf-8")))

# predict
result = ff_predict(df) # dataframe

# upload to gcs
upload_to_storage(result)

コンポーネント作成したらフローを作成して実行します

f:id:gri-blog:20200727163837p:plain
forecastflow-orchestration

結果が無事gcsに出力されました

f:id:gri-blog:20200727164138p:plain
gcs

MatillionのETLを使えば、データ加工 -> BigQuery -> ForecastFlowで予測するまでを自動化することでできます
下図はForecastFlow連携の一例になります

f:id:gri-blog:20200727170021p:plain
etl-example

終わりに

今回はForecastFlowの推論APIを用いたMatillion連携を簡単に紹介致しました
訓練API -> ETL(Matilliion)ツール -> 推論APIといった使い方をすることで、機械学習の自動化も可能になりそうです

訓練APIについてはこちら
PythonからForecastFlowで訓練と推論を行う方法

Higashi