Translation Reviewed by Akihiro.Kuwano
概要
このブログ投稿は、Data + AI Summit 2024でのセッションスーパーノヴァからLLMsへのフォ ローアップで、ここでは誰でもApache Kafkaから公開されているNASAの衛星データを消費し、処理する方法を示しました。
多くのKafkaのデモとは異なり、再現性が低いか、シミュレートされたデータに依存しているのではなく、私はNASAの公開されているガンマ線座標ネットワーク(GCN)からのライブデータストリームの分析方法を示します。これは、さまざまな衛星から来るスーパーノヴァとブラックホールのデータを統合しています。
オープンソースのApache Spark™とApache Kafkaだけを使ってソリューションを作ることも可能ですが、このタスクにはDatabricks Data Intelligence Platformを使うことの大きな利点を示します。また、両方のアプローチのソースコードも提供されます。
データインテリジェンスプラットフォーム上に構築されたこのソリューションは、データの取り込みと変換のためのDelta Live Tablesとサーバーレスコンピューティング、データガバナンスとメタデータ管理のためのUnity Catalog、そしてNASAのデータストリームの自然言語クエリと可視化のためのAI/BI Genieの力を活用しています。また、このブログでは、複雑なSQL変換の生成、デバッグ、ドキュメンテーションの ためのDatabricks Assistantの力を示しています。
スーパーノヴァ、ブラックホール、ガンマ線バースト
夜空は静止していません。超新星のような宇宙のイベントは頻繁に起こり、強力なガンマ線バースト(GRB)を伴います。このようなガンマ線バーストは通常、わずか2秒しか持続せず、2秒間のGRBは通常、約100億年の寿命を持つ太陽が一生涯で放出するエネルギーと同じくらいのエネルギーを放出します。
冷戦時代、秘密の核兵器試験を検出するために作られた特別な衛星が、宇宙深部から来るこれらの強烈なガンマ線の閃光を偶然発見しました。今日、NASAはSwiftやFermiのような衛星群を利用して、何十億年も前に遠くの銀河で起こったこれらのバーストを検出し、研究しています。次のアニメーションの緑の線は、2024年8月1日の午前11時CESTにMarko Andlar氏の提供によるSatellite Tracker 3Dで生成されたSWIFT衛星の軌道を示しています。
GRB 221009Aは、最も明るく、最もエネルギーの高いGRBの一つで、そのエネルギーのためにほとんどの機器が視力を失いました。それは矢座から起源を持ち、約19億年前に起こったと考えられています。しかし、宇宙の膨張により、バーストの源は現在、地球から約24億光年離れています。以下の画像にはGRB 221009Aが表示されています。
ウィキペディア。2024年7月18日。"GRB 221009A."https://en.wikipedia.org/wiki/GRB_221009A.
現代の天文学は、光やガンマ線に加えてニュートリノなどのさまざまな信号を一緒に捉えるマルチメッセンジャーのアプローチを採用しています。例えば、南極のIceCube観測所では、南極の氷の立方キロメートルに5000以上の検出器が埋め込まれ、地球を通過するニュートリノを検出しています。
ガンマ線座標ネットワークプロジェクトは、これらの先進的な観測所をつなぎ、宇宙衛星からの超新星データと南極からのニュートリノデータをリンクし、NASAのデータストリームを世界中で利用可能にします。
NASAの衛星からのデータを分析することは難しそうに思えますが、Databricks データインテリジェンスプラットフォームを使えば、その堅牢なツールと実用的な抽象化のおかげで、どんなデータサイエンティストでもこれらの科学データストリームを簡単に探索できることを示したいと思います。
おまけとして、自分の研究に簡単に再利用できる最もクールな公開データストリームの一つについて学ぶことができます。
さて、この課題に取り組むために私が踏んだステップを説明しましょう。
Apache Kafkaからの超新星データの消費
GCN QuickstartからのOICDトークンの取得
NASAはGCNのデータストリームをApache Kafkaのトピックとして提供しており、KafkaブローカーはOIDC資格情報を介した認証を必要とします。GCNの資格情報の取得は簡単です:
- GCN Quickstartページを訪れてください
- Gmailや他のソーシャルメ ディアアカウントを使用して認証する
- クライアントIDとクライアントシークレットを受け取る
Quickstartは、Confluent Kafkaコードベース上に構築されたGCN Kafkaブローカーを利用するPythonコードスニペットを作成します。
GCN Kafkaラッパーは使いやすさを優先する一方で、OAuth認証のためのKafka接続パラメータなどのほとんどの技術的詳細を抽象化していることに注意が必要です。
Apache Spark™を使用したオープンソースの方法
そのスーパーノヴァデータについてもっと学びたいと思い、私はすべてのパラメータと設定を完全に制御できる最も一般的なオープンソースソリューションから始めることにしました。そこで、私はSpark Structured Streamingを使用したノートブックでPoCを実装しました。その核心は、次の行に集約されます:
もちろん、ここでの重要な詳細は、私がGCNラッパーから抽出した **kafka_config
接続詳細にあります。完全なSparkノートブックはGitHubで提供されています(ブログの最後のリポジトリを参照ください)。
しかし、私の最終的な目標は、低レベルの詳細を抽象化し、データの取り込みと変換にDatabricks Delta Live Tables(DLT)を活用した恒星データパイプラインを作成することでした。
Delta Live Tablesを使用してGCN Kafkaから超新星データを増分的に取り込む
DLTを選んだいくつかの理由がありました:
- 宣言的なアプローチ:DLTを使用することで、パイプラインを宣言的に記述することに集中でき、複雑さを大幅に抽象化できます。私はデータ処理ロジックに集中することができ、Databricks Assistant、Auto Loader、AI/BIの利点を享受しながら、パイプラインの構築と維持が容易になります。
- サーバーレスインフラストラクチャ:DLTでは、インフラストラクチャ管理が完全に自動化され、コンピューティングリソースがサーバーレスでプロビジョニングされ、手動のセットアップと設定が不要になります。これにより、インクリメンタルなマテリアライズドビューの計算や垂直オートスケーリングなどの高度な機能が可能になり、効率的でスケーラブルなコスト効率の良いデータ処理が可能になります。
- SQLでのエンドツーエンドのパイプライン開発:私は、OIDC資格情報と複雑なメッセージ変換を含むパイプライン全体にSQLを使用する可能性を探求したかった。これにはKafkaからのデータの取り込みも含まれます。
このアプローチにより、開発プロセスを合理化し、インフラストラクチャの詳細に詰まることなく、宇宙データのシンプルでスケーラブルなサーバーレスパイプラインを作成することができました。
DLTデータパイプラインは完全にSQLでコーディングすることができます(Pythonも利用可能ですが、一部の稀なメタプログラミングタスク、つまりパイプラインを作成するコードを書きたい場合にのみ必要です)。
DLTの新しい開発者向けの改善を使えば、ノートブックでコードを書き、それを実行中のパイプラインに接続することができます。この統合により、パイプラインビューとイベントログが直接ノートブックに表示され、開発体験が一元化されます。そこから、パイプラインを検証し、実行することができます。これはすべて、最適化されたインターフェース内で行うことができます - 事実上、DLTのミニIDEです。
DLTストリーミングテーブル
DLTはストリーミングテーブルを使って、あらゆる種類のクラウドオブジェクトストアやメッセージブローカーからデータを増分的に取り込みます。ここでは、SQLのread_kafka()関数を使用して、GCN Kafkaブローカから直接データをストリーミングテーブルに読み込むためにそれを使用しています。
これは、Kafkaブローカからデータを取得するためのパイプラインの最初の重要なステップです。Kafkaブローカー上では、データは一定の保持期間だけ生存しますが、一度レイクハウスに取り込まれると、データは永久に保存され、あらゆる種類の分析や機械学習に使用することができます。
ライブデータストリームの取り込みは、基礎となるDeltaデータ形式のおかげで可能です。デルタテーブルはDWHアプリケーションの高速データフォーマットであり、デルタテーブルにデータを同時にストリームする(またはからストリームする)こ とができます。
Delta Live Tablesを使用してKafkaブローカーからデータを消費するコードは次のようになります:
簡潔さのため、上記の例では接続設定の詳細を省略しました(完全なコードはGitHubにあります)。
UIでUnity Catalog Sample Dataをクリックすると、データが取り込まれた後のKafkaメッセージの内容を表示できます:
ご覧のように、SQLはメッセージ全体をキーワードと値を含む行で構成される単一のエンティティとして取得します。
注:Swiftメッセージには、衛星がGRBのような宇宙イベントを観測するための位置にスリューするときと方法の詳細が含まれています。
上記の私のKafkaクライアントと同様に、地球上の最大の望遠鏡の一部、および小型のロボット望遠鏡がこれらのメッセージを拾います。イベントのメリット値に基づいて、彼らは予定されたスケジュールを変更して観測するかどうかを決定します。
上記のKafkaメッセージは次のように解釈できます:
通知は2024年5月24日、木曜日の23:51:21ユニバーサルタイムに発行されました。それは衛星の次の指向方向を指定し、その方向は天空での赤経(RA)と赤緯(Dec)の座標によって特徴付けられ、どちらも度で与えられ、J2000エポックで与えられます。RAは213.1104度、Decは+47.355度です。この方向のための宇宙船のロール角は342.381度です。衛星はこの新しい位置に83760.00秒(SOD)でスリューします。これは23:16:00.00 UTに変換されます。計画された観測時間は60秒 です。
この観測の対象の名前は"URAT1-687234652,"で、メリット値は51.00です。メリット値は目標の優先度を示し、特に複数の潜在的な目標が利用可能な場合に観測を計画し、優先順位をつけるのに役立ちます。
レイテンシと頻度
上記のKafkaの設定をstartingOffsets => 'earliest',
と使用すると、パイプラインはKafkaトピックからすべての既存のデータを消費します。この設定により、新しいメッセージが到着するのを待つことなく、既存のデータをすぐに処理することができます。
ガンマ線バーストは稀なイベントであり、特定の銀河で約100万年に1回しか発生しないものの、観測可能な宇宙には銀河が非常に多く存在するため、頻繁に検出されます。私自身の観察に基づくと、新しいメッセージは通常10から20分ごとに到着し、分析のためのデータの安定したストリームを提供します。
ストリーミングデータは、低レイテンシについてだけであるとよく誤解されますが、実際には、到着するメッセージの無制限のフローを増分的に処理することについてです。これにより、リアルタイムの洞察と意思決定が可能になります。
GCNシナリオは、遅延の極端なケースを示しています。我々が分析しているイベントは何十億年も前に起こったもので、そのガンマ線は今までに我々に到達しました。
これはおそらく、あなたのキャリアで遭遇する最も劇的なイベントタイムからインジェストタイムまでのレイテンシの例でしょう。それでも、GCNのシナリオは、ストリーミングデータの使用例として素 晴らしいものです!
DLTマテリアライズドビューによる複雑な変換
次のステップでは、このKafkaメッセージの大きなキャラクターオブジェクト(CLOB)をスキーマに入れて、データを理解できるようにする必要がありました。そこで私は、まず各メッセージを行に分割し、次に各行をキー/値のペアに分割するSQLソリューションが必要でした。これはSQLのピボットメソッドを使用して行います。
Databricks Assistantと私たち自身のDatabricks playgroundのDBRX大言語モデル(LLM)をサポートに利用しました。最終的な解決策は、リポジトリで利用可能な完全なコードと比べると少し複雑ですが、以下に示すようにDLTマテリアライズドビューに基づいた基本的なスケルトンが構築されています:
上記のアプローチは、各メッセージを適切な列に分割するマテリアライズドビューを使用しています。これは次のスクリーンショットで見ることができます。
マテリアライズドビューは、Delta Live Tablesで特に複雑なデータ変換が繰り返し行われる必要がある場合に非常に便利です。マテリアライズドビューは、データ分析を高速化し、レイテンシの低減したダッシュボードを可能にします。
コード生成のためのDatabricks Assistant
Databricks Assistantのようなツールは、複雑な変換を生成するのに非常に便利です。これらのツールは 、そのようなユースケースにおいて、あなたのSQLスキル(または少なくとも私の!)を簡単に上回ることができます。
プロのヒント:Databricks AssistantやDatabricks DBRX LLMのようなヘルパーは、解決策を見つけるだけでなく、簡略化されたデータセットを使用して、その解決策をステップバイステップで説明するように頼むこともできます。個人的には、この生成的AIのチュータリング能力を、コード生成スキルよりもさらに印象的だと感じています!
AI/BI Genieを使った超新星データの分析
今年のData + AI Summitに参加していたら、AI/BIについてたくさん聞いていたでしょう。Databricks AI/BIは、組織内の誰でも分析と洞察を民主化するために作られた新しいタイプのビジネスインテリジェンス製品です。それは二つの補完的な能力、Genieとダッシュボードから成り立っており、これらはDatabricks SQLの上に構築されています。AI/BI Genieは、Databricks Platform内でのデータ分析と視覚化を簡素化し、強化するために設計された強力なツールです。
Genieの核心は、ユーザーが自分のデータについて質問をし、テーブルや視覚化の形で答えを得ることができる自然言語インターフェースです。Genieは、Data Intelligence Platformで利用可能な豊富なメタデータを活用し、その統一ガバナンスシステムUnity Catalogからもデータを取得して、ユーザーの質問の意図を理解する機械学習アルゴ リズムに供給します。これらのアルゴリズムは、ユーザーのクエリをSQLに変換し、関連性と正確性のある応答を生成します。
私が最も愛しているのはGenieの透明性です:結果の背後にある生成されたSQLコードを表示し、それをブラックボックスに隠すのではなく。
DLTでデータを取り込み、変換するパイプラインを構築した後、私はストリーミングテーブルとマテリアライズドビューを分析することができました。データをよりよく理解するために、Genieに数多くの質問をしました。私が探索した一部のサンプルはこちらです:
- 過去30日間に何回のGRBイベントが発生しましたか?
- 最も古いイベントは何ですか?
- 月曜日に何回発生しましたか?(それは文脈を覚えています。イベントの数について話していましたが、データストリームに時間条件を適用する方法を知っています。)
- 平均して1日に何回発生しましたか?
- メリット値のヒストグラムを教えてください!
- 最大のメリット値は何ですか?
それほど昔であれば、私は「平均して1日に」のような質問を複雑なSpark、Kafka、あるいはFlinkのステートメントを使用してウィンドウ関数としてコード化していたでしょう。これで、平易な英語になりました!
最後に、私はその座標を使用して宇宙イベントの2Dプロットを作成しました。データのフィルタリングと抽出の複雑さから、私はまず別のノートブックで実装しました。なぜなら、座標データは