最終更新日:

CSVを自動でBigQueryに取り込む方法 – 後編

CSVを自動でBigQueryに取り込む方法 – 後編

株式会社jeki Data-Driven Lab データマネジメント部の俊です。

CSVのDB化について、前回記事ではGoogle Cloud StorageとBigQueryのみを用いたシンプルな構成をまとめました。

後編である今回は、より発展的なアーキテクチャについてご紹介します。

この記事を読んでほしい人(再掲)

– CSVをDBに取り込みたい人
– GCPの基礎的なアーキテクチャを知りたい人

アーキテクチャ概要

サービス紹介(再掲)

今回用いるGoogle Cloud Platformのサービスは以下の3つです。

サービス名役割AWSでいうと?
Google Cloud Storage様々なファイルをクラウドに保存できる
オブジェクトストレージサービス
Amazon S3
Google Cloud Functions指定したイベントをきっかけに関数を実行する
クラウドコンピューティングサービス
AWS Lambda
Google Cloud BigQueryデータ分析プロダクトに最適化された
データウェアハウスサービス
Amazon Redshift

個人的な話ではありますが、GCPに触れてこなかった筆者としては前職で使っていたAWSと同様のサービスに置き換えて考えてみると非常にわかりやすかったです。

連携イメージ(一部再掲)

アーキテクチャとしては大きく分けて2パターンあります。

– Cloud Functionsを使わない方法
– Cloud Functionsを使う方法(後編でご紹介)

CSVファイルの中身がすでに整形済みでそのままBigQueryに保存できる場合には、Cloud Functionsは導入不要です。このケースではCloud StorageからBigQueryへ直接データを取り込みます。

一方でCSVデータの整形・加工が必要な場合には、Cloud Functionsを介してデータクレンジングを行ってからBigQueryにデータを取り込みます。データの自動更新にも対応しているため、汎用性が高いことが特徴です。

今回は本題であるCloud Functionsを使う方法についてのご紹介となります。

活用例

– CSVデータを加工して保存したい
– データの更新が頻繁に発生する
– 更新フローを自動化したい

といった状況での活用が想定されます。

Cloud Functionsを用いるとコーディングが必要になるため開発コストがかさみますが、その一方で汎用性が非常に高くなります。

こちらは当社でも頻繁に採用するアーキテクチャです。

お客様のデータを分析するにあたって、当社GCP環境下でデータマートを構築することを目的としてデータのつなぎ込みを行っております。

なお本記事ではコードは全てPython 3にてご紹介いたします。

その他.NET, Go, Java, Node.js, Rubyが使用可能です(2024年6月時点)

取り込み方法

GCS設定

GCSトップページから「作成」をクリック

バケットに名前を付けます。以降の設定はデフォルトのままで「作成」をクリック。

バケットが作成されました。

CSVファイルは後ほどアップロードしますので、先にBigQueryとCloud Functionsの設定を進めます。

BigQueryの設定

BigQuery コンソールにアクセスします。

任意のプロジェクトIDを選択し、右の三点ボタンから「データセットを作成」をクリック。

データセット名を入力し、他はデフォルトのままで「データセットを作成」をクリック。

今回はデータセット名をsample_datasetとしました。

作成されたデータセットの右の三点ボタンから「テーブルを作成」をクリック。

テーブル名をsample_tableに設定し、その他はデフォルトのまま「テーブルを作成」をクリック。

sample_datasetデータセット配下にsample_tableテーブルが作成されました。

BigQueryの設定は以上です。

Cloud Functionsの設定

Cloud Functionsコンソールにアクセスします。

「ファンクションを作成」をクリック。

トリガーのタイプを「Cloud Storage」に変更し、バケット欄右の「参照」から、先ほど作成したGCSのバケットを選択します。

これで、GCSへのファイルアップロード完了をトリガーとしてCloud Functionsが起動されるようになります。

設定が完了したら「次へ」をクリック。

– ランタイム→Python 3.12
– エントリポイント→load_data※
– コード→main.pyとrequirements.txtに下記コードを入力

※エントリポイントは任意の文字列で問題ありませんが、main.py内で定義した関数名と同じにする必要があります。

main.py

import pandas as pd
from io import BytesIO
from google.cloud import bigquery, storage

def load_data(data, context):
    content_type = data['contentType']
    if content_type != 'text/csv':
        print('Not supported file type: {}'.format(content_type))
        return
    # CSV以外は弾く

    project_id = 'プロジェクトID'
    bucket_name = data['bucket']
    file_name = data['name']
    table_id = 'プロジェクトID.sample_dataset.sample_table'
    # 「プロジェクトID」は環境に合わせてテキスト変更

    bq_client = bigquery.Client(project_id)
    gcs_client = storage.Client(project_id)
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.blob(file_name)

    content = blob.download_as_bytes()
    df = pd.read_csv(BytesIO(content), dtype={'id': 'object'}, parse_dates=['purchase_dt'])

    df['total'] = df['price'] * df['quantity']
    # データ加工箇所(新規カラムの追加)

    table_object = bigquery.TableReference.from_string(table_id)
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField('id', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('purchase_dt', 'DATETIME', mode='NULLABLE'),
            bigquery.SchemaField('product', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('price', 'INT64', mode='NULLABLE'),
            bigquery.SchemaField('quantity', 'INT64', mode='NULLABLE'),
            bigquery.SchemaField('user', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('total', 'INT64', mode='NULLABLE'),
        ],
        write_disposition='WRITE_APPEND',
    )
    bq_client.load_table_from_dataframe(df, table_object, job_config=job_config).result()
    print("Job finished.")
    return
functions-framework==3.*
pandas==1.4.1
google-cloud-bigquery==3.20.1
google-cloud-storage==2.16.0
pyarrow==15.0.2

全ての入力が完了したら「デプロイ」をクリックします。

デプロイ完了には時間がかかるため気長に待ちましょう。

以上でGCS、Cloud Functions、BigQueryの接続設定が完了しました。

動作確認

CSVファイル

ECサイトの購入データを模したサンプルCSVを2つ使用します。

運用イメージとしては、基幹システムの日次バッチ処理で前日分の購入履歴がCSV出力されており、それらをGCSにアップロードすることでBigQueryへ都度反映し、蓄積するというものです。

id,purchase_dt,product,price,quantity,user
0001,2024/1/1 9:00,商品A,1000,2,JDDL一郎
0002,2024/1/1 12:30,商品A,1000,1,JDDL二郎
0003,2024/1/1 18:00,商品B,300,10,JDDL三郎
id,purchase_dt,product,price,quantity,user
0004,2024/1/2 8:00,商品B,300,7,JDDL四郎
0005,2024/1/2 15:00,商品C,1500,2,JDDL五郎
0006,2024/1/2 16:30,商品C,1500,4,JDDL二郎

※いずれも文字コードはUTF-8

ファイルアップロード

GCSコンソールに移ります。

ドラッグアンドドロップでsample_transaction_20240101.csvファイルをアップロードしましょう。

アップロードが完了したら、Cloud Functionsの画面から先ほど作成したファンクションを選択し、「ログ」をクリック。

最下部までスクロールし、ログが更新されて最終的に”Job finished.”と表示されることを確認します。

長くても数分程度でCloud Functionsの動作は終了するはずです。

最後にBigQueryにて接続したテーブルの「プレビュー」を表示し、先ほどアップロードしたCSVのデータが反映されていることを確認します。

おめでとうございます!無事にデータの取り込みが完了しました!

もう1つのファイルを再度GCSにアップロードすれば、BigQueryにレコードが追加されて合計6行になることが確認できるかと思います。

【補足】テーブルの作成・更新について

bigquery.LoadJobConfigオブジェクトのwrite_dispositionパラメータを変更することで、テーブルに対する挙動を指定できます。

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField('id', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('purchase_dt', 'DATETIME', mode='NULLABLE'),
        bigquery.SchemaField('product', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('price', 'INT64', mode='NULLABLE'),
        bigquery.SchemaField('quantity', 'INT64', mode='NULLABLE'),
        bigquery.SchemaField('user', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('total', 'INT64', mode='NULLABLE'),
    ],
    write_disposition='ここを変更します',
)

新規テーブル作成

write_disposition = 'WRITE_EMPTY'

指定したテーブルを作成します。

ただし同名のテーブルがすでに存在する場合はエラーとしてプログラムを停止し、テーブルは作成されません。

テーブルの初回作成時などの限定的なシチュエーションで使用します。

SQLのCREATE TABLE IF NOT EXISTSに相当します。

テーブル上書き

write_disposition = 'WRITE_TRUNCATE'

指定したテーブル全体を上書きします。

ただし指定したテーブルが存在しない場合はテーブルを作成します。

データの定期更新において、過去分から直近までの全量データが毎回アップロードされるようなシチュエーションの場合はこちらが最適です。

SQLのCREATE OR REPLACE TABLEに相当します。

レコード追加

write_disposition = 'WRITE_APPEND'

指定したテーブルに対してレコードを追加します。

ただし指定したテーブルが存在しない場合はエラーとしてプログラムを停止し、テーブルは作成されません。

今回のように直近データのみが毎回アップロードされる場合にはこちらを用いるのが最適です。

SQLのINSERTに相当します。

さいごに

CSVをDB化するにあたって、GCPコンソール上で完結するシンプルな方法をご紹介しました。

今回はCSVファイルのアップロードは手動で行いましたが、GCS APIを用いることでファイルのアップロードからBigQueryへの格納までを完全自動で実装することも可能です。

当社ではデータ分析支援の一環として、GCPを用いたデータ基盤構築も行っております。ご興味のある方はこちらよりお気軽にお問い合わせください。

この記事を書いた人:

エンジニアリングディレクター
ウェブライター、システムエンジニア、データエンジニアを経験。前職では賃貸物件データの分析を行い、不動産業界初となる指標の確立に貢献した。 2023年11月よりjeki Data-Driven Labにジョイン。現在ではデータ分析提案とBIツールによる可視化のサポートを担当している。