メインコンテンツへジャンプ

新登場!Python Data Source APIでデータ取り込みが驚くほど簡単に!

Share this post

Summary

  • Apache Spark™向けのPython Data Source APIは、複雑なデータソースやシンクを効率的にSparkパイプラインに統合するためのツールを提供します。
  • APIは、REST APIベースのデータ取得などのカスタムコードをカプセル化し、Sparkのソースまたはシンクとして扱える抽象クラスを提供します。
  • APIは、データエンジニアが求めているのは、シンプルさと再利用性であり、データエンジニアはすでにDataFrame APIに慣れているため、シンプルなコードを書くだけで済むことを望むでしょう。

データエンジニアリングチームは、多様なカスタムデータや業界固有のデータソースに対応するため、専用の取り込みソリューションを構築するタスクを頻繁に求められます。しかし、この取り込みソリューションの構築作業は煩雑で時間がかかることが多いのが現状です。こうした課題を解決するために、さまざまな業界の企業にインタビューを実施し、多岐にわたるデータ統合ニーズを深く理解しました。この包括的なフィードバックを基に開発されたのが、Apache Spark™向けのPython Data Source APIです。

Shellとの取り組み
私たちが密接に協力してきた企業の一つがShellです。エネルギー業界では、設備の故障が安全性、環境、運用の安定性に重大な影響を及ぼす可能性があり、Shellではこれらのリスクを最小化することが重要課題となっています。そのため、設備の信頼性の高い運用に注力しています。

Shellは1,800億ドル以上の価値を持つ多種多様な資本設備と機器を所有しており、その運用から生成される膨大なデータを管理するために、データチームがさまざまなプロジェクトでシームレスに作業できる高度なツールに依存しています。Databricks Data Intelligence Platformは、Shellのアナリスト、エンジニア、科学者の間でデータへのアクセスを民主化し、コラボレーションを促進する重要な役割を果たしています。しかし、特定のユースケースでは、IoTデータの統合に課題がありました。

このブログでは、Shellとの取り組みを例に挙げながら、この新しいAPIが以前の課題にどのように対処するかを探り、実際の適用例を示すコードを提供します。

新しいPython Data Source APIで、データ取り込みの負担を軽減しましょう!

課題

まず、Shellのデータエンジニアが直面した課題を見てみましょう。データパイプライン内の多くのデータソースは、Kafkaなどの組み込みのSparkソースを使用していましたが、一部のデータソースはREST API、SDK、またはその他のメカニズムを介してデータを公開していました。この事実が、Shellのデータエンジニアにとって大きな障害となっていました。最終的に、組み込みのSparkソースとこれらのデータソースを結合するために、独自のソリューションを構築することを余儀なくされていました。

このような取り組みは、データエンジニアの貴重な時間とエネルギーを消耗させるだけでなく、大規模な組織でよく見られるように、実装や結果に一貫性を欠く原因となっていました。ShellのChief Digital Technology AdvisorであるBryce Bartmann氏は、シンプルさを求め、次のように語っています。

「私たちは、多くの優れたREST APIを開発しており、その中にはストリーミングユースケース向けのものも含まれます。これらをDatabricksでデータソースとしてそのまま利用できれば、複雑なコードを自分たちで書かなくて済むのでとても助かります。」 ーシェル デジタル技術部門チーフアドバイザー Bryce Bartmann 氏

解決策

新しいPythonカスタムデータソースAPIは、オブジェクト指向の概念を活用して問題に取り組むことを可能にし、この課題を解消します。この新しいAPIは、REST APIベースのデータ取得などのカスタムコードをカプセル化し、Sparkのソースまたはシンクとして扱える抽象クラスを提供します。

データエンジニアが求めているのは、シンプルさと再利用性です。たとえば、ストリーミングパイプラインで天気データを取り込もうとしているデータエンジニアを想像してみてください。理想的には、以下のようなコードを書くだけで済むことを望むでしょう:

このコードはシンプルで、データエンジニアにとって使いやすいものです。なぜなら、彼らはすでにDataFrame APIに慣れているからです。従来、SparkジョブでREST APIにアクセスする一般的なアプローチは、PandasUDFを使用することでした。しかし、PandasUDFを使ってREST APIにデータを送信可能な再利用性のあるコードを書くのがいかに複雑かは、このトピックに関する記事でも取り上げられています。

一方で、新しいAPIは、Sparkジョブ(ストリーミングやバッチ、シンクやソースを問わず)がネイティブでないソースやシンクと連携する方法を簡素化し、標準化します。

次に、実際の例を見てみましょう。この新しいAPIを使って、新しいデータソース(この例では「weather」)をどのように作成できるかを紹介します。この新しいAPIは、ソース、シンク、バッチ、ストリーミングのすべてに対応しており、以下の例では新しいストリーミングAPIを使用して「weather」ソースを実装する方法に焦点を当てています。

PythonデータソースAPIの使用 - 実世界のシナリオ

想像してみてください。データエンジニアとして、予知保全のユースケースに対応するデータパイプラインの構築を任されているとします。このパイプラインでは、油井装置からの圧力データが必要です。仮に、油井装置の温度と圧力データがIoTセンサーを通じてKafkaに流れてくるとしましょう。Structured StreamingにはKafkaデータの処理に対応するネイティブサポートがあるので、ここまでは順調です。

しかし、ビジネス要件が課題を突きつけます:同じデータパイプラインで、油井現場の天気データも取り込む必要があるのです。この天気データはKafkaでストリーミングされているわけではなく、REST APIを通じてアクセス可能な形で提供されています。ビジネス関係者やデータサイエンティストは、天気が装置の寿命や効率に影響を与え、それがメンテナンススケジュールに直結することを認識しています。

シンプルな方法から始める

新しいAPIは、多くのユースケースに適したシンプルなオプションを提供します。それがSimpleDataSourceStreamReader APIです。
このAPIは、スループットが低く、パーティション分割を必要としないデータソースに適しています。この例では、取得するのは限られた数の油井現場の天気データのみで、データの取得頻度も低いため、このAPIを使用します。

以下は、SimpleDataSourceStreamReader APIを使用した簡単な例です。
後ほど、より複雑なアプローチについても説明しますが、複雑なアプローチはパーティション対応のPythonデータソースを構築する場合に理想的です。今はその詳細を気にせず、シンプルなAPIを使用した例を見てみましょう。

コード例

以下のコード例は、「シンプル」APIが十分であるケースを想定しています。
__init__ メソッドは、このリーダークラス(以下の例では WeatherSimpleStreamReader)が監視対象の油井現場を理解するために不可欠です。このクラスは「locations」オプションを使用して、天気情報を出力する対象の場所を特定します。

このシンプルなアプローチで、限られたスケールの天気データを簡単に取り込むことができます。

データソースを定義し、ストリーミングリーダーの実装を組み込んだので、次はこのデータソースをSparkセッションに登録する必要があります。

これにより、天気データソースがデータエンジニアに馴染みのあるDataFrame操作で扱える新しいストリーミングソースとして登録されます。このポイントは重要です。なぜなら、このカスタムデータソースは、オブジェクト指向アプローチを採用することで、より広範なチームにも恩恵をもたらすからです。たとえば、他のユースケースで天気データが必要な場合でも、このデータソースを再利用できます。そのため、データエンジニアは、このカスタムデータソースをPythonのホイールライブラリとして抽出し、他のパイプラインでも再利用可能にすることを検討するかもしれません。

カスタムストリームの活用例
以下は、データエンジニアがこのカスタムストリームをどれだけ簡単に活用できるかを示す例です。

例示結果:

その他の考慮事項

パーティション対応APIの利用タイミング

Python Data Sourceの「シンプル」APIを説明しましたが、次はパーティション対応オプションについて説明します。パーティション対応データソースを利用することで、データ生成を並列化できます。たとえば、パーティション対応のデータソース実装では、ワーカータスクが複数のタスクに地点を分配し、REST API呼び出しをワーカーやクラスター全体に分散させることが可能です。今回の例では、予想されるデータ量が少ないため、この高度な実装は含めていません。

バッチAPI vs ストリームAPI

ユースケースによっては、ソースストリームを生成するか、データをシンクするかで実装すべきメソッドが異なります。この例では、データをシンク(送信)する必要はありません。また、バッチリーダーの実装も含めていません。ただし、特定のユースケースに必要なクラスの実装に集中できます。

  ソース sink
バッチ reader() writer()
ストリーミング streamReader()またはsimpleStreamReader() streamWriter()

Writer APIを使用するタイミング


この記事では、readStreamで使用するReader APIに焦点を当ててきましたが、Writer APIはデータパイプラインの出力側で同様に柔軟なロジックを実現します。たとえば、油井現場の運用管理者が、パイプラインのロジックを活用して、設備の状態(赤/黄/緑)を示すAPIを現場で呼び出すようにデータパイプラインを構築したいとします。この場合、Writer APIを使用すれば、データエンジニアがロジックをカプセル化し、writeStreamフォーマットのように操作可能なデータシンクを提供することができます。

まとめ

「シンプルさは究極の洗練である」 – レオナルド・ダ・ヴィンチ

アーキテクトやデータエンジニアとして、私たちはPySparkのカスタムデータソースAPIを活用し、バッチおよびストリーミングのワークロードを簡素化する機会を手にしています。新しいデータソースがデータチームにとって有益である場合、それをエンタープライズ全体で再利用可能にするために、Pythonホイールなどを利用してデータソースを分離することを検討してください。

Python Data Source APIはまさに私たちが必要としていたものです。 これは、私たちのREST APIやSDKとやり取りするために必要なコードをモジュール化するためのデータエンジニアにとっての機会を提供します。これで、組織全体で再利用可能なSparkデータソースを構築、テスト、公開できるようになったことは、チームがより迅速に動き、自分たちの仕事により自信を持つのに役立ちます。 ーシェル デジタル技術部門チーフアドバイザー Bryce Bartmann 氏

Apache Spark™向けのPython Data Source APIは、これまでデータエンジニアが複雑なデータソースやシンク、特にストリーミングの文脈で直面していた大きな課題に対処する強力な追加機能です。「シンプル」APIでもパーティション対応APIでも、エンジニアはより幅広いデータソースやシンクを効率的にSparkパイプラインに統合するためのツールを手にしました。この記事の説明やコード例が示すように、このAPIの実装と使用は簡単で、予知保全などのユースケースで迅速な成果を上げることができます。より詳しい説明はDatabricksのドキュメンテーション(およびオープンソースのドキュメンテーション)に記載されており、いくつかのPythonデータソース例もここで確認できます。

さらに、カスタムデータソースをモジュール化して再利用可能なコンポーネントとして構築する重要性は非常に高いです。これらのデータソースを独立したライブラリとして抽象化することで、コード再利用とチーム間のコラボレーションを促進し、生産性とイノベーションをさらに向上させることができます。ビッグデータやIoTの可能性を追求し続ける中で、Python Data Source APIのような技術は、エネルギー業界をはじめとするデータ駆動型意思決定の未来を形作る重要な役割を果たすでしょう。

Databricksのお客様へ
REST APIの背後にあるデータを解放するために、サンプルコード()を参考にしてカスタマイズしてみてください。まだDatabricksをご利用でない方は、無料で始めてさっそく試してみましょう。

Databricks 無料トライアル

関連記事

Python ユーザー定義テーブル関数(UDTFs)の紹介

Apache Spark™ 3.5とDatabricks Runtime 14.0は、エキサイティングな機能をもたらした:Pythonのユーザー定義テーブル関数(UDTFs)です。 このブログでは、UDTFとは何か、なぜUDTFは強力なのか、そしてどのようにUDTFを使うことができるのかについて説明する。 Pythonのユーザー定義テーブル関数(UDTF)とは? Pythonのユーザー定義テーブル関数(UDTF)は、出力として単一のスカラー結果値の代わりにテーブルを返す新しい種類の関数です。 一度登録されると、SQLクエリの FROM 句に登場させることができる。 各Python UDTFは0個以上の引数を受け入れ、各引数は整数や文字列のような定数スカラー値である。 関数本体は、これらの引数の値を調べて、どのデータを返すべきかを決定することができる。 PythonのUDTFを使うべき理由 要するに、複数の行や列を生成する関数が必要で、Pythonの豊富なエコシステムを活用したいのであれば、Python UDTFが

DataFrameの等式関数を使ったPySparkテストのシンプル化

DataFrameの等式テスト関数 は、PySparkのユニットテストを簡素化するためにApache Spark™ 3.5とDatabricks Runtime 14.2で導入されました。 このブログ記事で説明した機能一式は、次期Apache Spark 4.0とDatabricks Runtime 14.3から利用可能になります。 DataFrameの等式テスト関数を使用して、より信頼性の高いDataFrame変換を記述 PySparkでデータを扱うには、DataFrameに変換、集約、操作を適用します。 変換が蓄積されるにつれて、コードが期待通りに動作することをどうやって確信できるでしょうか? PySparkの等式テストユーティリティ関数は、データを期待される結果と照らし合わせてチェックする効率的で効果的な方法を提供し、予期しない差異を特定して分析プロセスの初期段階でエラーを検出するのに役立ちます。 さらに、デバッグに多くの時間を費やすことなく、即座に対策を講じることができるように、違いを正確に特定する直感的

Apache Spark 構造化ストリーミングにおけるステートフルパイプラインの最新パフォーマンス改善へのディープダイブ

この投稿は、ステートフル・パイプラインの最新のパフォーマンス改善に関する2部構成のシリーズの第2部です。 このシリーズの最初の部分は、 Apache Spark 構造化ストリーミングにおけるステートフルパイプラインのパフォーマンス改善 でカバーされています。 Project Lightspeedの更新ブログ では、ステートフルパイプラインに追加したさまざまなパフォーマンス改善の概要を紹介しました。 このセクションでは、パフォーマンス分析中に観察されたさまざまな問題を掘り下げ、それらの問題に対処するために実施した具体的な機能強化の概要を説明します。 RocksDBステートストア・プロバイダの改善 メモリ管理 RocksDBは主に メモリ を memtables 、ブロックキャッシュ、その他のピン留めブロックに使用します。以前は、マイクロバッチ内のすべての更新は、 WriteBatchWithIndex を 使用してメモリにバッファリングされていました。 さらに、ユーザーは書き込みバッファとブロックキャッシュの使用に
エンジニアリングのブログ一覧へ