目次
1. はじめに
皆さんこんにちは。
今回は、Azure DatabricksでAuto Loaderの利用方法について説明していきます。
第1回: Auto Loaderの概要、自動取込方法を使用してみる
第2回: Auto Loader と Delta Live Table を組み合わせて使用してみる (今回)
第3回: Auto LoaderのSchema inferenceとEvolutionの機能を使用してみる
Delta Live Tableの概要について、このリンクをご参照ください。
2. 前提条件
- Databrickワークスペースが必要です。
- Azure ストレージアカウントが必要です。
- ワークスペースのUnity Catalogが有効です。
- Unity Catalogの更新権限が必要です。
- ストレージ資格情報が必要です。
3. Auto Loader を Delta Live Table と組み合わせて使用方法
自動ローダーとDelta Live Tablesは、増加し続けるデータがクラウドストレージに到着するとすぐに読み込むように設計されています。そのため、Databricksでは、クラウドオブジェクトストレージからのほとんどのデータインジェストタスクについて、Delta Live Tablesで自動ローダーを使用することを推奨します。
- コスト削減のためのコンピューティング インフラストラクチャの自動スケーリング
- 期待(エクスペクテーション)を伴うデータ品質チェック
- 自動スキーマ推論の処理
- メトリクスによるイベントログの監視
DLT で自動ローダーを使用する場合、スキーマやチェックポイントの場所は DLT パイプラインによって自動的に管理されるため、指定する必要はありません。
今回の例では、ADLS(Azure Data Lake Storage)からCSVデータをロードし、読み込んだデータをUnity Catalogのテーブルに保存する方法について説明します。このプロセスは、Delta Live Tableと組み合わせて自動ローダーを使用します。この方法により、データのロードと保存が効率的に行えます。
今回デモのデータセット[2020_Yellow_Taxi_Trip_Data]には、タクシー運転手によって報告された乗車と降車の日時、乗車と降車の地点、移動距離、運賃、支払いタイプ、乗客数のフィールドが含まれています。
このデータセットから、ブロンズ、シルバー、ゴールドの3つ層を通して変換とクリーンデータを実行します。
乗車地点ごとの合計運賃を計算するために、ゴールドレヤーに対応する最終テーブルが作成されました。
次のスキーマが含まれています。
|– PuLocationId: Integer
|– Passenger_Pay: double
※ PuLocationId: 乗車地点のID
※ Passenger_Pay: 合計運賃
【シナリオ】
1. ストレージアカウントからの「yellow trip dataset」データセットを取り込み 、「tripdata_raw」テーブルを作成します。
2. 「tripdata_raw」テーブルをクレンジングし、「tripdata_clean」テーブルを作成します。
3. 「 top_pages」テーブルを作成し、「 tripdata_clean」テーブルから通行料金の合計額を計算します。
4. データ準備
まず、サンプルデータをAzure ストレージアカウントにアップロードして、次の手順を実施します。
① ストレージアカウントの画面から >「コンテナー」-> 「コンテナー」をクリックします。
②「autoloader-container」というコンテナー名を付けます。「作成」をクリックします。
③ コンテナーでフォルダーを作成するには、「ディレクトリーの追加」をクリックします。
④「 raw 」というフォルダー名を付けます。「保存」をクリックします。
⑤「raw」フォルダに移動して、以下のCSVファイルをアップロードします。
次に、外部ロケーションをDatabricksで作成します。
5. 外部ロケーション作成
① Azure Databricksの画面から >「カタログ」>「外部データ」>「外部ロケーション」>「ロケーションを作成」をクリックします。
②「外部ロケーション名」> 「autoloader-location」という外部ロケーション名を付けます。「ストレージ資格情報」> ストレージ資格情報を選択します。
③「URL」> 以下の内容を入力します。
文法:
1 |
abfss://<container>@<storage-account>.dfs.core.windows.net/<folder> |
例:
1 |
abfss://autoloader-container@dtbstraccadb.dfs.core.windows.net/raw |
④「作成」をクリックします。
6. ノートブック作成
次に、ストレージアカウントから作成したCSVファイルのデータを読み込むにはAuto loaderを利用してノートブックを作成します。
① Databricksワークスペース画面から >「新規」>「ノートブック」をクリックします。
②「demo-notebook」というノートブック名を付けます。SQLという言語を選択します。
③ 生データを格納する[tripdata_raw]テーブルを作成するには、以下のコマンドをノートブックにコピーします。
文法:
1 |
CREATE OR REFRESH STREAMING TABLE tripdata_raw AS SELECT * FROM cloud_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<folder>", "csv") |
※ <container>: 作成したコンテナー名
※ <storage-account>: ストレージアカウント名
※ <folder>: 作成したフォルダー名
例:
1 2 |
CREATE OR REFRESH STREAMING TABLE tripdata_raw AS SELECT * FROM cloud_files("abfss://autoloader-container@dtbstraccadb.dfs.core.windows.net/raw", "csv") |
④ 次に、以下のコマンドをノートブックに追加して「tripdata_raw」をクレンジングし、「tripdata_clean」を作成します。
文法:
1 2 3 4 5 6 7 |
CREATE OR REFRESH STREAMING TABLE tripdata_clean( <constraint1>, <constraint2>,… ) TBLPROPERTIES("quality" = "silver") AS SELECT <column1> ,<column2> ... FROM STREAM(LIVE.<table_name>) |
※ <constraint1>…: テーブルの制約
※ <column1>: コラム名
※ <table_name>: テーブル名
例:
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE OR REFRESH STREAMING TABLE tripdata_clean( CONSTRAINT valid_tollsAmount EXPECT(Tolls_Amount IS NOT NULL), CONSTRAINT valid_extra EXPECT(Extra > 0) ) COMMENT"yellow_tripdata with cleaned-up datatypes / column names and quality expectations." TBLPROPERTIES("quality" = "silver") AS SELECT CAST(puLocationID AS INT) AS PuLocationID, extra AS Extra, rateCodeID AS RateCodeID, tolls_amount AS Tolls_Amount FROM STREAM(LIVE.tripdata_raw) |
⑤ 以下のコマンドをノートブックに追加して、「top_pages」テーブルを作成して乗車地ごとに通行料金の合計額を計算します。
文法:
1 2 3 4 5 6 |
CREATE LIVE TABLE top_pages TBLPROPERTIES("quality" = "gold") AS SELECT <column1> ,<column2> ... FROM LIVE.<table_name> GROUP BY <condition> |
例:
1 2 3 4 5 6 7 8 9 10 |
CREATE LIVE TABLE top_pages COMMENT"A list of the top 50 pages by passenger pay" TBLPROPERTIES("quality" = "gold") AS SELECT PuLocationId, SUM(Tolls_Amount) as Passenger_Pay FROM LIVE.tripdata_clean GROUP BY PuLocationId ORDER BY 2 DESC LIMIT 50 |
7. Delta live tableパイプライン作成
ノートブックの作成完了後、Delta live tableのパイプラインを作成して、ノートブックを実施します。
① Databricksワークスペース画面から >「パイプラインを作成」をクリックします。
②「パイプライン名」>「demo-pipeline」というパイプライン名を付けます。
「製品エディション」:「Advanced」
「パイプラインモード」:「トリガー」
「パス」> 作成したノートブックへのパスを選択します。
「ストレージオプション」:「Unity Catalog」
「カタログ」:カタログ名
「スキーマ」:スキーマ名
他の項目をデフォルトのままにします。「作成」をクリックします。
③「開始」をクリックします。
④ 結果は以下の画像通りです。パイプラインが実施完了します。CSVデータがストレージアカウントから読み込まれ、Unity Catalogに保存されました。
8. まとめ
今回は、Auto Loader を Delta Live Table と組み合わせて使用方法について説明しました。
第1回: Auto Loaderの概要、自動取込方法を使用してみる
第2回: Auto Loader を Delta Live Table と組み合わせて使用してみる (今回)
第3回: Auto LoaderのSchema inferenceとEvolutionの機能について説明する
今回の記事が少しでもDatabricksを知るきっかけや、業務のご参考になれば幸いです。
日商エレクトロニクスでは、Azure Databricksの環境構築パッケージを用意しています。
Azure DatabricksやAzure活用、マイクロソフト製品の活用についてご相談事がありましたらぜひお問い合わせください!
Azure Databricks連載記事のまとめはこちら
お問い合わせはこちら
この記事を読んだ方へのオススメコンテンツはこちら
この記事を書いた人
- quanna