주요 컨텐츠로 이동

(번역: Youngkyong Ko) Original Post

데이터브릭스 런타임 14.3에는 사용자가 구조화된 스트리밍의 내부 상태 데이터에 액세스하고 분석할 수 있는 새로운 기능인 State Reader API가 포함되어 있습니다. State Reader API는 JSON, CSV, Avro, Protobuf와 같은 잘 알려진 Spark 데이터 포맷과는 차별화됩니다. 주요 목적은 상태 유지 구조화된 스트리밍(stateful Structured Streaming) 워크로드의 개발, 디버깅 및 문제 해결을 용이하게 하는 것입니다. 올해 말에 출시될 예정인 Apache Spark 4.0.0에는 State Reader API가 포함될 예정입니다.

새로운 API는 어떤 문제를 해결하나요?

Apache Spark™의 구조화된 스트리밍은 다양한 스테이트풀(stateful) 기능을 제공합니다. 이에 대해 자세히 알아보려면 먼저 스테이트풀 연산자, 워터마크, 상태 관리에 대해 설명하는 "Multiple Stateful Operators in Structured Streaming"를 읽어보시기 바랍니다.

State Reader API를 사용하면 상태 데이터와 메타데이터를 쿼리할 수 있습니다. 이 API는 개발자의 몇 가지 문제를 해결해 줍니다. 개발자는 개발 중 상태 저장소(state store)를 이해하기 어려워 디버깅을 위해 과도한 로깅에 의존하는 경우가 많아 프로젝트 진행 속도가 느려집니다. 이벤트 시간 처리의 복잡성과 신뢰할 수 없는 테스트로 인해 테스트 문제가 발생하여 일부는 중요한 단위 테스트를 건너뛰기도 합니다. 프로덕션 환경에서 분석가들은 데이터 불일치 및 액세스 제한으로 어려움을 겪으며, 긴급한 문제를 해결하기 위해 시간이 많이 소요되는 코딩 해결 방법을 사용하기도 합니다.

두 파트로 구성된 API

state-metadatastatestore라는 두 가지 새로운 데이터프레임 포맷 옵션이 State Reade API를 구성합니다. state-metadata 데이터 형식은 상태 저장소에 저장된 내용에 대한 높은 수준의 정보를 제공하는 반면, statestore 데이터 형식은 키-값 데이터 자체를 세밀하게 살펴볼 수 있게 해줍니다. 프로덕션 문제를 조사할 때는 state-metadata 형식으로 시작하여 사용 중인 상태 유지 연산자, 관련된 배치 ID, 데이터가 분할되는 방식에 대한 개략적인 이해를 얻을 수 있습니다. 그런 다음 statestore 형식을 사용하여 실제 상태 키와 값을 검사하거나 상태 데이터에 대한 분석을 수행할 수 있습니다.

State Reader API를 사용하는 것은 간단하며 친숙하게 느껴질 것입니다. 두 형식 모두 상태 저장소 데이터가 유지되는 체크포인트 위치에 대한 경로를 제공해야 합니다. 새로운 데이터 형식을 사용하는 방법은 다음과 같습니다:

  • State store overview: spark.read.format("state-metadata").load("<checkpointLocation>")
  • Detailed state data: spark.read.format("statestore").load("<checkpointLocation>")

추가적인 설정 및 리턴되는 데이터의 전체 스키마에 대한 자세한 내용은 구조화된 스트리밍 상태 정보 읽기에 대한 Databricks 설명서를 참조하세요. 데이터브릭스 런타임 14.2 이상에서 실행되는 구조화된 스트리밍 쿼리에 대한 상태 메타데이터 정보를 읽을 수 있다는 점에 유의하세요.

State Reader API 사용에 대해 자세히 알아보기 전에 스테이트풀 작업을 포함하는 예제 스트림을 설정해야 합니다.

예시: 실시간 광고 청구

여러분의 업무가 스트리밍 미디어 회사의 광고주에 대한 청구 프로세스를 지원하는 파이프라인을 구축하는 것이라고 가정해 보겠습니다. 서비스를 이용하는 시청자에게 다양한 광고주로부터 주기적으로 광고가 표시된다고 가정해 봅시다. 사용자가 광고를 클릭하면 미디어 회사는 이 사실을 수집하여 광고주에게 비용을 청구하고 광고 클릭에 대한 적절한 크레딧을 받을 수 있어야 합니다. 몇 가지 다른 가정도 있습니다:

  1. 시청 세션의 경우, 1분 동안 여러 번의 클릭은 '중복 제거'되어 한 번의 클릭으로 계산되어야 합니다.
  2. 5분 윈도우는 광고주의 타겟 델타 테이블에 집계 횟수를 출력해야 하는 빈도를 정의합니다.
  3. 스트리밍 미디어 애플리케이션의 사용자가 이벤트 데이터에 포함된 profile_id로 고유하게 식별된다고 가정합니다.

이 글의 마지막 부분에서 가짜 이벤트 스트림을 생성하기 위한 소스 코드를 제공하겠습니다. 지금은 소스 코드에 중점을 두겠습니다:

  1. 스트림을 읽어 들입니다.
  2. 이벤트 클릭을 중복 제거합니다.
  3. 각 advertiser_id에 대한 광고 클릭 수(고유 profile_id 기준)를 집계합니다.
  4. 결과를 델타 테이블에 출력합니다.

원천 데이터

먼저 이벤트 데이터를 살펴보겠습니다. 이 데이터를 생성하는 데 사용되는 코드는 이 글의 부록에서 확인할 수 있습니다.

profile_id는 미디어 앱에서 스트리밍하는 고유한 인간 사용자를 나타낸다고 생각하면 됩니다. 이벤트 데이터는 특정 타임스탬프에 해당 사용자(profile_id)에게 표시된 광고와 그 광고를 클릭했는지 여부를 전달합니다.

Source Data

레코드 중복 제거

프로세스의 두 번째 단계는 스트리밍 파이프라인의 모범 사례인 중복 제거입니다. 예를 들어 빠른 클릭이 두 번 카운트되지 않도록 하려면 이 방법을 사용하는 것이 좋습니다.

withWatermark 메서드는 중복된 레코드(동일한 profile_idadvertiser_id의 경우)가 스트림에서 더 이상 이동하지 않도록 삭제되는 시간 간격을 지정합니다.

Deduplicating records

레코드 집계 및 결과 기록

광고 과금을 추적하는 마지막 단계는 각 5분 간격의 광고주별 총 클릭 수를 유지하는 것입니다.

요약하면, 이 코드는 겹치지 않는 5분 간격(텀블링 윈도우)으로 데이터를 집계하고 각 윈도우 내에서 광고주당 클릭 수를 계산합니다.

아래 스크린샷에서 "Write to Delta Lake" 셀을 보면, Raw Data 탭의 스트림에 대한 몇 가지 유용한 정보가 표시되어 있는 것을 볼 수 있습니다. 여기에는 워터마크 세부 정보, 상태 세부 정보, numFilesOutstandingnumBytesOutstanding과 같은 통계가 포함됩니다. 이러한 스트리밍 메트릭은 개발, 디버깅 및 문제 해결에 매우 유용합니다.

Aggregating records and writing results

마지막으로, 대상 델타 테이블은 advertiser_id, 광고 클릭 수(click_count), 이벤트가 발생한 기간(window)으로 채워집니다.

Delta table

State Reader API 사용

이제 실제 스테이트풀 스트리밍 작업을 살펴봤으니 State ReaderAPI가 어떻게 도움이 되는지 살펴보겠습니다. 먼저 state-metadata 데이터 포맷을 살펴보고 상태 데이터에 대한 개괄적인 그림을 그려보겠습니다. 그런 다음 statestore 데이터 형식을 사용하여 더 세분화된 세부 정보를 얻는 방법을 살펴보겠습니다.

state-metadata를 사용한 상위 수준 세부 정보

state-metadata

이 예제의 state-metadata 정보는 몇 가지 잠재적인 문제를 발견하는 데 도움이 될 수 있습니다:

  1. 비즈니스 로직. 이 스트림에는 두 개의 스테이트풀 연산자가 있음을 알 수 있습니다. 이 정보는 개발자가 스트림이 상태 저장소를 사용하는 방식을 이해하는 데 도움이 될 수 있습니다. 예를 들어, 일부 개발자는 dedupeWithinWatermark(PySpark 메서드 dropDuplicatesWithinWatermark의 기본 연산자)가 상태 저장소를 활용한다는 사실을 모를 수 있습니다.
  2. 상태 유지. 시간이 지남에 따라 스트림의 상태 데이터가 정리되는 것이 이상적입니다. 이는 일부 스테이트풀 연산자를 사용하면 자동으로 수행됩니다. 그러나 임의의 상태 저장 연산(예: FlatMapGroupsWithState)은 개발자가 상태 데이터를 삭제하거나 만료하는 로직을 염두에 두고 코딩해야 합니다. 시간이 지나도 minBatchId가 증가하지 않는다면, 이는 상태 데이터 풋프린트가 무제한으로 증가하여 결국 작업 성능 저하 및 실패로 이어질 수 있음을 나타내는 위험 신호일 수 있습니다.
  3. 병렬 처리. spark.sql.shuffle.partitions의 기본값은 200입니다. 이 구성 값은 클러스터 전체에 걸쳐 생성되는 상태 저장소 인스턴스 수를 결정합니다. 일부 상태 저장 워크로드의 경우 200이 적합하지 않을 수 있습니다.

statestore를 사용한 상세한 세부 정보

statestore 데이터 포맷은 상태 저장소 데이터베이스의 각 스테이트풀 작업에 사용된 키와 값의 내용을 포함하여 세분화된 상태 데이터를 검사하고 분석할 수 있는 방법을 제공합니다. 이러한 데이터는 데이터프레임의 출력에서 Structs로 표시됩니다:

Granular details with statestore

이 세분화된 상태 데이터에 액세스하면 코드 전체에 디버깅 메시지를 포함할 필요가 없어져 스테이트풀 스트리밍 파이프라인의 개발을 가속화하는 데 도움이 됩니다. 또한 프로덕션 문제를 조사하는 데에도 매우 유용할 수 있습니다. 예를 들어 특정 광고주의 클릭 수가 크게 부풀려졌다는 보고를 받은 경우, 상태 저장소 정보를 검사하면 코드를 디버깅하는 동안 조사 방향을 잡을 수 있습니다.

스테이트풀 연산자가 여러 개 있는 경우 operatorId 옵션을 사용하여 각 연산자에 대한 세부 정보를 검사할 수 있습니다. 이전 섹션에서 보았듯이 operatorIdstate-metadata 출력에 포함된 값 중 하나입니다. 예를 들어, 여기서는 워터마크의 상태 데이터 중 dedupeWithinWatermark에 대해 구체적으로 쿼리합니다:

multiple stateful operators

분석 수행 (skew 감지)

익숙한 기술을 사용하여 State Reader API로 표시된 데이터프레임에 대한 분석을 수행할 수 있습니다. 이 예제에서는 다음과 같이 skew를 확인할 수 있습니다:

Performing analytics

state-metadata API를 사용한 인사이트와 결합하여 200개의 파티션이 있다는 것을 알 수 있습니다. 하지만 여기에는 100개의 고유 광고주 중 단 3개의 광고주만 상태가 유지되는 파티션이 있다는 것을 알 수 있습니다. 이 간단 예제에서는 걱정할 필요가 없지만, 대규모 워크로드에서는 성능 및 리소스 문제로 이어질 수 있으므로 skew의 증거를 조사해야 합니다.

언제 State Reader API를 사용하는가

개발 및 디버깅

이 새로운 API는 스테이트풀 스트리밍 애플리케이션의 개발을 크게 간소화합니다. 이전에는 개발자가 비즈니스 로직을 확인하기 위해 디버그 출력 메시지에 의존하고 executior 로그를 샅샅이 뒤져야 했습니다. 이제 State Reader API를 사용하면 직접 상태를 보고, 새 레코드를 입력하고, 상태를 다시 쿼리하고, 반복적인 테스트를 통해 코드를 개선할 수 있습니다.

예를 들어, 수백만 개의 케이블TV 셋탑 박스에 대한 진단을 추적하기 위해 스테이트풀 애플리케이션에서 flatMapGroupsWithState 연산자를 사용하는 Databricks 고객을 생각해 보겠습니다. 이 작업의 비즈니스 로직은 복잡하고 다양한 이벤트를 고려해야 합니다. 케이블 박스 ID는 스테이트풀 연산자의 키 역할을 합니다. 새로운 API를 사용하면 개발자는 스트림에 테스트 데이터를 입력하고 각 이벤트 후 상태를 확인하여 비즈니스 로직이 올바르게 작동하는지 확인할 수 있습니다.

또한 개발자는 이 API를 통해 상태 저장소의 내용을 검증하는 보다 안정적인 단위 테스트와 테스트 케이스를 기대치의 일부로 포함할 수 있습니다.

병렬 처리와 skew 살펴보기

두 데이터 포맷 모두 개발자와 운영자에게 상태 저장소 인스턴스 전반의 키 분포에 관한 인사이트를 제공합니다. state-metadata 형식은 상태 저장소의 파티션 수를 보여줍니다. 개발자는 대규모 클러스터에서도 기본 설정인 spark.sql.shuffle.partitions (200)를 고수하는 경우가 많습니다. 그러나 상태 저장소 인스턴스 수는 이 설정에 따라 결정되며, 워크로드가 큰 경우에는 200개의 파티션으로는 충분하지 않을 수 있습니다.

statestore 형식은 이 글의 앞부분에서 설명한 것처럼 skew를 감지하는 데 유용합니다.

프로덕션 문제 조사

데이터 분석 파이프라인에 대한 조사는 다양한 이유로 발생합니다. 분석가는 레코드의 출처와 기록을 추적하려고 할 수도 있고, 프로덕션 스트림에서 상태 저장소 데이터를 포함해 상세한 포렌식 분석이 필요한 버그가 발생할 수도 있습니다.

State Reader API는 스트리밍 소스가 아닌 상시적인 맥락에서 사용하도록 고안된 것이 아닙니다. 하지만 개발자는 앞서 설명한 것과 같은 기술을 통해 노트북을 워크플로우로 사전에 패키징하여 상태 메타데이터 검색과 상태 분석을 자동화하는 데 도움을 줄 수 있습니다.

결론

State Reader API는 스테이트풀 스트리밍 프로세스에 절실히 필요한 투명성, 접근성, 사용 편의성을 제공합니다. 이 문서에서 설명한 것처럼, API의 사용법과 출력은 간단하고 사용자 친화적이어서 복잡한 조사 작업을 간소화합니다. 

State Reader API는 Apache Spark 4.0.0에 SPARK-45511의 일부로 포함되어 있습니다. 데이터브릭스 문서 Read Structured Streaming state information에서 API의 옵션과 사용법을 설명합니다.

Appendix

Source code

아래는 이 문서에서 설명한 사용 사례 예제의 소스 코드입니다. 이를 ".py" 파일로 저장하고 데이터브릭스로 임포트 할 수 있습니다.

# Databricks notebook source
# DBTITLE 1,Best practice is to use RocksDB state store implementation
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

# COMMAND ----------

# DBTITLE 1,Imports
import random
import pyspark.sql.functions as F
import pyspark.sql.types as T

# COMMAND ----------

# DBTITLE 1,Directories for the demo. Change, as needed
demo_root = "/Volumes/main/default/ad_click_demo"
fake_media_events = f"{demo_root}/bronze_event_data"
fake_media_events_checkpoint = f"{demo_root}/bronze_event_checkpoint"
ad_clicks = f"{demo_root}/silver_clicks"
ad_clicks_checkpoint = f"{demo_root}/silver_clicks_checkpoint"

dbutils.fs.rm(f"{demo_root}", True)

# COMMAND ----------

# DBTITLE 1,UDFs for random data
random_profile_id = udf(lambda: random.randint(1, 100), T.IntegerType())
random_advertiser_id = udf(lambda: random.randint(1, 100), T.IntegerType())
random_ad_was_clicked = udf(lambda: (random.randint(0, 100) <= 10), T.BooleanType())

# COMMAND ----------

# DBTITLE 1,Fake data DataFrame
event_dataframe = (
  # fake records per second
  spark.readStream.format("rate").option("rowsPerSecond", "100").load()
  .withColumn("profile_id", random_profile_id())
  .withColumn("advertiser_id", random_advertiser_id())
  .withColumn("ad_was_clicked", random_ad_was_clicked()).drop("value")
)

# COMMAND ----------

# DBTITLE 1,Stream to an event table
event_dataframe.writeStream \
    .format("delta") \
    .option("checkpointLocation", f"{demo_root}/tmp/fake_media_events_checkpoint/") \
    .start(fake_media_events)

# COMMAND ----------

# MAGIC %md
# MAGIC Before proceeding, wait until the stream is running...

# COMMAND ----------

# DBTITLE 1,Incoming event data
display(spark.read.format("delta").load(fake_media_events))

# COMMAND ----------

# DBTITLE 1,Read and set 1 minute watermark
df_stream = (
  spark.readStream.format("delta").load(fake_media_events)
  .withWatermark("timestamp", "1 minutes")
  )

# COMMAND ----------

# DBTITLE 1,Drop duplicates received within the 1-minute watermark
df_drop_dupes = df_stream.dropDuplicatesWithinWatermark(["profile_id", "advertiser_id"])

# COMMAND ----------

# DBTITLE 1,Aggregate, grouped by advertiser_id
df_counted = (
    df_drop_dupes.filter(F.col("ad_was_clicked") == True)
    .groupBy("advertiser_id", F.window("timestamp", "5 minutes"))
    .agg(F.count("profile_id").alias("click_count"))
)

# COMMAND ----------

# DBTITLE 1,Write to Delta Lake
(
  df_counted.writeStream.format("delta")
  .option("checkpointLocation", ad_clicks_checkpoint)
  .start(ad_clicks)
)

# COMMAND ----------

# MAGIC %md
# MAGIC Before proceeding, wait until the stream is running...

# COMMAND ----------

# DBTITLE 1,High-level statestore info
display(spark.read.format("state-metadata").load(ad_clicks_checkpoint))

# COMMAND ----------

# DBTITLE 1,Granular statestore details
display(spark.read.format("statestore").load(ad_clicks_checkpoint))

# COMMAND ----------

# DBTITLE 1,Granular statestore details
display(spark.read.format("statestore").option("operatorId", "1").load(ad_clicks_checkpoint))

# COMMAND ----------

# DBTITLE 1,Make easy to query with a temp view
spark.read.format("statestore").load(ad_clicks_checkpoint).createOrReplaceTempView("statestore_data")

# COMMAND ----------

# DBTITLE 1,Look for skew
# MAGIC %sql
# MAGIC with partition_counts as (
# MAGIC   SELECT
# MAGIC     partition_id, count(*) keys_for_partition, count(distinct key.advertiser_id) uniq_advertisers
# MAGIC   FROM
# MAGIC     statestore_data
# MAGIC   group by
# MAGIC     partition_id
# MAGIC )
# MAGIC select min(keys_for_partition) min_keys_for_partition, avg(keys_for_partition) avg_keys_for_partition,
# MAGIC        max(keys_for_partition) max_keys_for_partition, sum(uniq_advertisers) uniq_advertisers
# MAGIC from
# MAGIC   partition_counts

# COMMAND ----------

# DBTITLE 1,Inspect the output (target Delta table)
display(spark.read.format("delta").load(ad_clicks))
Databricks 무료로 시작하기

관련 포스트

모든 엔지니어링 블로그 포스트 보기