メインコンテンツへジャンプ

Apache Spark™ Structured Streamingでの変更データキャプチャのシンプル化

Share this post

Summary

  • 簡略化されたステートトラッキング:State Reader APIの新しいチェンジフィードとスナップショット機能により、Apache Spark™ Structured Streamingのステート変更のデバッグと分析が簡単になります。
  • 高速開発:最小限のクエリでマイクロバッチ間の状態変更を追跡し、正確な診断のための状態スナップショットを再構築します。
  • 強化されたアクセシビリティ:エンジニアとビジネスユーザーの両方の洞察を効率化し、ダッシュボードと分析ツールとのシームレスな統合を可能にします。

このブログでは、Apache Spark™ Structured StreamingのState Reader APIの新しい変更フィードとスナップショット機能について説明します。State Reader APIは、ユーザーがStructured Streamingの内部状態データにアクセスし、分析することを可能にします。読者は、新機能を活用してデバッグ、トラブルシューティング、状態変更の効率的な分析を学び、ストリーミングワークロードをスケールで容易に管理する方法を学びます。

状態変化を簡単に処理する方法

データエンジニアリングの絶えず進化する風景の中で、Apache Spark Structured Streamingは、大規模なリアルタイムデータの処理のための基盤となっています。しかし、ストリーミングワークロードが複雑さを増すにつれて、これらのシステムを開発、デバッグ、トラブルシューティングする課題も増大します。2024年3月、DatabricksはState Reader APIを導入することで、これらの課題に直接対処し、状態データとメタデータを簡単にクエリすることを可能にするという重要なステップを踏み出しました。

Databricksは、State Reader APIに大幅な強化を導入し、既存の機能をさらに強化して状態追跡と分析をより効率的に行うことができます。これらの改善は、ステートストアのチェンジログデータを活用して、標準的なChange Data Capture(CDC)形式での変更フィードを提供します。別の新機能は、チェックポイントディレクトリ内の優先スナップショットを使用してステートのビューを生成するのに役立ちます。

このブログ投稿では、これらの新機能について詳しく説明し、状態変更の追跡、データ変換の監査、状態スナップショットの再構築をいかに効率化するかを示します。変更フィードの利点は、時間経過に伴う状態値の変化を観察するためのより簡単な方法を提供することで開発を加速します。前のState Reader APIバージョンでも可能でしたが、異なる状態バージョンを反復処理し、検査するためにはより多くのコードが必要でした。今では、変更フィードを構築するためには数個のオプションだけで十分です。

開発とテストを超えて、これらの強化はアナリストのデータアクセシビリティを促進します。例えば、スケジュールされたクエリは、今では簡単にAI/BIダッシュボードのビジュアライゼーションを充実させることができ、複雑なストリーミングデータと行動可能な洞察の間のギャップを埋めることができます。

前提条件:

State Reader API Change Feedでは、差分 (delta) ベースのステートチェックポイントが有効になっていることが必要です。ここで、「delta」は「差分 (diff)」を意味し、Delta Lakeではありません。HDFSバックアップステートストアの実装は、デフォルトでデルタベースのステートチェックポイントを使用します。RocksDBベースのステートストア実装を使用する際には、チェンジログのチェックポイントを有効にするための追加のSpark設定が必要です。

State Reader APIのレビュー

基本的なステートストアフォーマットには次のオプションがあります:

  • batchId:状態ストアの値を読み取りたい対象のバッチ。指定されていない場合、デフォルトで最新のbatchIdが使用されます。
  • operatorId:ステートストアの値が求められるターゲットオペレーター。デフォルト値は0です。ストリーム内に複数のステートフルなオペレータが存在する場合、このオプションを使用して他のオペレータの状態にアクセスできます。
  • storeName:これは読み取り対象のステートストア名を表します。このオプションは、状態保持オペレータが複数の状態ストアインスタンスを使用する場合に使用されます。ストリーム-ストリーム結合のためには、storeNameまたはjoinSideのどちらか一方を指定する必要がありますが、両方を指定することはできません。
  • joinSide: このオプションは、ユーザーがストリーム-ストリーム結合から状態を読み取りたい場合に使用されます。このオプションが使用される場合、ユーザーが提供する予想されるオプション値は「right」または「left」です。

出力DataFrameスキーマには次の列が含まれます:

  • key:状態チェックポイント内の状態保持オペレータレコードのキー。
  • 値:ステートチェックポイント内のステートフルオペレータレコードの値。
  • partition_id:ステートフルオペレーターレコードを含むチェックポイントパーティション。

状態ストア形式の基本的な必須オプションは、特定のbatchIdの状態ストアに何があったかを理解するのに役立ちます。

以下の例は、statestore Sparkデータソース形式がどのようにしてステートストアデータのクエリを支援するかを示しています。私たちがuserId 8のカウント値を調査していると想像してみてください。次のセクションでレビューする新しいState Reader APIオプションが登場する前は、userId 8のデータがマイクロバッチ間でどのように変化するかを観察したい場合、さまざまなbatchIds(下の最初のセルの3行目を参照)に対して下のクエリを再実行する必要がありました。

新しいオプションが出る前は、キーの値の変化を観察するのは面倒で、複数のクエリが必要でした。これらの新しいオプションがどのように作業を簡単にするかを見てみましょう。

新しいオプションの導入

次の新しいオプションは、新しいState Reader APIの変更フィード機能の一部です:

  Option コメント
チェンジフィード
  readChangeFeed "true"を指定すると、変更フィード出力が有効になります。
  changeStartBatchId *必須変更フィードが開始するべきbatchId。
  changeEndBatchId オプション。変更フィードで使用する最後のバッチ。
スナップショット
  snapshotPartitionId snapshotStartBatchIdを使用する場合に必要。指定された場合、この特定のパーティションのみが読み取られます。
  snapshotStartBatchId snapshotPartitionIdを使用する場合に必要です。
  snapshotEndBatchIdまたはbatchId オプション。スナップショット値の生成に使用する最後のバッチ。

batchIdオプションの値に注意してください。デフォルトでは、100の歴史的なチェックポイントと関連するステートファイルが保持されます。プロパティspark.sql.streaming.minBatchesToRetainデフォルト値を上書きするために使用できます。期限切れで存在しなくなったバッチの状態データにアクセスしようとすると、次のようなエラーメッセージが表示されます:[STDS_OFFSET_LOG_UNAVAILABLE] オフセットログ92は存在しません、チェックポイントの場所:/Volumes/mycheckpoint-path。

変更フィードの例

下の例では、キーuserIdの値8に対する変更を観察するために変更フィードを使用しています。change_typeフィールドは、開発中、デバッグ中、または本番データの問題を調査する際に役立つことがあります。変更フィードデータを使用すると、キーの値がいくつかのマイクロバッチでどのように変化したかをすばやく確認できます。下の例では、ステートキーにウィンドウが含まれている場合、partition_idがどのように変更されたかを確認できます。

スナップショットの例

Apache Sparkのフォールトトレランスにより、マイクロバッチが計画され(オフセットがチェックポイントの場所に書き込まれ)、コミットが完了する(そしてステートデータがチェックポイントの場所と同期する)ため、ステートストアの破損はほとんどありません。しかし、人間のエラーやバグは常に可能性として存在します。State Reader APIのスナップショット機能は、変更ログデータから状態を再構築し、後続のスナップショットファイルをバイパスするための便利なツールとなることがあります。この機能は、スナップショットファイルが存在するバッチID(snapshotStartBatchIdオプションを介して)を開始点として必要とします。snapshotStartBatchIdバッチIdから始めて、State Reader APIのスナップショット機能は、開始バッチIdからsnapshotEndBatchIdオプションで指定されたバッチIdまでのステートの画像を構築します。

RocksDBステートストアを使用している場合、基礎となるファイル構造は次のようになります:

バッチ1800の状態の画像を構築するために、1740.zipのスナップショット状態の開始スナップショットを使用して、次のようなコードを使用します:

チェックポイントファイルをリストアップした画像に注目すると、スナップショットデータは1740.zipにあります。一方、State Reader APIを使用するときは、snapshotStartBatchIdとして1741を使用しました。その理由は、ファイル命名規則が1ベースのインデックスを使用しているのに対し、Spark UIのbatchId番号は0から始まるためです。

まとめ

State Reader APIの新機能は、状態変更の監査、探索、視覚化に新たな機会を提供します。新機能により、開発者はより効率的になります。なぜなら、それ以外の場合、一連のバッチ全体で状態値を抽出するためには別々のクエリが必要となるからです。しかし、この新機能の潜在的な恩恵を受けるのは開発者やサポートスタッフだけではありません。ビジネスのステークホルダーも、チェンジフィードデータを見ることで可能な洞察に興味を持つかもしれません。どちらの場合でも、データを表示するためのクエリとダッシュボードの構築が、State Reader APIの強化のおかげで、今ではより簡単になりました。

結論として、変更フィードはマイクロバッチ間での状態変化の詳細な追跡を可能にし、開発とデバッグの段階で非常に貴重な洞察を提供します。スナップショット機能は便利な診断ツールであり、エンジニアがチェンジログファイルからステートを再構築して、特定のポイント(batchId)でのステートの完全なビューを構築することを可能にします。

State Reader APIについてはここで詳しく読むことができます。また、デモはここで見ることができます。

Databricks 無料トライアル

関連記事

State Reader APIの発表:新しい "Statestore" データソース

Databricks Runtime 14.3には、 構造化ストリーミング の内部ステートデータへのアクセスと分析を可能にする新しい機能、 State Reader API が含まれています。 State Reader APIは、JSON、CSV、Avro、Protobufなどのよく知られた Sparkデータフォーマット とは一線を画しています。 その主な目的は、ステートフルな構造化ストリーミングワークロードの開発、デバッグ、トラブルシューティングを容易にすることです。 Apache Spark 4.0.0(今年後半にリリース予定)には、State Reader APIが含まれます。 新しいAPIはどのような課題に対応しているのか? Apache Spark™...

Apache Spark 構造化ストリーミングにおけるステートフルパイプラインのパフォーマンス改善

イントロダクション Apache Spark™ の 構造化ストリーミング は、Spark SQLエンジン上に構築された、スケーラビリティと耐障害性を提供する人気のオープンソースストリーム処理プラットフォームです。 Databricksレイクハウスプラットフォーム上のほとんどの増分的および ストリーミングワークロード は、 Delta Live Tables および Auto Loader を含む構造化ストリーミングを利用しています。 ここ数年、あらゆる業界における多様なユースケースにおいて、構造化ストリーミングの使用と採用が 飛躍的に伸びて います。 Databricksでは、1週間に1,400万以上の構造化ストリーミングジョブが実行されており、その数は年間2倍以上のペースで増加しています。 ほとんどの構造化ストリーミングのワークロードは、 分析ワークロードと運用ワークロード...

構造化ストリーミングにおける複数のステートフルオペレーター

翻訳:Junichi Maruyama. - Original Blog Link データエンジニアリングの世界では、ETLが誕生したときから使われているオペレーションがある。フィルターする。結合する。集約する。最後に結果を書く。これらのデータ操作は時代が変わっても変わりませんが、レイテンシーとスループットの要求範囲は劇的に変化しています。一度に数イベントを処理したり、1日に数ギガバイトを処理したりすることは、もはや不可能です。今日のビジネス要件を満たすには、テラバイト、あるいはペタバイトのデータを毎日処理する必要があり、そのレイテンシは分単位、秒単位で測定されます。 Apache SparkTMの構造化ストリーミングは、大容量データと低レイテンシに最適化されたオープンソースの主要ストリーム処理エンジンであり、 Databricks Lakehouse を ストリーミングに最適なプラットフォー ムとするコアテクノロジーです。 Project Lightspeed で提供される強化された機能のおかげで、単一のストリ
プラットフォーム一覧へ