※時間がねえ、方法だけ教えてくれよって方は本題まで飛んでください
背景
第27回すいすい会(2022/2/16)でお話しさせていただいた内容の補足です。
当日は第25回すいすい会で紹介した「架空の通販会社の休眠顧客掘り起こし」をケースとして設定し、Matillionで集約したデータをどうのように自動機械学習ツール(ForecastFlow)へと連携するのかをデモとしてお見せいたしました。
したのですが、時間の都合上、個々のコンポーネントでの処理詳細はすっ飛ばしてしまったので、改めてこの場で詳しく説明していきたいと思います。
ちなみに第25回すいすい会の資料はこちら(サブスク型ビジネスにおける休眠顧客掘り起こしに対して、なぜ機械学習が有用なのかが弊社の経験談を交えて語られています)
この記事の位置付け
さて、実はForecastFlowとMatillion(Python)を連携させるやり方は、弊社先人による以下のような素晴らしい記事によって過去にも紹介されております。
ForecastFlowをMatillionから使う-予測編
PythonからForecastFlowで訓練と推論を行う方法
が、如何せん日々アップデートを繰り返して強くなっているForecastFlow、執筆当時のコードでは動かない可能性があります。
そこで、現時点での連携テンプレートを改めて記述しておこうと思った次第です。
訓練編と予測編を分割するのはなぜなのか?
予測編といってるのはこちらの記事のことですね

第25回・第27回はもちろん、過去のすいすい会でもたびたびご説明させていただいておりますので、詳細は割愛させていたただきますが、理由は「目的が全く違うケースが多いから」になります。
そうです、全然違います。異世界食堂と相席食堂くらい違います。
従って、その目的の違いをハッキリ認識しつつコードを追うことができるように、読者の方々の混乱を招かないように、記事を分けます
ちなみに、Python APIとしてはそこまで劇的に変わることはありません。非常に洗練された抽象化がなされているForecastFlow APIの賜物ですね(いけません、ついステマを……もはやステルスしてないまである)。
本題:MatillionとForecastFlowを連携する(訓練編)
全体像
この辺りはすいすい会とその資料である程度詳しく説明しましたが、念の為。
大前提として、MatillionでBigQueryにデータを集約し、ID・ターゲット・特徴量コミコミの一枚表テーブル作成までは完了しているものとします。
なので全体像としては以下のような感じです。(1と2が今回説明するところ)
(0)データ収集・一枚表作成
(1)BigQueryの一枚表テーブルをCloud Storageにエクスポート
(2)ForecastFlow APIによるモデル作成

なお以下で説明するフローではJob Variablesならびに、Environment Variablesで以下の変数を定義しています。実際に動かす際は、ここだけご自身のものに書き換えていただければ、コンポーネントの中身自体はコピペで行けます。
変数名 | 説明 |
---|---|
GCP_PROJECT | GCPのプロジェクト名 |
BQ_DATASET_ID | (BigQuery)訓練用テーブルを格納しているデータセット名 |
BQ_TRAIN_TABLE_NAME | (BigQuery)訓練用テーブル名 |
BQ_LOCATION | (BigQuery)訓練用テーブルのロケーション |
GCS_BUCKET_NAME | (Cloud Storage)移行先のバケット名 |
GCS_PREFIX | (Cloud Storage)移行先のディレクトリ階層 |
FF_EMAIL | (ForecastFlow)アカウント情報:メールアドレス |
FF_PASSWORD | (ForecastFlow)アカウント情報:パスワード |
FF_PROJECT_ID | (ForecastFlow)プロジェクトID |
FF_TEAM_ID | (ForecastFlow)チームID |
FF_TRAIN_PRIMARY_ID | (ForecastFlow)訓練用テーブルのプライマリキー列名 |
FF_TRAIN_TARGET | (ForecastFlow)モデル訓練時の予測ターゲット列名 |
FF_TRAIN_TEST_FRAC | (ForecastFlow)モデル訓練時に検証用へと回す行数の割合 |
ちなみに、ForecastFlowのチームIDとプロジェクトIDはWeb上のURLを見ればすぐにわかります。xxxがチームID、yyyがプロジェクトIDです

(1)BigQueryの一枚表テーブルをCloud Storageにエクスポート
早速ですが、コードはこのような感じです。
基本は
- 移行元BQ上のテーブル指定
- →移行先GCS上のファイルパス指定
それだけです。簡単ですね。
from google.cloud import bigquery
bq_client = bigquery.Client()
# BigQueryのテーブル情報を取得するためのクラス
class BqTable:
def __init__(self, project, dataset, table_name):
self.project = project
self.dataset = dataset
self.table_name = table_name
def table_ref(self):
dataset_ref = bigquery.DatasetReference(self.project, self.dataset)
table_ref = dataset_ref.table(self.table_name)
return table_ref
# GCSにファイル出力するためのクラス
class GCS:
def __init__(self, bq_client, bucket_name, prefix):
self.bq_client = bq_client
self.bucket_name = bucket_name
self.prefix = prefix
def export_table(self, bq_table, file_name, bq_location):
destination_uri = f'gs://{self.bucket_name}/{self.prefix}/{file_name}'
extract_job = self.bq_client.extract_table(bq_table.table_ref(), destination_uri, location=bq_location)
extract_job.result()
# 対象テーブルをcsvでGCSに出力
table_to_export = BqTable(GCP_PROJECT, BQ_DATASET_ID, BQ_TRAIN_TABLE_NAME)
GCS(bq_client, GCS_BUCKET_NAME, GCS_PREFIX).export_table(
table_to_export, BQ_TRAIN_TABLE_NAME + '.csv', BQ_LOCATION
)
(2)ForecastFlow APIによるモデル作成
こちらもちゃちゃっとコードを貼り付けます。
流れだけ簡単にさらっておくと
- ForecastFlowのユーザー情報を認証
- →GCS上のファイルをForecastFlowに連携
- →モデル訓練の設定
- →訓練実行
ごちゃごちゃ書いとりますが、中身を解き明かさないと気が済まないぜ、って方以外はコピペしてご自身のアカウント情報をJob Variablesに設定すれば動きます。ご安心を。
from google.cloud import storage
from forecastflow import User, Project, DataSourceLabel, FileType, ClassifierTrainingSettings
from forecastflow.satellite.google.cloud import storage as ff_storage
from forecastflow.enums import ClassificationMetrics
# ForecastFlowのユーザー情報
ff_user = User(FF_EMAIL, FF_PASSWORD)
ff_project = Project(ff_user, FF_PROJECT_ID, FF_TEAM_ID)
# データソースをForecastFlowにアップロード
gcs_client = storage.Client()
uri_import = f'gs://{GCS_BUCKET_NAME}/{GCS_PREFIX}/{BQ_TRAIN_TABLE_NAME+".csv"}'
data_source = ff_storage.import_data_source(
uri=uri_import,
project=ff_project,
name=BQ_TRAIN_TABLE_NAME,
label=DataSourceLabel.TRAIN, # データソースのラベル
filetype=FileType.CSV, # データソースのファイル形式 [CSV, TSV, PARQUET]
description=None, # 説明文(省略可能)
skip_profile=False, #データプロファイリングをスキップするかどうか
client=gcs_client # GCSクライアント
)
data_source.wait_until_done()
# モデル訓練の設定
training_settings = ClassifierTrainingSettings(
FF_TRAIN_TARGET,
ClassificationMetrics.F1,
primary_id=FF_TRAIN_PRIMARY_ID
)
# モデル訓練実行
data_source.create_model(
training_settings,
name=FF_MODEL_NAME,
test_frac=float(FF_TRAIN_TEST_FRAC)
)
以上です、ご興味がある方は予測編にお進みください。

本記事は分類問題の扱いに関してなので、回帰問題を扱い方はこちらを参照ください。
