以下のようなデータの流れで、表題の通りのことを行っていきます。
1. BigQueryのクエリをスケジュールし、定期的にクエリを投げる。
2. 1の操作が完了後、Pub/Subのトピックにパブリッシュする。
3. Pub/Subのトピックをトリガーとして、Cloud Functionsを起動し、Slackへの通知を行う。
それぞれについてもう少し詳しく見ていきます。
Pub/Subトピック
まず、Pub/Subトピックを作成します。「トピック」→「トピックを作成」を選択し、スキーマやサブスクリプション等は入力せずにIDだけ入力すれば、新たなトピックが作成されます。
クエリのスケジュール
INFORMATION_SCHEMA.TABLESを利用することで条件に合うテーブル情報を拾ってくることができます。「スケジュール」→「スケジュールされたクエリを新規作成」を選び、実行する時間帯、出力テーブルの格納先、さらには先ほど作成したPub/Subトピックを入力すれば、完了です。
Slackへの通知
今回はpythonとIncoming Webhookを利用して、Slackへの通知を行います。まずは、Incoming Webhookの設定ページを開き、投稿するチャンネルを選び、「Incoming Webhook インテグレーションの追加」をクリックします。その後、Webhook URLが表示されるので、それをコピーします。最後に以下のような処理を実行するとSlackに通知を送ることができます。
pip install slackweb
import slackweb
slack = slackweb.Slack(url="コピーしたWebhook URL")
slack.notify(text="通知内容")
Cloud Functions
作成したトピックを開き、「CLOUD FUNCTIONをトリガー」を選択します。
ランタイムにPythonを選択し、以下のように、クエリから作成されたテーブルに対し、get_table関数を使用します。
client = bigquery.Client(project="project_id")
table_id = "dataset_id.table_id"
table = client.get_table(table_id)
table.num_rowsで行数を調べて、上述のSlackへの通知文を挿入すれば、テーブルの有無を調べてSlackに通知することができます。