メインコンテンツへジャンプ

Delta Live Tablesを使用して、複数のストリーミングプラットフォームから同時にデータを処理する

Easily manage and consume from diverse streaming platforms across multiple clouds through a single ETL pipeline.
Uday Satapathy
ディパンカル・クシャリ
Akash Jaiswal
Share this post

Original Blog : Processing data simultaneously from multiple streaming platforms using Delta Live Tables

翻訳: junichi.maruyama 

今日の組織における大きな課題の1つは、ビジネスのスピードに合わせた意思決定を可能にすることです。ビジネスチームや自律的な意思決定システムは、意思決定や迅速な対応に必要なすべての情報を、ソースとなるイベントが発生すると同時に、リアルタイムまたはほぼリアルタイムで必要とすることが多い。このような情報は、ストリーム処理用語でイベントと呼ばれ、ソースからデスティネーションへ非同期でリレーされ、一般的にメッセージブローカーやメッセージバスを介して行われる。

組織が成長し、チームが他のチームに分岐するにつれ、メッセージブローカーの使用パターン、数、種類は増加します。合併や買収のシナリオでは、企業が新しいメッセージブローカーを継承することが多く、その場合、既存のデータエンジニアリング・プロセスと統合する必要があります。メッセージブローカー、そのプロデューサ、コンシューマ、データエンジニアリングパイプラインを一貫して管理することは、多くのテクノロジー、クラウドプラットフォーム、プログラミング言語の専門性を必要とするため、困難な場合があります。

例えば、マルチストリームのユースケースを見てみましょう。世界的なコングロマリットで、次のような状況を想像してください。

  • 同社には、Amazon Kinesis Data StreamsとAmazon Managed Service for Kafka(MSK)という2つのメッセージブローカーを使用する主要部門があります。
  • またAzure Event Hubsを社内で使用している小規模な製品会社も買収しました。
  • 彼らはまた、csvやjsonファイルの継続的なストリームという形で来るデータの主要なソースを持っています。

下の図は、上記のマルチストリームのユースケースを描いたものです:

Fig 1 - Multiple stream Use Case
Fig 1 - マルチストリーム使用例

このようなシナリオで、すべてのデータを処理する際の主な課題を紹介します:

  • 多様なデータソースとその技術をどのように統合するのか?
  • データエンジニアリング・システムを、拡張や保守が容易なように構築するにはどうすればよいか?それぞれの技術に特化した人材など、馬の骨を雇うのか?
  • このようなテクノロジーの難問を解決する過程で、最終目標である「データの意味を理解し、意思決定を迅速に行う」ことを忘れないようにするには、どうすればよいのでしょうか。
  • バッチ処理とストリーミング処理の両方のニーズを、同じデータ処理システムで実現するにはどうすればいいのでしょうか?

このブログでは、Delta Live Tables (DLT)を通して、これらの問題を簡単かつきれいに解決できる方法を紹介します。Databricks Lakehouse Platform上のDelta Live Tablesは、高品質のバッチおよびストリーミングデータパイプラインを簡単に作成、管理することができます。

マルチストリームの使用例

DLTによるマルチストリーム処理のシナリオを示すために、ヘルスケア領域のユースケースを想像してみましょう。ユースケースの中心は患者である。この患者は、別のエンティティ(hospital)と次のようなやり取りをします:

  • 病院で病気に対する予防接種を受ける
  • 病院で病気の検査を受ける
  • 特定の病気で入院する。
  • 治療費の請求は、クレーム処理によって行われます。

これらのやりとりは、以下のビジネスユースケースダイアグラムで表現することができます。

Fig 2 - Business Use Case Diagram for a Patient’s interactions
Fig 2 - 患者さんとのやりとりのビジネスユースケース図

これらの相互作用に関与するエンティティは、以下のエンティティ関係(ER)図の形で表現することができます。この先、データ・ウェアハウス次元モデリングの辞書にある「ファクト」と「ディメンション」という用語を十分に使用することになります。ER図では、PatientとHospitalがディメンション・テーブルであり、Admissions、Vaccinations、Testing records、Claimsのデータはファクト・テーブルの形で表現されていることがわかります。

Fig 3 - Entity Relationship Diagram for a Patient’s interactions
Fig 3 - 患者の相互作用のための実体関係図

例えば、入学、予防接種、検査記録、請求のファクトデータが、異なるメッセージブローカーを通じて届くとします。このブログでは、業界で人気のあるApache Kafka (Amazon MSK), AWS Kinesis Data Streams, Azure Event Hubsを選択しました。ストリーム処理のシナリオを示す4つ目の方法は、一連のファイルをテーブル(この場合はDelta lake)にインクリメンタルに取り込むことです。この目的のためにDatabricks Autoloaderを選択しました。この情報により、患者のインタラクション図を関連するデータソースで更新できるようになりました:

Fig 4 - Business Use Case Diagram with data sources
Fig 4 - データソースを含むビジネスユースケース図

以下、表形式でまとめたデータソースを紹介します:

Fact

Data source

Admissions

Amazon MSK (Apache Kafka)

Testing Records

Azure Event Hubs

Vaccinations

AWS Kinesis Data Streams

Claims

Incremental stream of files

次のセクションでは、単一のDLTパイプラインが上記のストリーミングソースから同時にデータを取り込み、さらにそれに対してETLを実行する方法を示します。このブログではAWSのインフラを使ってリソースとパイプラインをセットアップしましたが、AzureやGCPなど他のパブリッククラウドでも同様にシナリオを構成できるはずです。

DLTによるマルチストリーム処理

下図は、一般的なDLTのパイプラインを表したものである。多様なソースからのストリーミングデータの取り込みと変換がこのブログの主題ですが、DLTにはさらに注目すべき機能があります。例えば、データ品質管理は、チームが特定の「期待値」をクリアしたデータのみを処理できるようにするための重要な機能です。チームは、誤ったデータに対して是正措置や予防措置を講じることができます。DLTのその他の利点として、管理されたチェックポイントと強化されたオートスケーリングがあります。これらの機能やその他の機能については、こちらの記事でご紹介しています: Delta Live Tables concepts

Fig 5 - Data Processing Pipeline with DLT
Fig 5 - DLTによるデータ処理パイプライン

私たちのマルチストリーム処理DLTパイプライン(簡単のためにdlt_multistream_consumerと呼ぶことにします)は、以下のような操作を行います:

  1. メッセージブローカーからのストリーミングデータをデルタレイクテーブルの生レイヤーにロードする。
  2. メッセージペイロードから意味のあるファクト(ファクトテーブル)を生成するために、生データをパースしてスキーマを適用する。
  3. BIおよびアナリティクスチームが利用するための集計データセットを生成する。これは、様々なファクトテーブルとディメンションテーブルを結合し、生成されたレコードをスライス/ダイシングすることで行われます。

DLT Pipelineは、以下のモードで動作するように構成することができます:

  • 連続:ライブクラスタは、データが到着するとすぐに連続的に処理します。これは、レイテンシーが最も重要なストリーミングシナリオに推奨されます。
  • Triggered(トリガー): 任意のスケジュールでクラスタが起動し、その時点で利用可能なデータを処理します。処理待ちのデータがなくなると(一定の時間持続する閾値)、クラスタはシャットダウンされます。このモードは、コスト削減とスループットがレイテンシーよりも優先される場合に推奨されます。

このブログでは、2つの理由から連続モードを使用しています。1つ目は、データをリアルタイムで取り込んで処理する「真の」ストリーム処理シナリオを実証するためです。2つ目は、このブログを公開している時点では、Apache Spark用のAmazon Kinesis Data Streamsコネクタは、トリガーモードのようなバッチモードの取り込みをサポートしていません(つまり、一度トリガーするか、今すぐ利用できます)。近々登場する予定ですが。

もう1つ強調したいのは、一般的に本番環境では、生データが取り込まれた後、多くの処理と充実化の段階を経るという点です。これらの段階は、データ分析、データの履歴および予測分析を含むビジネスユースケースのために、データを浄化、変換、集計します。私たちは、データがアーキテクチャの各層を流れる際に、データの構造と品質が徐々に改善されることを、メダリオンレイクハウス・アーキテクチャと呼んでいます。メダリオンアーキテクチャは主流のユースケースに対するベストプラクティスですが、このブログを簡単にするために、データ変換の部分を省略し、ビジネスユースケースをサポートするためのデータ分析の実行に移行することにします。

それでは、DLTを利用した多数のストリーミングシステムからDelta Lakeにストリーミングデータを一度に取り込み、そのデータを分析して意味のあるインサイトを生成する方法を見ていきましょう。

ストリームからのデータ取り込み

このセクションでは、複数のストリーミングプラットフォームからデータをインジェストする方法を紹介します。

Amazon MSKからのデータ取り込み

Amazon Managed Streaming for Apache Kafka(Amazon MSK)は、Apache Kafkaを使用してストリーミングデータを処理するアプリケーションの構築と実行を可能にするフルマネージドサービスです。Amazon MSKは、クラスタの作成、更新、削除といったコントロールプレーンの操作を提供します。

本ブログでは、以下のような構成のMSKクラスタを使用しています:

  • Cluster type - Provisioned
  • Total number of brokers - 2
  • Apache Kafka version - 2.5.1
  • Broker type - kafka.m5.large
  • Number of availability zones - 2
  • EBS storage volume per broker - 300 GiB
  • Encryption between clients and brokers:
    - TLS encryption - Enabled
    - Plaintext - Enabled

AWS環境での認証のために、DLTクラスタにインスタンスプロファイルをアタッチしています。これは、役割定義で設定された適切な読み取り/書き込み権限を持つIAMロールを参照しています。

パイプラインdlt_multistream_consumerは、MSKトピックadmissionからHospital Admissionデータセットをmsk_admissions_tbl_rawというデルタレイクテーブルにインジェストしています。Admissions ファクトの kafka ペイロード(文字列として表現されるバイナリ)は次のようになります:

{
    "admissions_id": "93c9b070-e636-4e66-b758-964b6d7ce9a1", 
    "hospital_id": 103658, 
    "patient_id": 141620, 
    "Timestamp": "2022-07-10T05:00:00.000+0000"
}

インジェストにはpythonノートで以下のコードを使い、DLTパイプラインのdlt_multistream_consumerにアタッチしています。

import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import *

@dlt.create_table(
  name='msk_admissions_tbl_raw',
  comment="BRONZE TABLE FOR ADMISSIONS DATA FROM AWS MSK",
  table_properties={
    "quality": "bronze"
  }
)


def msk_stream():
  input_schema = StructType(
    [ 
      StructField('admissions_id', StringType()),
      StructField('hospital_id', IntegerType()),
      StructField('patient_id', IntegerType()),
      StructField('Timestamp', TimestampType())
    ]
  )
  topic = "admission"
  kafka_bootstrap_servers_plaintext=dbutils.secrets.get(scope='myscope_aj',key='MSK_kafka_bootstrap_servers_plaintext')
  return (
   spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext ) 
  .option("subscribe", topic )
  .option("failOnDataLoss", "false")
  .option("startingOffsets", "earliest" )
.load()
  .withColumn("admission_json", F.from_json(F.col("value").cast("string"), input_schema))
  .withColumn("msk_event_timestamp",F.col("timestamp"))
  .select("msk_event_timestamp","admission_json.*")
    )

パイプライン dlt_multistream_consumer が開始されると、目的のデータベース/スキーマ内に msk_admissions_tbl_raw テーブルを作成し、そのテーブルに入場トピックから読み込んだデータを入力します。その後、そのトピックに新しいデータが書き込まれると、さらにデータを取り込みます。

Kafkaトピックから取り込まれる4つのフィールドに加えて、msk_event_timestamp(Kafkaが生成するイベントタイムスタンプから派生)列をmsk_admissions_tbl_rawテーブルに追加しています。メッセージは、バイナリ形式でKafkaペイロードのフィールド値で受信されます。下流のプロセスで解析できるように、文字列に変換する必要があります。

Amazon Kinesisからのインジェスト

Amazon Kinesis Data Streamsは、リアルタイムのストリーミングデータを簡単に収集、処理、分析することができます。データは、機械学習、分析、その他のアプリケーションのためのビデオ、オーディオ、アプリケーションログ、ウェブサイトのクリックストリーム、およびIoTテレメトリデータである可能性があります。当ブログでは、以下の構成でKinesisデータストリームを使用しました:

  • Data Stream Name: test_vaccination_data
  • Capacity Mode: On Demand
  • Write Capacity: 200 MiB/second or 200,000 records/second (Default Maximum)
  • Read Capacity: 400 MiB/second (Default)

スループットを向上させるために、Databricks Runtime KinesisコネクタのAmazon Kinesis enhanced fan-out (EFO)機能を本番シナリオで使用することができます。

今回のユースケースでは、test_vaccination_dataストリームが、患者のワクチン接種の事実(イベント)を運びます。以下は、ペイロードのサンプルです:

{
    "hospital_id": 102269,
    "patient_id": 101060,
    "vaccination_type": "hepatitis",
    "Timestamp": "2021-04-02 16:00:00"
}

KinesisデータストリームからLake houseのLanding zoneにデータを消費するためのDLTコードは以下の通りです:

import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import *


my_stream_name = 'test_vaccination_data_1' 
kinesisRegion = 'us-west-2'

@dlt.create_table(
  name='kinesis_vaccination_tbl_raw',
  comment="BRONZE TABLE FOR VACCINATION DATA FROM KINESIS",
  table_properties={
    "quality": "bronze"
  }
)
def kinesis_stream():
  input_schema = StructType(
    [ 
      StructField('hospital_id', IntegerType()),
      StructField('patient_id', IntegerType()),
      StructField('vaccination_type', StringType()),
      StructField('Timestamp', TimestampType())
    ]
  )

  return (
    spark
    .readStream
    .format("kinesis")
    .option("streamName", my_stream_name)
    .option("initialPosition", "earliest")
    .option("region", kinesisRegion)
    .load()
    .withColumn('value',F.from_json(F.col("data").cast("string"), input_schema))                
    .withColumn('key',F.col('partitionKey').cast("string"))
    .select('key','value.hospital_id','value.patient_id','value.vaccination_type','value.Timestamp')
  )

パイプラインdlt_multistream_consumerは、Kinesisデータストリームで利用可能なデータをターゲットテーブルkinesis_vaccination_tbl_rawに入力し、データの再読み込みを回避するためにチェックポイントを実行する。メッセージはバイナリ形式でKinesisペイロードのフィールドデータで受信されます。下流のプロセスで解析できるように、文字列に変換する必要があります。

Azure Event Hubsからのインジェスト

Azure Event Hubsは、Microsoftが提供する一般的なストリーミングプラットフォームです。Event Hubsは、AMQP、HTTPS、Apache Kafkaなど、読み書きのための複数のプロトコルをサポートしています。このブログでは、DLTパイプラインdlt_multistream_consumerは、Event HubsのKafkaサーフェスを使用してメッセージを消費しています。私たちは、Event Hubsのネームスペースに2 Throughput Unitの標準価格帯を使用しています。

Azureのドキュメントでは、名前空間はイベントハブ(Kafkaでいうところのトピック)の管理コンテナであると説明されています。私たちの関心のあるイベントハブ(別名トピック)はtesting_records_streamで、患者の臨床検査結果に関連するイベントを運びます。各メッセージは以下のような感じです:

{
  "testing_record_id":"e0a2e0a8-f0a6-4f1f-a080-974026c3606a",
  "hospital_id":104164,
  "patient_id":152430,
  "is_positive":"N",
  "Timestamp":"2021-07-12 17:00:00.000"
}

testing_records_streamイベントハブからメッセージを消費するDLTコードは、Amazon MSKのインジェストパターンとよく似ています(どちらもKafkaプロトコルを使っていますね)

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Event Hub details
EH_CONN_STR = "my-event-hubs-connection-string"
EH_NAMESPACE = "my-event-hubs-namespace"
EH_KAFKA_TOPIC = "testing_records_stream"
EH_BOOTSTRAP_SERVERS = f"{EH_NAMESPACE}.servicebus.windows.net:9093"
EH_SASL_WRITE = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";"
# standard configuration options (similar to Structured Streaming)
topic_name = EH_KAFKA_TOPIC
eh_namespace_name = EH_NAMESPACE
eh_sasl = EH_SASL_WRITE
bootstrap_servers = EH_BOOTSTRAP_SERVERS
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "30000",
"startingOffsets": "earliest",
"kafka.sasl.jaas.config": eh_sasl,
"subscribe": topic_name,
}
# creates a DLT table
@dlt.table(name = "eventhubs_testing_records_tbl_raw",
         comment = "Reads raw messages from event hub stream for testing records of patients")
def read_eventhub():
 return spark.readStream.format("kafka").options(**kafka_options).load().withColumn("value_text", F.col("value").cast("string"))

DLTパイプラインは、イベントハブからテストレコードメッセージをインクリメンタルに読み込み、新しい行としてeventhubs_testing_records_tbl_raw delta lake tableに追加する。メッセージは、Kafka-surfaceのペイロードのフィールド値でバイナリ形式で受信されます。下流のプロセスが解析できるように、これらは文字列に変換する必要があります。

Autoloaderによるファイルの取り込み

Auto Loaderは、業界で最も強力で堅牢なデータファイルインジェストツールの1つです。新しいデータファイルがクラウドストレージに到着すると、追加設定をすることなく、段階的かつ効率的に処理します。現在、JSON、CSV、PARQUET、AVRO、ORC、TEXT、BINARYFILEのファイル形式を取り込むことができます。

Auto Loaderでは、コンシューマーはcloudFilesと呼ばれるStructured Streamingソースを設定することができます。設定オプションには、クラウドストレージのファイルパス、ファイルフィルタリングパターン、Azure Queue Storage、AWS SQS、AWS SNSなどのキューに対するファイル到着イベントオプションなどがあります。DLTパイプラインは、これらのオプションを使用して、到着した新しいファイルを自動的に取り込み、そのディレクトリにある既存のファイルも処理するオプションがあります。Auto Loaderは、DLTパイプラインにおけるPythonとSQLの両方をサポートしています。クライアントはAuto Loaderを使用して、何十億ものファイルを処理し、データの移行やテーブルの埋め戻しを行ったことがあります。Auto Loaderは、1時間あたり数百万ファイルのほぼリアルタイムのインジェストをサポートするように拡張されています。

このブログでは、Auto Loaderを使用して、以下のサンプルで患者請求データを取り込みました:

{
   "claim_id":"595be279-1ac3-4831-8a32-62d338c280ab",
   "admissions_id":"f6f17bf8-d735-4f75-91e4-e2c04e30ecec",
   "claim_timestamp":"2022-01-31T13:00:00.000Z",
   "total_invoiced_amount":32146.0,
   "patient_payable_amount":18174.0,
   "payer_covered_amount":13972.0
}

dlt_multistream_consumerパイプラインでは、以下のコードのように、患者クレームのjsonファイルがソースロケーションに到着すると、Auto Loaderを介してインジェストされます:

import os
import pyspark.sql.functions as F
import dlt 

files_path = "dbfs:/mnt/multistream-dlt-blog/raw_data/claims"
checkpoint_root = 'dbfs:/mnt/multistream-dlt-blog/ss-checkpoints/autoloader'
checkpoint_schema = os.path.join(checkpoint_root, "schemas")
checkpoint_writestream = os.path.join(checkpoint_root, "writestream")

cloudfiles_options = {
  "header": "true",
  "cloudFiles.format": "json",
  "cloudFiles.useNotifications": "false",
  "cloudFiles.inferColumnTypes": "true",
  "cloudFiles.rescuedDataColumn": "_rescued_data",
  "cloudFiles.schemaHints": "timestamp timestamp",
  "cloudFiles.schemaLocation": checkpoint_root,
  "cloudFiles.schemaEvolutionMode": "addNewColumns"
}

@dlt.table
def autoloader_filestream_claims_tbl_raw ():
  return (spark.readStream
                  .format('cloudFiles')
                  .options(**cloudfiles_options)
                  .load(files_path).withColumn("source_file", F.input_file_name())
                  .withColumn("ingestion_timestamp", F.current_timestamp()))

インジェストされたデータを分析し、ビジネスインサイトを生み出す

前述したように、このブログでは、Amazon MSK、Amazon Kinesis Data Streams、Azure Event Hubs、Databricks Auto Loaderという4つの異なるソースから患者データセットをすでにインジェストしています。簡潔にするために、データ変換部分はスキーマの強制のみに限定し、ビジネスユースケースをサポートするために、これらのデータセットに対してデータ分析を実行するために前進しています。

このブログでは、2つのビジネス分析のユースケースを作成しました。これは、ユーザーがDLTによって作成されたデルタレイクのテーブルを使用して、ビジネスの疑問に答えるためにデータ分析を実行する方法を示すためのものです。

Covidワクチン接種後にCovid陽性となった患者数(州・年齢層別)

gold_patients_by_state_age_group テーブルは、DLT を介したデータ取り込みによって入力されたテーブルを結合することによって DLT で作成されます。新しいデータがインジェストレイヤーに到着すると、継続的なDLTパイプラインがそのデータをインジェストレイヤーに取り込み、データ分析のためにこのgold_patients_by_state_age_groupテーブルを更新します。--

-- Creating a temporary PATIENT View using LIVE keyword for using it with DLT
-- Views in DLT lives within DLT execution and are not accessible from outside e.g. using DBSQL


-- Here we are reading the patients table from health_analytics database
CREATE TEMPORARY LIVE VIEW PATIENT AS
SELECT
  *
FROM
  health_analytics.patient

-- Creating patients_by_state_age view using LIVE keyword for using it with DLT
-- This is an intermediate step to calculate patient's age 
CREATE TEMPORARY LIVE VIEW patients_by_state_age AS
  select
    count(*) as patient_count,
    state,
    ceil(
      date_diff(current_date(), cast(date_of_birth as date)) / 365
    ) as age
  from
    LIVE.eventhubs_testing_records_tbl_raw as testing_records_sample
    join LIVE.kinesis_vaccination_tbl_raw as vaccinations_sample on testing_records_sample.patient_id = vaccinations_sample.patient_id
    join LIVE.PATIENT on testing_records_sample.patient_id = patient.patient_id
  where
    testing_records_sample.`Timestamp` > vaccinations_sample.`Timestamp`
    AND vaccinations_sample.vaccination_type = 'covid-19'
    AND testing_records_sample.is_positive = 'Y'
  group by
patient.state,
    ceil(
      date_diff(current_date(), cast(date_of_birth as date)) / 365
    )

-- Create age groups for patients
CREATE TEMPORARY LIVE VIEW patients_by_state_age_group AS
select
  patient_count,
  state,
  case
    when age between 0
    and 10 then '1-child'
    when age between 11
    and 25 then '2-youth'
    when age between 26
    and 60 then '3-adult'
    else '4-senior'
  end as age_group
from
  LIVE.patients_by_state_age

-- Creating GOLD table gold_patients_by_state_age_group as part of Medallion Architecture to create aggregated data for analysis

CREATE
  OR REFRESH LIVE TABLE gold_patients_by_state_age_group
  select
    state,
    age_group,
    sum(patient_count) as patient_count
  from
    LIVE.patients_by_state_age_group
  group by
    state,
    age_group
  order by
    state,
    age_group

以下は、Databricks SQLクエリエディタでクエリした場合のデータの見え方です:

Simultaneous stream processing using DLT

都市・州ごとの月別入学者数

月ごとの都市および州ごとの入場者数は、gold_monthly_admissions_by_state_cityテーブルを照会することでサポートされる別のユースケースです。

-- Creating a temporary Hospital View using LIVE keyword for using it with DLT 
-- Here we are reading a static hospital table from health_analytics database 

CREATE TEMPORARY LIVE VIEW hospital AS
SELECT
  *
FROM
  health_analytics.hospital

-- Creating GOLD table gold_monthly_admissions_by_state_city as part of Medallion Architecture to create aggregated data for analysis

-- Converting Year-Month into a YYYYMM format
CREATE
OR REFRESH LIVE TABLE gold_monthly_admissions_by_state_city
select
count(*) as patient_count,
  city,
  state,
  extract(
    Year
    FROM
      Timestamp
  ) || (
    case
      when extract(
        month
        FROM
          Timestamp
      ) < 10 then '0' || extract(
        month
        FROM
          Timestamp
      )
      else extract(
        month
        FROM
          Timestamp
      )
    end
  ) AS YYYYMM
from
  LIVE.msk_admissions_tbl_raw admissions_source
  join LIVE.hospital on admissions_source.hospital_id = hospital.hospital_id
group by
  city,
  state,
  extract(
    Year
    FROM
      Timestamp
  ) || (
    case
      when extract(
        month
        FROM
          Timestamp
      ) < 10 then '0' || extract(
        month
        FROM
 Timestamp
      )
      else extract(
        month
        FROM
          Timestamp
      )
    end
  )

以下は、Databricks SQLクエリエディタでクエリした場合のデータの見え方です:

Simultaneous stream processing using DLT

DLTパイプラインのEnd to Endビュー

DLTの主な特徴の1つは、パイプラインの一部として含まれるコードアーティファクト間のテーブルレベルの依存関係をすべてクロールし、それを系統グラフの形で表示する機能です。このグラフは「ライブ」です。ETL処理が進むにつれて、成功した処理は緑色で、失敗した処理は赤色で表示されます。継続的に実行されているDLTデータパイプラインのスクリーンショットを以下に示します:

Simultaneous stream processing using DLT

まとめ

このブログでは、Databricks Delta Live Tableを使用して、複数のクラウドにまたがる多様なストリーミングプラットフォームから、単一のデータパイプラインを使用してデータを取り込み、消費する方法を紹介しました。ETLプロセスは、データが到着するとすぐに、連続的に行われます。変換によって生成されたデータは、データ分析を実行してビジネスの疑問に対する答えを得るために継続的に利用できます。このようなソリューションは、全体的な技術アーキテクチャを大幅に簡素化し、保守と拡張を容易にします。DatabricksはDLTに定期的に新機能を導入しており、ETLワークロードのために顧客の間で広く採用されています。Delta Live Tablesを今すぐお試しください。

Databricks 無料トライアル

関連記事

エンジニアリングのブログ一覧へ