주요 컨텐츠로 이동
Engineering blog

데이터 엔지니어링의 세계에는 ETL이 탄생할 때부터 사용되어 온 작업이 있습니다. 필터링과 조인(Join), 집계를 거쳐 마지막으로 결과를 저장하는 작업입니다. 이러한 데이터 작업은 오랜 시간동안 동일하게 유지되어 왔지만, 지연 시간 및 처리량 요구 사항의 범위는 크게 변화했습니다. 한 번에 몇 개의 이벤트를 처리하거나 하루에 몇 기가바이트를 처리하는 것만으로는 더 이상 충분하지 않습니다. 오늘날의 비즈니스 요구 사항을 충족하려면 매일 테라바이트 또는 페타바이트 단위의 데이터를 처리해야 하며, 작업 지연 시간은 분과 초 단위로 측정되어야 합니다.

Apache SparkTM의 구조화된 스트리밍(Structured Streaming)은 대용량 데이터와 짧은 지연 시간에 최적화된 오픈소스 스트림 처리 엔진으로, 데이터브릭스 레이크하우스를 스트리밍을 위한 최고의 플랫폼으로 만드는 핵심 기술입니다. 라이트스피드 프로젝트(Project Lightspeed)와 함께 제공되는 향상된 기능 덕분에 이제 단일 스트림 내에서 이러한 모든 전통적인 데이터 작업을 수행할 수 있습니다.

Databricks Runtime 13.1과 곧 출시될 Apache SparkTM 3.5.0 릴리즈부터는 스트림에 여러 개의 상태 저장 연산자(Stateful operators)를 포함할 수 있습니다. 더 이상 조인(join) 후 싱크(sink)에 기록했다가 이 데이터를 다시 집계를 위해 다른 스트림으로 읽을 필요가 없습니다. 스트림을 여러 개로 나누는 대신 하나의 스트림 내에서 조인과 집계를 수행하면 복잡성, 지연 시간, 비용을 줄일 수 있습니다. 이 포스팅에서는 몇 가지 상태 저장 스트리밍 개념에 대해 간략하게 살펴본 다음, 이 흥미로운 기능의 예제를 바로 살펴보겠습니다!

상태 저장 연산자(Stateful Operators)란?

구조화된 스트리밍은 한 번에 작은 데이터 배치에 대한 연산을 수행하며 이를 흔히 마이크로 배치라고 부릅니다. 구조화된 스트리밍의 연산자는 상태 비저장형(stateless)과 상태 저장형(stateful)의 두 가지 범주로 나눌 수 있습니다.

상태 비저장 스트림은 이전 마이크로 배치에서 처리된 데이터에 대해 아무것도 알 필요가 없는 연산을 수행합니다. 예를 들어, 값이 10보다 큰 행만 유지하도록 레코드를 필터링하는 것은 현재 작업 중인 데이터 외에 다른 데이터에 대한 지식이 필요하지 않으므로 상태 비저장 스트림입니다.

상태 저장 스트림은 현재 마이크로 배치에 있는 것 외에 더 많은 정보가 필요한 작업을 수행합니다. 예를 들어 5분 동안의 값 개수를 세는 경우, 구조화된 스트리밍은 레코드가 얼마나 많은 마이크로 배치에 분산되어 있는지에 관계없이, 5분 분량의 레코드에 대해 각 키에 대한 집계 횟수를 저장해야 합니다. 이렇게 저장된 데이터를 상태(State)라고 하며, 상태를 저장해야 하는 연산자를 상태 저장 연산자라고 합니다. 가장 흔히 볼 수 있는 상태 저장 연산자는 집계, 조인과 중복 제거입니다.

워터마크(Watermarks)란? 

구조화된 스트리밍의 모든 상태 저장 연산자는 워터마크를 지정해야 합니다. 워터마크를 사용하면 데이터의 허용 지연 시간과 상태 유지 기간을 제어할 수 있습니다.

각 레코드에 이벤트 타임스탬프가 포함된 데이터 집합을 해당 타임스탬프를 기준으로 5분 윈도우 단위로 집계하고 있다고 가정해 보겠습니다. 일부 레코드가 순서와 다르게 도착하면 어떻게 될까요? 타임스탬프가 12:11인 레코드를 처리한 후에 타임스탬프가 12:04인 레코드가 도착하는 경우, 다시 돌아가서 12:00-12:05 집계에 해당 레코드를 포함할까요? 늦게 도착한 데이터를 언제까지 허용하고 12:00-12:05 기간의 상태를 얼마나 오래 유지해야 할까요? 상태 데이터를 영원히 보관하고 싶지는 않습니다. 정기적으로 제거하지 않으면 상태 데이터가 결국 메모리를 가득 채워 성능 저하를 일으킬 수 있습니다. 바로 이런 경우에 워터마크가 필요합니다.

.withWatermark 설정을 사용하면 레코드가 몇 초, 몇 분, 몇 시간 또는 며칠 늦게 도착해도 되는지 지정할 수 있으며, 결과적으로 구조화된 스트리밍에서는 상태에 저장된 레코드가 더 이상 필요하지 않은 시점을 파악할 수 있게 됩니다. 이 예에서는 데이터의 event_timestamp 열을 기준으로 최대 10분 늦은 데이터까지 허용하도록 지정하고 있습니다:

.withWatermark("event_timestamp", "10 minutes")

구조화된 스트리밍은 각 마이크로 배치가 종료될 때 마지막 이벤트 타임스탬프에서 withWatermark 설정의 시간 간격을 빼서 워터마크 타임스탬프를 계산하고 저장합니다. 또한 구조화된 스트리밍은 각 마이크로 배치가 시작될 때 입력 레코드의 이벤트 타임스탬프와 현재 상태(State)의 데이터를 워터마크 타임스탬프와 비교합니다. 타임스탬프가 워터마크 값보다 이전인 입력 레코드와 상태는 삭제됩니다.

이 워터마킹 메커니즘을 통해 구조화된 스트리밍은 단일 스트림에 얼마나 많은 상태 저장 연산자가 있는지에 관계없이 지연된 레코드와 상태를 적절히 처리할 수 있습니다. 아래 예시에서 워터마크를 이용하는 예를 볼 수 있습니다.

사용 사례

이제 상태 저장 연산자가 무엇인지에 대한 일반적인 개념을 이해하셨으니, 실제로 어떻게 사용되는지 살펴 봅시다. 동일한 스트림에서 여러 상태 저장 연산자를 사용하는 몇 가지 예를 살펴보겠습니다.

시간 윈도우 집계를 연결하기

이 예제에서는 원시 이벤트 스트림을 수신하고 있습니다. 사용자별로 10분마다 발생한 이벤트 수를 세고, 그 수를 시간별로 평균한 다음 결과를 기록하고자 합니다. 이를 위해서는 두 개의 시간 윈도우 집계를 연결해야 합니다.

먼저 표준 readStream 호출을 사용하여 userId, eventTimestamp 형식으로 원본 데이터를 읽습니다:

events = spark.readStream…

구조화된 스트리밍과 함께 제공되는 모든 스트리밍 소스를 사용할 수 있습니다.

다음으로, userId에 대한 첫 번째 윈도우 집계를 수행합니다. 윈도우의 기준이 되는 타임스탬프 열과 윈도우 길이를 정의하고, 결과 출력 및 상태 삭제 전에 지연 데이터를 얼마나 오래 기다릴지 구조화된 스트리밍에 알려주는 워터마크를 정의해야 합니다. 여기서는 데이터가 최대 1분까지 지연될 수 있다고 정했습니다. 코드는 다음과 같습니다:

eventCount = events \
  .withWatermark("eventTimestamp", "1 minute") \
  .groupBy(
    window(events.eventTimestamp, "10 minutes"), 
    events.userId
  ).count()

윈도우 집계를 수행할 때 구조화된 스트리밍은 결과에 윈도우 열을 자동으로 생성합니다. 이 윈도우 열은 윈도우의 시작 및 종료 타임스탬프를 가진 구조체입니다. 한 시간 후 userId 1과 2에 대한 출력은 다음과 같이 표시될 것입니다:

windowuserIdcount
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T11:10:00"}112
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T11:10:00"}27
{"start": "2023-06-02T11:10:00", "end": "2023-06-02T11:20:00"}18
{"start": "2023-06-02T11:10:00", "end": "2023-06-02T11:20:00"}216
{"start": "2023-06-02T11:20:00", "end": "2023-06-02T11:30:00"}15
{"start": "2023-06-02T11:20:00", "end": "2023-06-02T11:30:00"}210
{"start": "2023-06-02T11:30:00", "end": "2023-06-02T11:40:00"}115
{"start": "2023-06-02T11:30:00", "end": "2023-06-02T11:40:00"}26
{"start": "2023-06-02T11:40:00", "end": "2023-06-02T11:50:00"}19
{"start": "2023-06-02T11:40:00", "end": "2023-06-02T11:50:00"}219
{"start": "2023-06-02T11:50:00", "end": "2023-06-02T12:00:00"}111
{"start": "2023-06-02T11:50:00", "end": "2023-06-02T12:00:00"}217

이러한 카운트를 입력받아 시간별 평균을 구하기 위해서는 위 윈도우 열의 타임스탬프를 사용하여 또 다른 윈도우 집계를 정의해야 합니다. 다중 상태 저장 연산자를 사용하기 전에는 다음 다이어그램과 같이 위의 결과를 writeStream을 사용하여 싱크에 쓴 다음, 그 데이터를 새 스트림으로 읽어 두 번째 집계를 수행해야 했을 것입니다.

Without Multiple Stateful Operators
다중 상태 저장 연산자가 없는 경우

이 새로운 기능 덕분에 이제 동일한 스트림에서 두 작업을 모두 연결할 수 있습니다.

With Multiple Stateful Operators
다중 상태 저장 연산자를 이용하는 경우

윈도우 집계들을 쉽게 연결할 수 있도록, 이전 집계에서 생성된 윈도우 열을 윈도우 함수에 직접 전달할 수 있는 편리한 새 구문이 추가되었습니다. 아래 코드에서 eventCount.window 열이 전달되는 것을 볼 수 있습니다. 이제 윈도우 함수는 윈도우 열의 구조체를 올바르게 해석하여 다른 윈도우를 생성할 수 있습니다. 다음은 시간 윈도우를 정의하고 카운트의 평균을 구하는 두 번째 집계입니다. 하나의 입력 소스로만 작업하고 있고 해당 워터마크가 이전 집계 전에 지정되었으므로 다른 워터마크를 별도로 정의할 필요가 없습니다:

eventAvg = eventCount \
  .groupBy(
    window(eventCount.window, "1 hour"), 
    eventCount.userId
  ).avg(eventCount.count)

두 번째 집계가 끝나면 userId 1과 2에 대한 데이터는 다음과 같이 표시됩니다:

windowuserIdavg
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T12:00:00"}110
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T12:00:00"}212.5

마지막으로, writeStream을 사용해 싱크에 데이터 프레임을 기록합니다. 이 예제에서는 델타 테이블에 쓰고 있습니다:

eventAvg.writeStream \
  .outputMode("append") \
  .format("delta") \
  .option("checkpointLocation",checkpointPath) \
  .trigger(processingTime="30 seconds") \
  .queryName("eventRate") \
  .start(outputPath)

 "append" 출력 모드를 지원하는 모든 싱크가 지원됩니다. append 모드를 사용하기 때문에 윈도우가 닫힐 때까지 해당 윈도우에 대한 데이터가 싱크에 기록되지 않습니다. 워터마크 값이 윈도우 종료 시각에 지연 허용 시간을 더한 것보다 늦을 때까지 윈도우는 닫히지 않습니다. 위의 시간별 집계 윈도우 예시에서 이벤트 타임스탬프가 12:01보다 늦은 데이터를 수신하기 시작하면, 워터마크 값은 해당 윈도우 종료 시각에 지연 허용 시간 1분을 더한 값보다 늦어집니다. 그러면 창이 닫히고 데이터가 싱크로 방출됩니다. 워터마크의 계산은 처리 시점(clock time)과 관련이 없으며, 수신하는 데이터의 이벤트 타임스탬프를 기반으로 합니다.

윈도우 집계를 이용한 스트림-스트림의 시간차 조인

이 예제에서는 두 스트림을 함께 조인한 다음 1시간 단위로 집계합니다. 클릭 데이터가 포함된 스트림과 광고 노출이 포함된 스트림을 조인한 다음, 매 시간마다 광고당 클릭 수를 계산하고자 합니다.

스트림-스트림 조인도 상태 저장 연산자입니다. 일치하는 레코드가 서로 다른 마이크로 배치에 존재할 수 있기 때문에 레코드는 두 조인 데이터셋 모두에 대해 상태(State)로 유지됩니다. 상태가 무한정 증가하여 메모리 및 성능 문제를 일으키지 않도록, 일정 시간이 지나면 구조화된 스트리밍이 상태 레코드를 삭제하게 하려 합니다. 이를 위해서는 각 입력 스트림에 워터마크를 정의하고 조인 조건에 시간 간격 절을 추가해야 합니다.

먼저 각 데이터셋를 읽고 워터마크를 적용합니다. 두 스트림의 워터마크에 동일한 시간 간격을 지정할 필요는 없습니다. 이 예제에서는 노출은 최대 2시간 늦게 도착하고, 클릭은 최대 3시간 늦게 도착하도록 허용하고 있습니다:

impressions = spark.readStream…
clicks =  spark.readStream…

impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

이제 두 입력 스트림을 조인합니다: 

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"
)

시간 간격 조건절에 주목하세요. 즉, 광고 노출 후 0초에서 1시간 이내에 클릭이 발생해야 조인 결과셋에 포함될 수 있습니다. 이 시간 제약 조건을 통해 구조화된 스트리밍은 더 이상 행을 상태에 유지할 필요가 없어지는 시점을 결정할 수 있습니다.

조인된 데이터셋이 구해지면 각 광고에 대해 매시간 동안 발생한 클릭 수를 계산할 수 있습니다. 다른 워터마크를 지정하거나 집계 함수에 다른 것을 추가할 필요 없이, 두 번째 상태 저장 연산자는 바로 작동합니다!

adCounts = joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

마지막으로 결과를 출력합니다. writeStream 구문은 쿼리 이름, 출력 위치 및 체크포인트 위치만 다를 뿐 이전 예제와 동일합니다.

장점 

그렇다면 여러 상태 저장 연산자를 연결하면 어떻게 복잡성, 지연 시간, 비용을 줄일 수 있을까요? 위의 스트림-스트림 조인 후 윈도우 집계가 뒤따르는 것을 예로 들어보겠습니다.

이 기능이 나오기 전에는 각 상태 저장 연산자마다 자체 스트림이 필요했기 때문에, 조인을 위한 스트림과 첫 번째 스트림의 출력을 입력으로 사용하는 윈도우 집계용 두 번째 스트림이 있었습니다. 두 스트림이 모두 동일한 Spark 클러스터에서 실행되더라도 각 스트림마다 체크포인팅 오프셋과 커밋 로그, Spark UI에 플로팅하기 위한 트래킹 메트릭 등의 자체 오버헤드가 있습니다. 첫 번째 스트림은 소스 데이터를 읽고 조인한 후 외부 저장소의 싱크에 씁니다. 그런 다음 두 번째 스트림은 조인된 데이터를 다시 읽고, 윈도우 집계를 수행한 다음, 결과를 다른 싱크에 씁니다. 두 스트림을 모두 모니터링해야 하며, 어느 스트림에서든 로직이 변경될 때마다 스트림 간의 종속성을 관리해야 했습니다.

이제 여러 상태 저장 연산자를 결합할 수 있으므로 조인과 윈도우 집계를 모두 동일한 스트림에서 처리할 수 있습니다.

  • 복잡성이 감소합니다. 모니터링할 스트림이 하나만 있고 관리해야 할 스트림 간의 종속성이 없기 때문입니다. 또한 하나의 데이터셋만 저장되므로 관리해야 할 데이터도 줄어듭니다.
  • 지연 시간이 단축됩니다. 조인 후 중간 데이터셋을 기록하고 윈도우 집계 전에 해당 데이터셋을 다시 읽는 과정이 모두 제거되기 때문입니다.
  • 비용이 절감됩니다. 중간 데이터 쓰기 및 읽기가 제거되고 스트리밍 오버헤드가 감소하므로 필요한 컴퓨팅 양이 줄어들기 때문입니다. 중간 데이터셋에 대한 스토리지 비용도 제거됩니다.

상태 저장 연산자가 3개, 4개 또는 5개가 있다면 복잡성, 지연 시간, 비용이 얼마나 줄어들지 생각해 보세요!

추가 고려 사항

단일 스트림에서 여러 상태 저장 연산자를 사용할 때는 몇 가지 유의해야 할 사항이 있습니다.

  • 첫째로, "append" 출력 모드를 사용해야 합니다. "update" 와 "complete" 출력 모드는 대상 싱크가 이를 지원하더라도 사용할 수 없습니다. append 모드를 사용하기 때문에, 윈도우 종료 타임스탬프가 워터마크보다 작을 때까지 윈도우 집계에 대한 출력이 싱크에 기록되지 않습니다. 매치되지 않는 외부 조인 행은 이벤트 타임스탬프가 워터마크보다 작을 때까지 기록되지 않습니다.
  • 다음으로, mapGroupsWithState, flatMapGroupsWithState, applyInPandasWithState 은 스트림의 마지막 연산자인 경우에만 다른 상태 저장 연산자와 결합할 수 있습니다. mapGroupsWithState, flatMapGroupsWithState 또는 applyInPandsWithState 이후에 다른 상태 저장 연산을 수행해야 하는 경우, 먼저 싱크에 기록한 다음 두 번째 스트림에서 다른 상태 저장 연산자를 사용해야 합니다.
  • 마지막으로, 상태 저장 스트리밍에서는 스트림에 상태 저장 연산자를 하나를 사용하든 여러 개를 사용하든 동일한 모범 사례가 적용됩니다. 많은 상태를 저장하는 경우, 상태 관리에 RocksDB를 사용하면 기본 메커니즘보다 100배 더 많은 상태 키를 유지할 수 있습니다. 데이터브릭스 레이크하우스 플랫폼에서 실행하는 경우, 비동기 체크포인트(asynchronous checkpoints)와 상태 리밸런싱(state rebalancing)을 사용하여 상태 저장 스트리밍 성능을 개선할 수도 있습니다.

결론

오늘날의 비즈니스 요구 사항을 충족하기 위해서는 그 어느 때보다 더 많은 양의 데이터를 더 빠르게 처리해야 합니다. 데이터브릭스는 라이트스피드 프로젝트를 통해 지연 시간, 기능, 커넥터, 배포와 모니터링 등 구조화된 스트리밍의 모든 측면을 지속적으로 개선하고 있습니다. 이 최신 기능 개선으로 이제 구조화된 스트리밍 사용자들은 단일 스트림 내에 여러 개의 상태 저장 오퍼레이터를 보유할 수 있어 지연 시간, 복잡성 및 비용을 줄일 수 있습니다. 데이터브릭스 레이크하우스 플랫폼 Runtime 13.1 이상 또는 곧 출시될 Apache SparkTM 3.5.0 릴리즈에서 바로 사용해 보세요!

Databricks 무료로 시작하기

관련 포스트

Engineering blog

Project Lightspeed Update - Advancing Apache Spark Structured Streaming

In this blog post, we will review the advancements in Spark Structured Streaming since we announced Project Lightspeed a year ago, from performance...
Engineering blog

State Rebalancing in Structured Streaming

In light of the accelerated growth and adoption of Apache Spark Structured Streaming, Databricks announced Project Lightspeed at Data + AI Summit 2022...
Engineering blog

Speed Up Streaming Queries With Asynchronous State Checkpointing

May 2, 2022 작성자: Craig Ng in 엔지니어링 블로그
Background / Motivation Stateful streaming is becoming more prevalent as stakeholders make increasingly sophisticated demands on greater volumes of data. The tradeoff, however...
모든 엔지니어링 블로그 포스트 보기