eBook
Databricks、Spark、Delta Lake のワークロードを最適化する包括的ガイド
序章
このドキュメントは、Databricks、Apache Spark™、Delta Lake の重要なベストプラクティスと最適化手法のほとんど(全てではないにしても)を 1 か所に集約することを目的としています。全てのデータエンジニアとデータアーキテクトは、最適化されたコスト効率の高い効率的なデータパイプラインを設計・開発する際のガイドとして活用できます。コストは後付けではなく、プロジェクト開始当初から最も重要な非機能要件の一つとして扱われるべきです。そのため、本書で取り上げたベストプラクティスは全て、データパイプラインの開発と本番運用をとおして考慮すべきものです。本書はいくつかのセクションに分かれており、それぞれが特定の問題文に焦点を当て、それに対する可能なあらゆる解決策を提供するために深く掘り下げています。
序章
このドキュメントは、Databricks、Apache Spark™、Delta Lake の重要なベストプラクティスと最適化手法のほとんど(全てではないにしても)を 1 か所に集約することを目的としています。全てのデータエンジニアとデータアーキテクトは、最適化されたコスト効率の高い効率的なデータパイプラインを設計・開発する際のガイドとして活用できます。コストは後付けではなく、プロジェクト開始当初から最も重要な非機能要件の一つとして扱われるべきです。そのため、本書で取り上げたベストプラクティスは全て、データパイプラインの開発と本番運用をとおして考慮すべきものです。本書はいくつかのセクションに分かれており、それぞれが特定の問題文に焦点を当て、それに対する可能なあらゆる解決策を提供するために深く掘り下げています。
Delta Lake - レイクハウス形式
Delta Lake は、データレイクに信頼性、セキュリティ、パフォーマンスを提供するオープンフォーマットのストレージレイヤーです。これにより、Spark、PrestoDB、Flink、Trino、Hive などのコンピューティングエンジンと、Python、SQL、Scala、Java、Rust、Ruby 用の API を使用してレイクハウスアーキテクチャを構築できます。Parquet、Avro、ORC などの他のオープンフォーマットに比べて、次のような多くの利点があります。
- Deltaは、オープンソースの Parquet フォーマットの上に、ACID トランザクション、高性能、その他多くの機能を保証するプロトコルです。Deltaはオープンソースで、完全なプロトコルはここにあります。
- Delta Lake は ACID トランザクションをサポートし、バッチとストリーミングのパラダイムを統合することで、増分データの削除/挿入/更新トランザクションを簡素化します。
- Delta は、タイムトラベルを可能にし、テーブルのポイントインタイムスナップショットバージョンを読むことができます。
- また、Delta は、効率的なデータレイアウト、インデックス作成、データスキッピング、キャッシングなど、多くのパフォーマンス強化機能を備えています。
したがって、全てのメリットを享受するために、Delta デフォルトのデータレイクストレージフォーマットとして使用することを強くお勧めします。Databricks DBR 8.x 以上では、Delta Lake がデフォルトのフォーマットです。
基礎となるデータレイアウト
Delta テーブルの下には、データを格納する Parquet ファイルがあります。また、_delta_log
というサブディレクトリがあり、Parquet ファイルのすぐ隣に Delta のトランザクションログが格納されています。これらのパーケットファイルのサイズは、クエリのパフォーマンスにとって非常に重要です。
極小ファイル問題は、ビッグデータの世界ではよく知られた問題です。テーブルの基礎となる極小ファイルが多すぎると、極小ファイルを開いたり閉じたりするのに時間がかかるため、読み取りレイテンシが悪化します。
この問題を避けるには、ファイルサイズを 16 MB~1 GB の間に設定するのが最適です。これは、作業負荷と特定の使用ケースに基づいてケースバイケースで設定可能です。
Databricks Runtime 11.3 以上を使用して、Unity Catalog(Databricks のデータカタログ) にカタログされたマネージド Delta テーブルを作成する場合、Databricks が自動チューニング機能の一部としてバックグラウンドで自動的にこのタスクを実行するため、基礎となるファイルサイズの最適化や、Delta テーブルのターゲットファイルサイズの設定について心配する必要はありません。将来的には、この機能は外部テーブルでも利用できるようになる予定です。
それ以外の場合は、特定の Spark ジョブを使用してファイルを手動で圧縮して適切なファイルサイズを取得する必要がありますが、Delta テーブルにはすぐに使えるビンパッキング(圧縮)機能が備わっているため、これは必要ありません。Delta では、以下に詳述する 2 つの方法でビンパッキングを行うことができます。
1. 最適化と Z オーダー
OPTIMIZE はファイルを圧縮して最大 1 GB のファイルサイズにします。これは設定可能です。このコマンドは基本的に、設定したサイズ(設定されていない場合はデフォルトで 1 GB)にファイルのサイズを調整しようとします。OPTIMIZE コマンドと ZORDER を組み合わせることもできます。ZORDER は、選択した列に基づいてデータを物理的にソートしたり、同じ場所に配置したりします。
- Z オーダーには、常にカーディナリティの高いカラム(例えば、orders テーブルの
customer_id
)を選択します。通常、日付カラムはカーディナリティが低いカラムなので、Z オーダーに使用すべきではありません。パーティショニングカラムとして使用する方が適しています(ただし、常にテーブルをパーティショニングする必要はありません。詳細は後述のパーティショニングセクションを参照してください)。Z オーダーでは、フィルター句や下流クエリの結合キーとして最も頻繁に使用されるカラムを選択します。 - 列数が多すぎると Z オーダー効果が低下するため、4 列以上は使用しないでください。
- OPTIMIZE コマンドは、ジョブ自体の一部としてではなく、必ず別のジョブクラスタで実行してください。そうしないと、対応するジョブの SLA に影響する可能性があります
- OPTIMIZE コマンドは計算負荷が高いため、計算最適化インスタンスファミリーを推奨します。
- OPTIMIZE(ZORDER の有無にかかわらず)は、下流のクエリパフォーマンスを向上させるために、ファイルレイアウトを維持するために、1 日 1 回(または週 1 回、または要件に応じて)など、定期的に実行する必要があります。
2. 自動最適化
Auto optimize は、その名の通り、Delta テーブルへの個々の書き込み中に、小さなファイルを自動的にコンパクト化する機能で、デフォルトでは、128 MB のファイルサイズを達成しようとします。これには 2 つの機能があります。
1. 書き込みの最適化
Optimize Write は、実際のデータに基づいて Apache Spark のパーティションサイズを動的に最適化し、各テーブルパーティションに対して 128 MB のファイルの書き出しを試みます。同じ Spark ジョブの中で行われます。
2. オートコンパクト
Spark ジョブの完了後、Auto Compact は新しいジョブを起動し、ファイルをさらに圧縮して128 MB のファイルサイズを達成できるかどうかを確認します。
- 適切なファイルサイズを得るために、手動 OPTIMIZE(ZORDER の有無にかかわらず)コマンドをまだ利用していない場合は、常に optimizeWrite テーブルプロパティを有効にしてください。最適化された書き込みは、ターゲットテーブルのパーティショニング構造に従ってデータのシャッフルを必要とすることに留意してください。このシャッフルには当然、追加コストが発生します。しかし、書き込み時のスループット向上がシャッフルのコストをペイする可能性があります。そうでない場合でも、データをクエリする際のスループット向上により、この機能の価値はあるはずです。
- ジョブが SLA に厳密に縛られていない場合は、autoCompact テーブルプロパティを有効にして、Delta ビンパッキングをさらに活用することもできます。
3. パーティショニング
パーティショニングは、パーティションカラムをフィルターとして提供したり、パーティションカラムで結合したり、パーティションカラムで集約したり、パーティションカラムでマージしたりすると、スキャン時に不要なデータパーティション(サブフォルダーなど)をス キップすることができるため、クエリを高速化することができます。
- Databricks では、1TB 未満のテーブルにはパーティションを設定せず、インジェスト時のクラスタリングを自動的に有効にすることを推奨しています。この機能は、デフォルトで全てのテーブルに対して、データがインジェストされた順番に基づいてデータをクラスタリングします。
- 各パーティションのデータが少なくとも 1 GB になる場合は、カラム単位でパーティショニングできます。
- パーティションカラムには、常にカーディナリティの低いカラム(たとえば、年や日付)を選択します。
- パーティションカラムを選択する際に、Delta の生成カラム機能を利用することもできます。生成されたカラムは、Delta タブ内の他のカラムに対してユーザーが指定した関数に基づいて値が自動的に生成される特殊なタイプのカラムです。
4. ファイルサイズ調整
自動最適化(128 MB)または 最適化(1 GB)で目標とするデフォルトのファイルサイズが適切でない場合は、必要に応じて微調整できます。ターゲットファイルサイズの設定にdelta.targetFileSize
テーブルプロパティを使用すると、自動最適化と最適化は代わりに指定されたサイズになるようにビンパックします。
delta.tuneFileSizesForRewrites
テーブルプロパティをオンにすることで、ターゲットファイルサイズを手動で設定したくない場合に、このタスクを Databricks に委譲することもできます。このプロパティを true に設定すると、Databricks はワークロードに基づいてファイルサイズを自動的に調整します。例えば、Delta テーブル上で多くのマージを行う場合、マージ操作を高速化するために、ファイルは自動的に 1 GB よりもはるかに小さいサイズに調整されます。
データシャッフリング - なぜ起こるのか、どう制御するのか
データのシャッフルは、結合、集計、ウィンドウ操作などの幅広い変換の結果として発生します。ワーカーノード間でネットワークを介してデータを送信するため、高価なプロセスです。私たちは、シャッフルの効率と速度を排除または改善するために、いくつかの最適化アプローチを使用することがあります。
1. ブロードキャストハッシュ結合
データのシャッフルを完全に避けるには、 結合する 2 つのテーブルまたは DataFrame(小さい方)の一方をブロードキャストします。テーブルはドライバによってブロードキャストされ、ドライバはそれを全てのワーカーノードにコピーします。
結合を実行すると、Spark は 10 MB 未満のテーブルを自動的にブロードキャストします。ただし、以下に示すように、このしきい値を調整して、さらに大きなテーブルをブロードキャストすることもできます。
クエリ内のテーブルのいくつかが小さなテーブルであることがわかっている場合、ヒントを使用して明示的にブロードキャストするように Spark に指示できます。
Spark 3.0 以降には AQE(Adaptive Query Execution)が搭載されており、いずれかの結合側の実行時統計が適応型ブロードキャストハッシュ結合のしきい値(デフォルトでは 30 MB)よりも小さい場合に、ソートマージ結合をブロードキャストハッシュ結合(BHJ)に変換することもできます。次の構成を変更してこのしきい値を増やすこともできます。