今回は、Snowflakeでデータをリアルタイムで取り込む方法として「Snowpipe」をご紹介します。データ分析を行う上で、最新の情報をすぐに活用できることってとても大事ですよね。そこで登場するのがSnowpipeです。
Snowpipeは、Snowflakeというクラウドデータウェアハウスが提供するサービスで、データを自動的に取り込む仕組みを簡単に設定できます。
例えば、AWS S3やGCPのStorageやAzure Blob Storageなどにデータをアップロードすると、Snowpipeがそれを即座に検出してSnowflakeのデータベースにロードしてくれます。とても便利ですよね?
データがS3などに自動で溜まっていくケースは多いですから、ユースケースとしては多そうです。

Snowpipeの使い方
次は具体的に手を動かしながら、どのように設定していくのか一緒に見ていきましょう。今回はAWS S3を例にして紹介していきます。
前提条件
まず、以下のものが揃っていることを確認してください。
-
Snowflakeアカウント:Snowpipeを利用するために必要です。
-
AWS S3バケット:データを保存する場所として使用します。
-
適切な権限:SnowflakeとAWS S3を連携するための権限が必要です。
ステップ1:テーブルの準備
まず、データを格納するためのデータベースとテーブルを作成します。模擬的に以下のSQLコマンドでテーブルを作成することにします。
1 2 3 4 5 6 7 8 |
USE MY_DB; CREATE OR REPLACE TABLE users ( userid INT, name STRING, age INT, city STRING ); |
-
USE MY_DB;:MY_DBというデータベースを使用します。存在しない場合は作成してください。
-
CREATE OR REPLACE TABLE users:usersというテーブルを作成します。userid, name, age, cityの4つのカラムがあります。
ステップ2:ファイルフォーマットの作成
次に、S3から取り込むデータのフォーマットを定義します。ここではCSV形式を使用します。
1 |
CREATE OR REPLACE FILE FORMAT my_csv_format TYPE = 'CSV' SKIP_HEADER = 1; |
-
CREATE OR REPLACE FILE FORMAT my_csv_format:my_csv_formatという名前のファイルフォーマットを作成します。
-
TYPE = 'CSV':データ形式はCSVです。
-
SKIP_HEADER = 1:CSVファイルの最初の行はヘッダーとしてスキップします。
ステップ3:ステージの作成
Snowpipeがデータを取り込むためのステージ(データの保存場所)を設定します。ここではAWS S3を使用します。
1 2 3 |
CREATE OR REPLACE STAGE S3_STAGE_SNOWPIPE URL = 's3://my-snowflake/snowpipe' STORAGE_INTEGRATION = S3_INT ; |
-
CREATE OR REPLACE STAGE S3_STAGE_SNOWPIPE:S3_STAGE_SNOWPIPEという名前のステージを作成します。
-
URL = 's3://my-snowflake/snowpipe':今回データが保存されているS3バケットのURLです。
-
STORAGE_INTEGRATION = S3_INT:SnowflakeとS3を連携するためのストレージ統合設定です。事前にS3_INTを設定しておく必要があります。
ステップ4:S3の設定
SnowpipeがS3に新しいデータがアップロードされたことを検出できるように、S3側で設定を行います。具体的には、S3バケットにイベント通知を設定し、Snowpipeにデータが到着したことを知らせます。この部分はAWSマネジメントコンソールから設定しますが、詳細な手順は公式ドキュメントを参考にしてください。
ステップ5:Snowpipeの作成
実際にデータを自動的に取り込むためのパイプを作成します。
1 |
CREATE OR REPLACE pipe user_pipe auto_ingest = TRUE AS COPY INTO USERS FROM @s3_stage_snowpipe FILE_FORMAT = my_csv_format; |
説明:
-
CREATE OR REPLACE pipe user_pipe:user_pipeという名前のパイプを作成します。
-
auto_ingest = TRUE:自動的にデータを取り込む設定です。これにより、S3に新しいファイルが追加されると自動的にデータがロードされます。
-
COPY INTO USERS FROM @s3_stage_snowpipe FILE_FORMAT = my_csv_format:S3_STAGE_SNOWPIPEステージからusersテーブルにデータをコピーします。my_csv_formatで定義したCSVフォーマットを使用します。
ステップ6:パイプの確認
パイプが正しく作成されたか確認します。
1 |
DESC PIPE USER_PIPE; |
説明:
-
DESC PIPE USER_PIPE;:user_pipeの詳細情報を表示します。ここでステータスや設定内容を確認できます。
ステップ7:データの確認
最後に、データが正しくロードされたかを確認します。
1 |
SELECT * FROM USERS; |
説明:
-
SELECT * FROM USERS;:usersテーブルの全データを表示します。S3にアップロードしたCSVファイルの内容が反映されていることを確認してください。
まとめ
以上が、Snowpipeを使用してAWS S3と連携し、ストリームデータを自動的にロードする手順です。Snowpipeを活用することで、データの取り込みが自動化され、リアルタイムで最新の情報を分析できるようになります。初めての方でも、ステップバイステップで設定を進めることで簡単に導入できると思います。ぜひ、あなたのデータ活用にSnowpipeを取り入れてみてくださいね!
✅ Youtubeメンバーシップ始めました。有料公開している学習動画を見ることができます。 ぜひご参加ください!
https://www.youtube.com/channel/UC9E8K7BCH3LP5ttbA7S4DOg/join
✅ お仕事のご依頼はこちらまでお願いします info@mikage-tech.com