目次
1.はじめに
皆さんこんにちは。
今回は、CSVファイルを取込む方法について説明していきます。
2.前提要件
本書を実施する際の前提条件は
- ストレージアカウントが作成済みであること
- Access connectorが作成済みであること
- ストレージアカウントのBLOB データ共同作成者ロールをAccess connectorに付与していること
- Metastoreが作成済みであること
- Unity CatalogのDatabricks ワークスペースを有効にしていること
- ストレージの認証情報を作成済みであること
- Unity Catalog 外部ロケーションを作成済みであること
- 操作ユーザはUnity Catalogのテーブル更新権限があること
以上が作成済み、指定済みであることを前提としています。
3.CSVファイルをテーブルに格納する
3-1.ストレージアカウントへcustomer.csv ファイルをインポートする
① ストレージア カウントの画面でコンテナーを作成し、CSVファイルを格納します。
② サイドバーで「コンテナー」を選択し、「コンテナー」をクリックします。
③ コンテナーに「container-name」等の名前を付けます。
④「作成」ボタンをクリックします。
⑤ コンテナーの一覧が表示されます。
作成したコンテナーをクリックします。
⑥ 次に、フォルダーを作成し、CSVファイルを格納します。
⑦「ディレクトリーの追加」をクリックします。
⑧ フォルダーに「raw」等の名前を付けます。
⑨「保存」ボタンをクリックします。
⑩ フォルダーの一覧が表示されます。
作成したフォルダーをクリックします。
⑪ 作成したフォルダーにて 「アップロード」をクリックし、「ファイルを参照」をクリックします。
⑫ ダイアログでローカル コンピューターのファイルをアップロードできます。customer.csv ファイルを選択してください。
⑬ ファイルを選択して、「アップロード」ボタンをクリックします。
ストレージ アカウントの「raw」フォルダーにcustomer.csv ファイルがアップロードされました。
3-2.ノートブックを使用して、CSVファイルをテーブルに格納する
ワークスペースでノートブックを作成してCSVファイルを読み込みます。次に、 Unity Catalog でテーブルを作成して、CSVファイルをマージします。
① Databricksワークスペースをログインします。
② Databricks画面でサイドバーから「 Data science & Engineering 」をクリックします。
③サイドバーから「新規」をクリックして「ノートブック」を選択します。
④ダイアログの「名前」項目でノートブック名を入力します。(例: convert-notebook )
⑤「デフォルト言語」項目で「Python」を選択します。
⑥「クラスター」項目で存在するクラスターを選択します。
⑦「作成」ボタンをクリックすると、ノートブックが表示されます。
⑧ 次に、CSVファイルを読み取って、「demotempview」一時的なビューにデータを入力します。
以下のコードをノートブックの2行目にコピーして、 「Shift」+「Enter」キーを押します。
※ 文法
1 2 3 |
CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] view_name USING format OPTIONS (option1 "value1", option2 "value2", ...) |
※ 例
1 2 3 |
CREATE or replace TEMPORARY VIEW demotempview USING CSV OPTIONS (path "abfss://<your_container_name>@<your_storage_account_name>.dfs.core.windows.net/<your_folder_name>/", header "true", mode "FAILFAST") |
⑨ データをビューに格納したかを確認できるように、以下のコマンドを3行目にコピーして「Shift」+「Enter」キーを押します。
実行後、結果が画像のように表示されます。
CSVファイル を読み取ることができます。
※ 文法
1 |
SELECT * FROM view_name |
※ 例
1 |
select * from demotempview |
⑩ 次に、 Unity Catalog でカタログ、スキーマとテーブルを作成して、 「demotempview」 ビューのデータをマージします。
以下のコマンドを4行目にコピーして「Shift」+「Enter」キーを押します。「MyCatalog」カタログ、「MySchema」スキーマ及び、「MyTable」テーブルが作成されます。
※ 文法
1 2 3 |
CREATE CATALOG [IF NOT EXISTS] catalog_name; CREATE SCHEMA [IF NOT EXISTS] catalog_name.schema_name; CREATE TABLE [IF NOT EXISTS] catalog_name.schema_name.table_name ( column_specification ) |
※ 例
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE CATALOG IF NOT EXISTS MyCatalog; CREATE SCHEMA IF NOT EXISTS MyCatalog.MySchema; CREATE TABLE IF NOT EXISTS MyCatalog.MySchema.MyTable( customer_id bigint, tax_id int, tax_code int, customer_name string, state string, city string, postcode float, street string ) |
⑪ テーブルの作成後、以下のコマンドを実行して、 「demotempview」ビューのデータをテーブルにマージします。
以下のコマンドを5行目にコピーして「Shift」+「Enter」キーを押します。
実行後、画像のように9819行がテーブルにマージされました。
※ 文法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
MERGE INTO target_table_name [target_alias] USING source_table_reference [source_alias] ON merge_condition { WHEN MATCHED [ AND matched_condition ] THEN matched_action | WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action | WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...] matched_action { DELETE | UPDATE SET * | UPDATE SET { column = { expr | DEFAULT } } [, ...] } not_matched_action { INSERT * | INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] ) not_matched_by_source_action { DELETE | UPDATE SET { column = { expr | DEFAULT } } [, ...] } |
※ 例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
MERGE INTO MyCatalog.MySchema.MyTable a USING demotempview b ON a.customer_id = b.customer_id WHEN MATCHED THEN UPDATE SET customer_id = b.customer_id, tax_id = b.tax_id, tax_code = b.tax_code, customer_name = b.customer_name, state = b.state, city = b.city, postcode = b.postcode, street = b.street WHEN NOT MATCHED THEN INSERT (customer_id,tax_id,tax_code, customer_name,state,city, postcode,street) VALUES (b.customer_id,b.tax_id,b.tax_code, b.customer_name,b.state,b.city, b.postcode,b.street) |
⑫ データをテーブルにマージしたかを確認できるように以下のコマンドを実行します。
以下のコマンドを6行目にコピーして「Shift」+「Enter」キーを押します。
実行後、結果が画像のように表示されます。
※ 文法
1 |
SELECT * FROM catalog_name.schema_name.table_name; |
※ 例
1 |
SELECT * FROM MyCatalog.MySchema.MyTable; |
4.まとめ
これでCSVファイルを取込む方法について説明しました。
今回の記事が少しでもDatabricksを知るきっかけや、業務のご参考になれば幸いです。
日商エレクトロニクスでは、Azure Databricksの環境構築パッケージを用意しています。
Azure DatabricksやAzure活用、マイクロソフト製品の活用についてご相談事がありましたらぜひお問い合わせください!
・Azure Databricks連載シリーズはこちら
この記事を読んだ方へのオススメコンテンツはこちら
この記事を書いた人
- phongcq