AWS DMS を利用したトランザクションデータの Delta Lake への移行

AWS DMS と Delta Lakes を使用した CDC のビッグデータパイプラインの簡素化

Databricks の Notebook を試してみる

備考:この記事に関連するブログ Efficient Upserts into Data Lakes with Databricks Delta (Databricks の Delta Lake によるデータレイクへの効率的なアップサート)では、MERGE コマンドを使用した効率的なアップサートや削除の方法を解説しています。そちらもご一読ください。

データベースからデータレイクへのデータ移行における課題

Large enterprises are moving transactional data from scattered data marts in heterogeneous locations to a centralized data lake. Business data is increasingly being consolidated in a data lake to eliminate silos, gain insights and build AI data products. However, building data lakes from a wide variety of continuously changing transactional databases and keeping data lakes up to date is extremely complex and can be an operational nightmare.

ベンダー固有の変更データキャプチャ(Change Data Capture: CDC)ツールや Apache SparkTMの JDBC による直接のインジェストを用いた従来のソリューションは、以下のような一般的なシナリオでは実用的ではありません。

(a) データソースは通常、オンプレミスのサーバーとクラウドに分散しており、PostgreSQL、Oracle、MySQL などのデータベースから数十のデータソースと数千のテーブルが存在する。

(b) データレイクに取り込まれた変更データのビジネス SLA は 15 分以内。

(c) データは、データベース接続のための所有権やネットワークトポロジーがさまざまな状態で発生している。

上記のようなシナリオでは、Delta Lake と AWS Database Migration Services (DMS)を使用してデータレイクを構築し、過去のデータやリアルタイムのトランザクションデータを移行することが優れたソリューションであることがわかります。このブログ記事では、AWS Database Migration Service(AWS DMS)と Delta Lake を利用して、複数の RDBMS データソースからデータを取り込み、信頼性の高いデータレイクを構築する簡単なプロセスを説明します。データレイク構築後は、Databricks の統合分析プラットフォームを利用して、リアルタイムおよび履歴データに対して高度な分析を行うことができます。

Delta Lake とは

Delta Lake は、データレイクに信頼性をもたらすオープンソースのストレージレイヤーです。Delta Lake は、ACID トランザクション、スケーラブルなメタデータ処理、統合ストリーミングとバッチデータ処理を提供します。Delta Lake は既存のデータレイク上で実行され、Apache Spark API と完全な互換性があります。

具体的には、Delta Lake は以下を提供します。

  • Spark での ACID トランザクション:シリアル化可能な分離レベルにより、読者が矛盾したデータを見ることはありません。
  • スケーラブルなメタデータ処理:Spark の分散処理能力を活用し、数十億のファイルを持つペタバイトスケールのテーブルの全てのメタデータを容易に処理します。
  • ストリーミングとバッチの統一:Delta Lake のテーブルは、バッチテーブルであると同時に、ストリーミングのソースとシンクでもあります。ストリーミングデータのインジェスト、バッチ履歴データのバックフィル、対話型クエリなどの全てが、すぐに機能します。
  • スキーマエンフォースメント:スキーマのバリエーションを自動的に処理し、インジェスト時の不良レコードの挿入を防ぎます。
  • タイムトラベル:データのバージョニングにより、ロールバック、履歴の完全な監査証跡、再現可能な機械学習実験が可能です。
  • Databricks 上のマネージド Delta Lake でのアップサート (オープンソースの Delta Lakeでも近日公開予定):MERGE コマンドにより、データレイクのレコードのアップサートや削除を効率的に行うことができます。MERGE は、多くの一般的なデータパイプラインの構築方法を劇的に簡素化します。非効率的にパーティション全体を書き換えていた複雑な複数ステップの処理プロセスは、全てシンプルな MERGE クエリで置き換えることができます。このきめ細かな更新機能により、AWS DMSの変更ログから変更データを取得するためのビッグデータパイプラインの構築が簡素化されます。

AWS Database Migration Service(DMS)とは?

AWS DMS の利用により、既存のデータの移行とデータの変更の両方において、最も広く使用されている商用およびオープンソースのデータベースから S3 にデータを移行できます。このサービスは、Oracle から Amazon Aurora、Microsoft SQL Server から MySQL など、異なるデータベースプラットフォームからの移行をサポートしています。また、AWS DMS を利用すると、サポートされているソースから Amazon S3 にデータをストリーミングすることで、高可用性を持って継続的にデータを複製し、データベースを統合できます。

AWS Database Migration Services(DMS)を使った Delta Lake へのデータの移行

MySQL データベース上に構築された "person " テーブルがあり、図のようなカラムを持つアプリケーションのユーザーレコードのデータを保持しているとします。このテーブルは、人が移動したり、新しい人が追加されたり、既存の人が削除されたりするたびに更新されます。このテーブルを AWS DMS を使って S3 にインジェストした後、Delta Lake にロードし、データレイクへのインジェストとトランザクションデータストアとの同期を維持する例を紹介します。 MySQL でこのテーブルに対する変更データの取り込みを実行し、AWS DMS を使って S3 に変更をレプリケートし、Delta Lake を使って構築したデータレイクに簡単にマージします。

アーキテクチャ

このソリューションでは、DMS を使ってデータソースを Amazon S3 に取り込み、初期の取り込みと継続的な更新を行います。S3 から Delta Lake のテーブルに初期データをロードし、Delta Lake のアップサート機能を使って Delta Lake のテーブルに変更を取り込みます。元のソースと同期している Delta Lake テーブルでアナリティクスを実行し、ビジネスインサイトを得ます。次の図は、提案するソリューションを示しています。

データが Delta Lake 上で利用できるようになった後は、ダッシュボードや BI ツールを使ってインテリジェントなレポートを生成し、インサイトを簡単に得ることができます。また、さらに一歩進んで、Databricks で ML モデルを構築するためにデータを使用することもできます。

ソリューションの詳細

ここでは、MySQL エンジンを搭載した RDS データベースを作成し、いくつかのデータを読み込みます。実際には、ソースとなるデータベースは 1 つだけではないかもしれませんが、この記事で説明しているプロセスは同じようなものです。

チュートリアル:「Web サーバーと Amazon RDS データベースの作成」の手順に従って、ソースデータベースを作成します。チュートリアルのメインページからのリンクを使用して、特定のデータベースに接続してデータをロードする方法を確認します。詳細については、「MySQL Database Engine を実行する DB インスタンスの作成」を参照してください。

作成したセキュリティグループをメモして、全ての RDS インスタンスを関連付けます。これを「TestRDSSecurityGroup」と呼びます。その後、RDS Instances ダッシュボードにデータベースが表示されるようになります。

対象となる S3 バケットの設定

以下のように 2 つの S3 バケットを設定します。 1 つはバッチの初期ロード用、もう 1 つは増分変更データのキャプチャ用です。

次のステップでは、構成をシンプルにするために、非本番用として「Publicly Accessible」を選択します。また、シンプルにするために、RDS インスタンスを配置したのと同じ VPC を選択し、アクセスを許可するセキュリティグループのリストに TestRDSSecurityGroup を含めます。

DMS のセットアップ

AWS Database Migration Service のブログ記事にあるように、DMS は簡単にセットアップできます。以下のステップバイステップのアプローチを取ることができます。

  1. レプリケーションインスタンスを作成します。
  2. 前のステップでセットアップしたソースデータベースとターゲット S3 バケットのエンドポイントを作成します。
  3. 各ソースをターゲットに同期させるタスクを作成します。
エンドポイントの作成

DMS コンソールで、「エンドポイント」、「エンドポイントの作成」を選択します。MySQL RDS データベースを表すエンドポイントを設定する必要があります。また、前のステップで作成した S3 バケットを指定して、ターゲットのエンドポイントを作成する必要があります。設定後、エンドポイントは以下のスクリーンショットのようになります。

2 つのタスクを作成し、データ移行を開始

ターゲットの Amazon S3 バケット内のテーブルを移行するために、DMS を使用できます。

DMSコンソールで、[タスク]、[タスクの作成]を選択します。次のスクリーンショットに示すように、フィールドに記入します。

  1. Migration Task for Initial Load(初期ロードの移行タスク)

  1. CDC の移行作業

ソースが RDS MySQL で、データの移行と継続的な変更のレプリケーションを選択した場合、ビンログの保持を有効にする必要があることに注意してください。他のエンジンには他の要件があり、DMS はそれに応じてプロンプトを表示します。この特定のケースでは、次のコマンドを実行します。

call mysql.rds_set_configuration('binlogretention hours', 24);

両方のタスクが正常に完了すると、「タスク」タブは以下のようになります。

データ移行が順調に進んでいることを確認する
  1. S3 バケットに初期データが読み込まれていることを確認します。

サンプルレコード:



2. ソースデータベースのpersonテーブルに変更を加え、その変更が S3 に移行されたことを確認します。
INSERT into  person(id,first_name,last_name,email,gender,dob,address,city,state) values ('1001','Arun','Pamulapati','cadhamsrs@umich.edu','Female','1959-05-03','4604Delaware Junction','Gastonia','NC');
UPDATE person set state = 'MD' where id=1000;
DELETE from person  where id = 998;
UPDATE person set state = 'CA' where id=1000;


変更ログ:

初期移行データのDelta Lakeへのロード

初期ロードファイルから Delta Lake のテーブルを作成することになりますが、Spark SQL コードを使用して、フォーマットを parquet、csv、json などから delta に変更できます。全てのファイルタイプについて、ファイルを DataFrame に読み込み、delta 形式で書き出します。

personDF = spark.read.option("Header",True).option("InferSchema",True).csv("/mnt/%s/arun/person/" % initialoadMountName)
personDF.write.format("delta").save("/delta/person")spark.sql("CREATETABLE person USING DELTA LOCATION '/delta/person/'")
増分データを Delta Lake にマージする

Deltaの merge into 機能を使って、変更ログを Delta Lake に取り込むことになります。

personChangesDF = (spark.read.csv("dbfs:/mnt/%s/arun/person" % changesMountName,                         inferSchema=True, header=True,                         
ignoreLeadingWhiteSpace=True,
                        ignoreTrailingWhiteSpace=True))
personChangesDF.registerTempTable("person_changes")
MERGE INTO person target
USING
(SELECT Op,latest_changes.id,first_name,last_name,email,gender,dob,address,city,state,create_date,last_update
  FROM person_changes latest_changes
 INNER JOIN (
   SELECT id,  max(last_update) AS MaxDate
   FROM person_changes
   GROUP BY id
) cm ON latest_changes.id = cm.id AND latest_changes.last_update= cm.MaxDate) as source
ON source.id == target.id
WHEN MATCHED AND source.Op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED  THEN INSERT *

補足:

1 )Databricks の ジョブ 機能を使用して、SLA に基づいて CDC のマージをスケジュールし、マージが成功した後に cdc S3 バケットからアーカイブバケットに変更ログを移動することで、マージのペイロードを最新かつ少量に保つことができます。Databricks プラットフォームにおけるジョブとは、Notebook や JAR を即時またはスケジュールに沿って実行する方法です。ジョブの作成と実行は、UI、CLI、およびジョブ API を使って行うことができます。同様に、ジョブの実行結果は、UI、CLI、APIへの問い合わせ、E メールによるアラートなどで監視できます。

2 )大規模なテーブルの初期ロードを効率的に行うには、 Spark ネイティブな並列処理 を JDBC リードを使用して利用するか、 DMS のベストプラクティスを採用して、 AWS Database Migration Service(AWS DMS)を最も効果的に使用することをお勧めします。

まとめ ― シンプルなデータパイプラインと高信頼性 Delta Lake の構築

この記事では、Delta Lake に対し、AWS DMS を使って RDBMS データソースから変更を取り込み、インクリメンタルにキャプチャすることで、シンプルな構成と最小限のコードで、簡単で信頼性が高く、経済的なデータレイクを構築する方法を紹介しました。また、Databricks Notebook を使用して、データセット上のデータビジュアライゼーションを作成することも可能です。

Databricks の Notebook を試してみる

 

Databricks 無料トライアル 使ってみる

ご登録