データサイエンス

ForecastFlowをMatillionから使う-予測編

クラウドネイティブのETLツールMatillionを使って、データ加工からForecastFlowの予測までの処理を解説します
クラウドサービスはGCPを利用しております

モデルの作成

ForecastFlowでモデル作成と訓練を行ってください

Matillion Componentの作成

bashコンポーネントを作成します

f:id:gri-blog:20200727145030p:plain
bash-component

ここには依存パッケージをインストールするコードを書きます

# pipのアップグレード
python3 -m pip install --upgrade pip --user
# pandas, forecastflow, gcs sdkをインストール
python3 -m pip install pandas --user
python3 -m pip install forecastflow --user
python3 -m pip install google-cloud-storage --user

次にpythonコンポーネントを作成します

f:id:gri-blog:20200727161702p:plain
python-component

ここには推論APIのスクリプトとGCSへのデータ入出力スクリプトを書きます
推論APIスクリプトの取得については下記ページの”スクリプトの用意”を参考してください
ForecastFlow を Tableau Prep から使う方法 – 予測編

先に推論スクリプトをForecastFlowからコピペします
email, password, project_id, model_idは適宜編集してください

# 推論スクリプト
# Read the blog (Japanese) to figure out how to use this script.
# https://gri-blog.hatenablog.com/entry/2019/12/09/162019
import forecastflow
from forecastflow.tabpy_support import make_prediction_schema
import datetime
# ============================================================================
# [Required] Fill your ForecastFlow parameters
email = "email"
password = "password"
project_id = "project_id"
model_id = "model_id"
# ============================================================================
# ============================================================================
# [Optional] Change display names
data_name = "Test Stock" + str(datetime.datetime.now())
prediction_name = "Stock Prediction " + str(datetime.datetime.now())
# ============================================================================
user = forecastflow.User(email, password)
get_output_schema = make_prediction_schema(user, project_id, model_id)
def ff_predict(input_data):
project = user.get_project(project_id)
model = project.get_model(model_id)
prediction_data = project.create_data_source(
input_data,
data_name,
forecastflow.DataSourceLabel.PREDICTION
)
prediction = model.create_prediction(
prediction_data,
prediction_name
)
result = prediction.get_result()
return result

続けてデータ入出力のコードを推論スクリプトの下に書き加えます
入力先のバケットと出力先のバケットをMatillionの変数に登録しておけば
コピペするだけで他のモデルにも使えるので便利です

f:id:gri-blog:20200727163639p:plain
python-component-var

コードはこちら

# GCS(Google Cloud Storage)連携用
# 推論スクリプトの下部
import pandas as pd
import io
# Imports the Google Cloud client library
from google.cloud import storage
def download_blob(bucket_name, source_blob_name):
# Instantiates a client
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
stream = blob.download_as_string()
return stream
def upload_to_storage(dataframe):
# Instantiates a client
storage_client = storage.Client()
bucket = storage_client.get_bucket(BucketNameTo)
# save file
time_now = datetime.datetime.now()
time_formatted = datetime.datetime.strftime(time_now, "%Y%m%d_%H%M%S")
blob = bucket.blob(f"Result-{time_formatted}.csv")
blob.upload_from_string(dataframe.to_csv(index=False), content_type='application/octet-stream')
# get data from gcs
stream = download_blob(BucketNameFrom, PredictionFileName)
df = pd.read_csv(io.StringIO(stream.decode("utf-8")))
# predict
result = ff_predict(df) # dataframe
# upload to gcs
upload_to_storage(result)

コンポーネント作成したらフローを作成して実行します

f:id:gri-blog:20200727163837p:plain
forecastflow-orchestration

結果が無事gcsに出力されました

f:id:gri-blog:20200727164138p:plain
gcs

MatillionのETLを使えば、データ加工 -> BigQuery -> ForecastFlowで予測するまでを自動化することでできます
下図はForecastFlow連携の一例になります

f:id:gri-blog:20200727170021p:plain
etl-example

終わりに

今回はForecastFlowの推論APIを用いたMatillion連携を簡単に紹介致しました
訓練API -> ETL(Matilliion)ツール -> 推論APIといった使い方をすることで、機械学習の自動化も可能になりそうです

訓練APIについてはこちら
PythonからForecastFlowで訓練と推論を行う方法

Higashi