dltを用いてOpenWeatherMap APIからSnowflakeにデータを格納してみた

初めに

以前執筆したブログで、dltを用いてOpenWeatherMap APIからDuckDBにデータを格納する方法を紹介しました。今回は、その続編としてSnowflakeにデータを格納する方法を紹介します。dltや当該APIの説明は前回のブログをご覧ください。

Snowflakeとは

Snowflakeとは、クラウド型のデータプラットフォームの一つです。データの格納、加工、連携など幅広い機能を持ち、それでいてクラウド型のためサーバーの導入・運用コストがかからない点が特長です。 データ活用に関わる主なシステムには、データレイク、データウェアハウス、データマートの3つがあります。それぞれの概要や特長は以下の通りです。
システム名 概要 特長
データレイク システムから収集したデータを元データと同じ形式で格納するデータベース データの構造や種類を問わずに一元管理が可能
データウェアハウス システム間でデータの構造やフォーマット等を統一した上で格納するデータベース 様々なシステム上のデータを集約するため、横断的なデータ分析が可能
データマート データの用途、利用目的に応じて必要なもののみを抽出・加工した上で格納するデータベース 個別の分析に特化した構造のため、素早い分析が可能。
これら3つのシステムが以下のような流れでつながり、最終的なデータの活用へと至ります。
Snowflakeを用いることで、これらのシステムを物理的に異なるデータベースで分けるのではなく、単一のプラットフォームの中でスキーマやテーブルの設計によって論理的に分割することが可能になり、管理がシンプルで楽になります。

前準備

データベースの作成

Snowflakeのアカウント(トライアル可)を作成し、SQL Worksheetから以下のSQLを実行します。
CREATE DATABASE dlt_weather;
CREATE ROLE DLT_LOADER_ROLE;
GRANT ROLE DLT_LOADER_ROLE TO USER <username>;
GRANT USAGE ON DATABASE dlt_weather TO DLT_LOADER_ROLE;
GRANT CREATE SCHEMA ON DATABASE dlt_weather TO ROLE DLT_LOADER_ROLE;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO DLT_LOADER_ROLE;
GRANT ALL PRIVILEGES ON FUTURE SCHEMAS IN DATABASE dlt_weather TO DLT_LOADER_ROLE;
GRANT ALL PRIVILEGES ON FUTURE TABLES IN DATABASE dlt_weather TO DLT_LOADER_ROLE;
成功すると、DLT_WEATHERというデータベースが作成されます。

パイプラインの構築

初めに、以下のパッケージをインストールします。
pip install "dlt[snowflake]"
次に、任意の場所にディレクトリを作成し、その直下で以下のコマンドを実行します。
dlt init <pipeline_name> snowflake
実行後はディレクトリ内に2つのファイルが作られます。
<pipeline_name>/  
│── <pipeline_name>_pipeline.py  
│── requirements.txt
ここで、<pipeline_name>_pipeline.pyはdltを使って様々なデータソースからSnowflakeにデータをロードするサンプルスクリプト、requirements.txtはPythonのパッケージ管理を指定するファイルです。 同ディレクトリ直下で以下のコマンドを実行することで、必要なパッケージがインストールされます。
pip install -r requirements.txt
最後に、.dlt/secrets.tomlの中身を先ほど実行したSQLに沿って編集します(dltは隠しフォルダのため、Macの場合は「command 」+「ShIft」+「.」で表示する)
[destination.snowflake.credentials]
database = "DLT_WEATHER" # please set me up!
password = "password" # please set me up!
username = "username" # please set me up!
host = "Account Identifier" # please set me up!
warehouse = "COMPUTE_WH" # please set me up!
role = "DLT_LOADER_ROLE" # please set me up!
passwordとusernameは最初にアカウントを作成したときに設定したものを、hostはAccount Details内のAccount Identifierを入力します。

前準備はこれで以上です。

データの取得と格納

以下のコードを実行します。前半ではAPIを実行し指定した都市の天気データを取得しています。後半では格納するパイプラインを設定し実行しています。前回から少しコードを変更し、都道府県名をまとめたExcelファイルを読み込むことで1回の実行で同時に複数の天気情報を取得しています。
import dlt
import pandas as pd
from dlt.sources.helpers import requests

df = pd.read_excel("prefecture.xlsx")  # A列にidがある
city_list = df["City_id"].tolist()  # "City_id"列をリスト化

API_key = "hoge"

weather_data = []

# 各都道府県についてAPIを実行 
for city in city_list: 
    url = f"https://api.openweathermap.org/data/2.5/weather?id={city}&appid={API_key}" 
    response = requests.get(url) 

    if response.status_code == 200: 
        weather_data.append(response.json()) # JSONデータをリストに追加 

    else: 
        print(f"Error fetching data for {city}: {response.status_code}")

weather_df = pd.DataFrame(weather_data)

pipeline = dlt.pipeline( 
  pipeline_name="weather_snowflake", 
  destination="snowflake", 
  dataset_name="DLT_WEATHER", 
)

#実際のコードでは@を_に変えてください
load_info = pipeline.run(weather_data, table@name="weather")

実行結果

前準備で作成したデータベース:DLT_WEATHERの中にテーブル:WEATHERが追加されており、47都道府県のデータが格納されています。

Tableauとの連携

以下の接続情報を入力します。サーバーはAccount Details内記載のURLを、役割・ウェアハウスは前準備のSQLで設定したものを、ユーザー名・パスワードはアカウント作成時に設定したものを入力します。
都道府県ごとの気温を色付きマップで表しました。

最後に

今回はOpenWeahterMapのAPIから取得したデータをdltを用いてSnowflakeに格納し、Tableauと連携して可視化しました。前回紹介したローカルのDuckDBに格納する方法と比較して、Snowflakeへの接続情報を追加する以外はほとんど同じ手順で実行可能です。また、Google BigQuery等の他のデータベースへ格納する際も今回と同様の手順で実行可能であり、非常に使いやすいライブラリだと感じました。 ShtockData

お問い合わせフォーム

お問い合わせ項目を選択してください