1
/
5

【TECH BLOG】BigQueryで時を遡って過去のテーブルを再構成する

はじめに

こんにちは、データシステム部データ基盤ブロックSREの纐纈です。

本記事では、過去に遡ってBigQueryのデータを参照する方法(以下、タイムトラベルと呼びます)をご紹介します。また、この機能はBigQueryが提供している、変更または削除されたデータにアクセスするタイムトラベルとは異なることをご了承ください。

開発背景

この機能は過去データを日次スナップショットより細かい粒度で見たい、また障害対応時に障害発生前などピンポイントで時間指定して参照したいという要望を受け、開発することになりました。

さらに、BigQueryからこの機能を作るのに役立ちそうなテーブル関数という機能がリリースされたのもきっかけとなりました。



テーブル関数とは、事前にパラメータを使って定義したクエリをエイリアスのようにテーブルとして保存して、そのテーブルに対して関数を実行するかのようにクエリを書ける機能です。例えば、以下のようにテーブル関数を定義するとします。

CREATE TABLE FUNCTIONS `some_dataset.foo_records_by_name`(name_param STRING) AS
SELECT * FROM `some_dataset.foo` WHERE name = name_param

その上で、このようなクエリを実行するとします。

SELECT * FROM `foo_records_by_name`('bar')

すると、事前に定義したテーブル関数がパラメータを代入して、結果としてこちらのクエリが実行されます。

SELECT * FROM `some_dataset.foo` WHERE name = 'bar'

短いクエリだと受けられる恩恵が少ないですが、長いクエリに対しては重宝される機能かと思います。

タイムトラベルの機能

SELECT * FROM `<table ID>`('2021-01-01')

テーブル関数を使用して上のようにクエリを打つと、指定した日時の状態のデータを参照できます。

実際に実行されているクエリは、こちらです。クエリ内のpast_timeはTIMESTAMP型で、テーブル関数から渡されるパラメータです。

WITH
snapshot_validation AS (
SELECT
'<base_table>' AS table_id,
MAX(creation_time) AS snapshot_validation_time,
FROM
`<snapshot_dataset>.INFORMATION_SCHEMA.TABLES`
WHERE
REGEXP_CONTAINS( table_name, CONCAT('<base_table>','_',FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo") ))),
streaming_data_validation AS (
SELECT
table_id,
min_bigquery_insert_time AS streaming_validation_time
FROM
`<changetracking validation table ID>`
WHERE
dataset_id = '<changetracking_dataset>'
AND table_id = '<changetracking_table>'),
validation AS (
SELECT
a.table_id,
snapshot_validation_time,
streaming_validation_time
FROM
snapshot_validation AS a
INNER JOIN
streaming_data_validation AS b
ON
a.table_id = b.table_id),
nearest_snapshot AS (
SELECT
*,
CONCAT(${join(",", primary_key)}) AS primary_key
FROM
`<snapshot_dataset>.<base_table>_*` AS snapshot_table
WHERE
_TABLE_SUFFIX IN (FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(past_time, INTERVAL 1 DAY), "Asia/Tokyo"))),
changetracking_for_two_days_until_specified_time AS (
SELECT * FROM (
SELECT
*,
id AS primary_key
FROM
`changetracking_dataset.changetracking_table`
WHERE
bigquery_insert_time BETWEEN TIMESTAMP_SUB(past_time, INTERVAL 2 DAY) AND past_time
) AS changetracking
),
changetracking_latest_version_key_group AS (
SELECT
primary_key,
MAX(CAST(changetrack_ver AS int64)) AS changetrack_ver,
MAX(changetrack_start_time) AS changetrack_start_time
FROM
changetracking_for_two_days_until_specified_time
GROUP BY
primary_key ),
changetracking_latest_version AS (
SELECT
a.*
FROM
changetracking_for_two_days_until_specified_time AS a
INNER JOIN
changetracking_latest_version_key_group AS b
ON
a.primary_key = b.primary_key
AND a.changetrack_ver = b.changetrack_ver ),
changetracking_without_duplication AS (
SELECT
*
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY primary_key) AS row_number
FROM
changetracking_latest_version)
WHERE
row_number = 1 ),
nearest_snapshot_except_what_changetracking_included AS (
SELECT
*
FROM
nearest_snapshot
WHERE
primary_key NOT IN (
SELECT
primary_key
FROM
streaming_diff ) )
SELECT
... -- columns in the base table (cannot use *) to align with changetracking
FROM
nearest_snapshot_except_what_changetracking_included
UNION ALL
SELECT
... -- columns in the base table (cannot use *) since changetracking_without_duplication has more columns
FROM
changetracking_without_duplication
WHERE
changetrack_type != 'D'
AND
IF
(snapshot_validation_time IS NOT NULL,
TRUE,
ERROR( CONCAT("Cannot time-travel since snapshot data does not exist for the specified time." ) ))
AND
IF
(past_time > streaming_validation_time,
TRUE,
ERROR( CONCAT("Cannot time-travel since recording changetracking had not started at the time. check nearest daily snapshot directly. Specify time after: ", streaming_validation_time)))

このクエリの中では、パタメータに渡された日時をもとに以下の内容を実行しています。

  • 指定された日のテーブルコピーがあるかチェック
  • 差分データがあるかチェック
  • 日次で取っているテーブルのコピーからデータを取得する
  • テーブルコピーに記録されている最終時刻と指定した時間までの差分データを変更履歴ログから摘出する
  • 組み合わせて指定された時刻のテーブルの状態を再現する

そして、そのテーブルに対して元々のSELECT文のクエリを実行するという仕組みになっています。



使われているテーブルについて、簡単に説明します。

  • base_table:元となるテーブルで、このテーブルの過去データを見ることがタイムトラベル機能の目的です。
  • daily_snapshot:base_tableの日次テーブルコピー。データ基盤を構築するために、日次バッチによってBigQueryにテーブルデータを転送しており、その際にその日時点でのテーブルのコピーを取っています。データ転送用の日次バッチは日本時間0時に動かしていますが、必ずしも0時時点のデータとは限りません。テーブル定義はbase_tableと全く同じです。
  • change_tracking:base_tableの変更追跡ログ。これはSQL ServerのChange trackingという機能によって保存されているテーブルです。データベース上のテーブルに対してinsert, update, deleteの変更が入る度に、変更に関する情報が記録されています。

changetrackingのテーブルは、base_tableのカラムと変更追跡のカラム、また転送バッチが実行された時刻のカラムによって定義されています。この機能に使われている追加のカラムのみ、説明します。

続きはこちら

株式会社ZOZO's job postings

Weekly ranking

Show other rankings
If this story triggered your interest, go ahead and visit them to learn more