주요 컨텐츠로 이동

Apache Spark™ Structured Streaming의 간소화된 상태 추적 기능 출시

Introducing Easier Change Data Capture in Apache Spark™ Structured Streaming

Published: January 27, 2025

솔루션1분 이내 소요

Summary

  • 간편해진 상태 추적: Apache Spark™ 구조화된 스트리밍의 State Reader API에 새롭게 추가된 변경 피드와 스냅샷 기능은 상태 변경의 디버깅과 분석 과정을 크게 단순화합니다.
  • 개발 속도 향상: 최소한의 쿼리만으로 여러 마이크로 배치에 걸친 상태 변화를 추적할 수 있으며, 정확한 진단을 위해 특정 시점의 상태 스냅샷을 쉽게 재구성할 수 있습니다.
  • 향상된 접근성: 대시보드 및 다양한 분석 도구와의 원활한 통합이 가능해져, 엔지니어뿐만 아니라 비즈니스 관계자들도 손쉽게 데이터 인사이트를 얻을 수 있습니다.

이 블로그에서는 Apache Spark™ 구조화된 스트리밍의 State Reader API에 새롭게 추가된 변경 피드와 스냅샷 기능을 소개합니다. State Reader API를 통해 사용자들은 구조화된 스트리밍의 내부 상태 데이터에 접근하고 이를 분석할 수 있게 되었습니다. 독자들은 이 새로운 기능을 활용하여 디버깅, 문제 해결, 그리고 상태 변화를 효과적으로 분석하는 방법을 익힐 수 있습니다. 이러한 기능은 대규모 스트리밍 작업을 더욱 수월하게 관리할 수 있도록 도와줍니다

상태 변경을 간편하게 처리하는 방법

Apache Spark 구조화된 스트리밍은 대규모 실시간 데이터 처리의 핵심으로 자리 잡았습니다. 하지만 스트리밍 작업이 복잡해질수록 개발, 디버깅, 문제 해결이 더욱 어려워졌습니다. 이에 Databricks는 2024년 3월 State Reader API를 도입하여 상태 데이터와 메타데이터를 쉽게 조회할 수 있는 강력한 도구를 제공했습니다.

Databricks는 최근 State Reader API를 개선하여 상태 추적과 분석을 더욱 용이하게 만들었습니다. 이 개선된 기능은 상태 저장소의 변경 로그 데이터를 활용해 표준 Change Data Capture (CDC) 형식의 변경 피드를 제공합니다. 또한 체크포인트 디렉토리에서 원하는 스냅샷을 사용해 상태의 뷰를 생성할 수 있게 되었습니다.

이 블로그에서는 이러한 새로운 기능들을 자세히 살펴보며, 상태 변경 추적, 데이터 변환 감사, 상태 스냅샷 재구성이 어떻게 간소화되는지 설명하겠습니다. 변경 피드를 통해 시간에 따른 상태 값 변화를 쉽게 관찰할 수 있어 개발 속도가 향상됩니다. 이전 버전의 State Reader API에서도 가능했지만 더 많은 코드가 필요했던 반면, 이제는 몇 가지 옵션 설정만으로 변경 피드를 구축할 수 있습니다.

이러한 개선 사항들은 개발과 테스트를 넘어 분석가들의 데이터 접근성도 높였습니다. 예를 들어, 예약된 쿼리로 AI/BI 대시보드 시각화를 쉽게 구현할 수 있게 되어 복잡한 스트리밍 데이터와 실행 가능한 인사이트 사이의 간극을 좁힐 수 있게 되었습니다.

필수 구성 요소:

State Reader API의 변경 피드 기능을 사용하려면 델타 기반 상태 체크포인팅이 활성화되어 있어야 합니다. 여기서 "델타"는 "변화" 또는 "차이"를 의미하며, Delta Lake와는 관련이 없습니다. HDFS 기반 상태 저장소 구현은 기본적으로 이 델타 기반 상태 체크포인팅을 사용합니다. 반면, RocksDB 기반 상태 저장소 구현을 사용할 경우에는 변경 로그 체크포인팅을 활성화하기 위해 추가적인 Spark 설정이 필요합니다.

State Reader API 리뷰

기본 상태 저장소은 다음과 같은 옵션을 제공합니다:

  • batchId: 상태 저장소 값을 읽을 대상 배치를 지정합니다. 설정하지 않으면 최신 batchId가 사용됩니다.
  • operatorId: 상태 저장소 값이 필요한 연산자를 지정합니다. 기본값은 0입니다. 여러 상태 유지 연산자가 있는 경우 이 옵션으로 다른 연산자의 상태에 접근할 수 있습니다.
  • storeName: 읽을 상태 저장소의 이름을 나타냅니다. 하나의 상태 유지 연산자가 여러 상태 저장소 인스턴스를 사용할 때 활용됩니다. 스트림-스트림 조인 시 storeName과 joinSide 중 하나만 지정해야 하며, 둘 다 지정할 수는 없습니다.
  • joinSide: 스트림-스트림 조인에서 상태를 읽을 때 사용하며, "오른쪽" 또는 "왼쪽"을 지정할 수 있습니다.

출력 DataFrame 스키마는 다음 열을 포함합니다:

  • key: 상태 체크포인트에서의 상태 유지 연산자 레코드 키
  • value: 상태 유지 연산자 레코드의 값
  • partition_id: 상태 유지 연산자 레코드가 포함된 체크포인트 파티션

이러한 기본 옵션과 출력 스키마를 통해 특정 batchId에 대한 상태 저장소의 내용을 쉽게 이해하고 분석할 수 있습니다.

예제

다음 예시는 statestore Spark 데이터 소스 형식을 사용하여 상태 저장소 데이터를 쿼리하는 방법을 보여줍니다. 예를 들어, userId 8의 카운트 값 변화를 조사한다고 가정해 보겠습니다. 새로운 State Reader API 옵션이 도입되기 전에는, userId 8의 데이터가 마이크로 배치에 걸쳐 어떻게 변화하는지 관찰하려면 아래 쿼리를 여러 batchId에 대해 반복적으로 실행해야 했습니다. (아래 첫 번째 코드 블록의 세 번째 줄에서 batchId를 변경하며 실행) 이러한 방식은 데이터 변화를 추적하는 데 있어 비효율적이고 번거로웠습니다. 하지만 새로운 State Reader API 옵션들이 도입되면서 이 과정이 크게 개선되었습니다.

이전에는 특정 키의 값 변화를 추적하는 것이 복잡하고 시간 소모적인 작업이었습니다. 여러 번의 쿼리를 실행해야 했고, 그 결과를 수동으로 비교해야 했죠. 하지만 이제 새롭게 도입된 옵션들로 인해 이 과정이 크게 간소화되었습니다. 이 기능들이 어떻게 데이터 변화 추적을 쉽고 효율적으로 만드는지 살펴보겠습니다. 

새로운 옵션 소개

State Reader API의 변경 피드 기능에 새롭게 추가된 옵션들은 다음과 같습니다:

  Option 설명
변경 피드
  readChangeFeed "true"로 설정하면 변경 피드 출력이 활성화됩니다.
  changeStartBatchId (필수) 변경 피드가 시작될 batchId를 지정합니다.
  changeEndBatchId (선택 사항) 변경 피드에서 사용할 마지막 배치를 지정합니다.
스냅샷
  snapshotPartitionId 특정 파티션만 읽고자 할 때 사용합니다. snapshotStartBatchId와 함께 사용해야 합니다.
  snapshotStartBatchId snapshotPartitionId와 함께 사용해야 합니다.
  snapshotEndBatchId 또는 batchId (선택 사항) 스냅샷 값 생성에 사용할 마지막 배치를 지정합니다.

batchId 옵션 사용 시 유의해야 할 점이 있습니다. 기본적으로 최근 100개의 체크포인트와 관련 상태 파일만 유지됩니다. 이 설정은 spark.sql.streaming.minBatchesToRetain 속성으로 조정할 수 있습니다. 만약 더 이상 존재하지 않는 오래된 배치의 상태 데이터에 접근하려 하면 다음과 같은 오류 메시지가 표시될 수 있습니다:
[STDS_OFFSET_LOG_UNAVAILABLE] The offset log for 92 does not exist, checkpoint location: /Volumes/mycheckpoint-path.

변경 피드 예제

아래 예시에서는, 키 userId 값 8에 대한 변경 사항을 관찰하기 위해 변경 피드를 사용합니다. change_type 필드는 개발, 디버깅 또는 생산 데이터 문제를 조사할 때 유용할 수 있습니다. 변경 피드 데이터를 통해 키의 값이 여러 마이크로 배치에 걸쳐 어떻게 변경되었는지 빠르게 확인할 수 있습니다. 아래 예시에서는, 상태 키가 윈도우를 포함하는 경우, partition_id가 어떻게 변경되었는지도 확인할 수 있습니다.

 

이번 예시에서는 변경 피드를 활용하여 userId 값 8에 대한 변화를 추적해보겠습니다. 변경 피드에서 제공하는 change_type 필드는 개발 과정, 디버깅, 또는 실제 데이터 문제를 조사할 때 매우 유용한 정보를 제공합니다. 변경 피드 데이터를 통해 우리는 특정 키의 값이 여러 마이크로 배치에 걸쳐 어떻게 변화했는지 한눈에 파악할 수 있습니다. 이 예시에서는 상태 키가 윈도우를 포함하는 경우 partition_id의 변화도 함께 관찰할 수 있습니다. 

스냅샷 예시

Apache Spark의 뛰어난 장애 허용성(tolerance) 덕분에 상태 저장소의 손상은 매우 드물게 발생합니다. 마이크로 배치가 계획되고 오프셋이 체크포인트 위치에 기록된 후, 커밋이 완료되면 상태 데이터가 체크포인트 위치와 동기화되기 때문입니다. 하지만 인적 오류나 버그로 인한 문제는 여전히 발생할 수 있습니다. 이러한 상황에서 State Reader API의 스냅샷 기능이 유용하게 활용될 수 있습니다. 이 기능은 변경 로그 데이터를 사용해 상태를 재구성할 수 있어, 후속 스냅샷 파일을 거치지 않고도 필요한 정보를 얻을 수 있습니다. 스냅샷 기능을 사용하려면 스냅샷 파일이 존재하는 시작 batchId(snapshotStartBatchId 옵션으로 지정)가 필요합니다. 이 시작 batchId부터 snapshotEndBatchId 옵션으로 지정된 batchId까지의 상태를 기반으로 전체적인 상태 그림을 구성합니다. 이를 통해 특정 시점의 상태를 정확하게 파악하고 분석할 수 있습니다.

RocksDB 상태 저장소를 사용하는 경우, 기본 파일 구조는 다음과 같습니다:

1740.zip 스냅샷 상태를 시작점으로 배치 1800의 상태를 재구성하려면 다음과 같은 코드를 사용할 수 있습니다:

체크포인트 파일 목록을 보면 스냅샷된 데이터가 1740.zip에 저장되어 있음을 알 수 있습니다. 그러나 State Reader API를 사용할 때는 snapshotStartBatchId를 1741로 지정했습니다. 이는 파일 명명 규칙이 1부터 시작하는 인덱스를 사용하는 반면, Spark UI의 batchId 번호는 0부터 시작하기 때문입니다.

결론

State Reader API의 새로운 기능은 데이터 감사, 탐색, 상태 변경 시각화 등 다양한 가능성을 열어줍니다. 이 기능은 개발자들의 작업 효율성을 크게 향상시키며, 기존에는 여러 배치에 걸친 상태 값 추출을 위해 별도의 쿼리가 필요했던 작업을 간소화합니다. 이러한 혁신의 혜택은 개발 및 지원 인력에 국한되지 않습니다. 비즈니스 관계자들 역시 변경 피드 데이터를 통해 얻을 수 있는 인사이트에 큰 관심을 보일 수 있습니다. State Reader API의 개선으로 데이터를 표면화하는 쿼리와 대시보드 구축이 더욱 용이해졌습니다.

결론적으로, 변경 피드 기능은 마이크로 배치 전반에 걸친 상태 변경을 상세히 추적할 수 있게 해주어 개발 및 디버깅 과정을 크게 개선합니다. 또한, 스냅샷 기능은 강력한 진단 도구로 작용하여 엔지니어들이 변경 로그 파일에서 상태를 재구성하고 특정 시점(batchId)의 상태를 완전히 파악할 수 있게 해줍니다.

State Reader API에 대해 더 자세히 알아보고 싶다면 여기를 클릭하거나, 실제 데모를 보고 싶다면 여기를 클릭하세요.

(한글화: 황경태 원문보기)

게시물을 놓치지 마세요

관심 있는 카테고리를 구독하고 최신 게시물을 받은편지함으로 받아보세요