주요 컨텐츠로 이동

Apache Spark™ 3.5 소개

데이터브릭스 런타임 14.0에서 사용 가능
이 포스트 공유하기

(번역: Sangbae Lim) Original Blog Post

오늘, 데이터브릭스 런타임 14.0에서 Apache Spark™ 3.5를 사용할 수 있다는 소식을 발표하게 되어 기쁘게 생각합니다. Spark 3.5 릴리스에 귀중한 기여를 해주신 Apache Spark 커뮤니티에 진심으로 감사드립니다.

Spark를 이전보다 더 쉽게 접근할 수 있고 다용도로 사용하며 효율적으로 만들겠다는 우리의 사명에 맞춰 이 업데이트에는 다음을 포함한 새로운 기능과 개선 사항이 포함되어 있습니다:

  • Spark Connect Scala 클라이언트 정식 버전, 분산 학습 및 추론 지원, SPARK에서 Pandas API의 동일성, 구조화된 스트리밍에 대한 호환성 개선으로 더 많은 시나리오를 지원합니다.
  • 배열을 조작하기 위한 내장 SQL 함수, SQL IDENTIFIER 절, Scala, Python 및 R API에 대한 확장된 SQL 함수 지원, SQL 함수 호출을 위한 명명된 인수 지원, HyperLogLog 근사 집계를 위한 SQL 함수 지원, Arrow에 최적화된 Python UDF, Python 사용자 정의 테이블 함수, PySpark 테스트 API, PySpark의 향상된 오류 클래스 등 새로운 PySpark 및 SQL 기능으로 개발자의 생산성 향상하였습니다.
  • Spark 클러스터에서 DeepSpeed로 분산 트레이닝 간소화를 지원합니다.
  • RocksDB 상태 저장소 공급자의 성능과 안정성이 개선되어 인메모리 상태 저장소 공급자와 비교할 때 트레이드 오프가 감소되었습니다.
  • Apache Spark용 영문 SDK를 통해 사용자는 프로그래밍 언어로 일반 영어를 사용할 수 있어 데이터 변환에 더 쉽게 접근할 수 있고 사용자 친화적으로 사용할 수 있습니다.

이 블로그 포스팅에서는 Apache Spark 3.5의 주요 특징과 획기적인 기능 및 개선 사항을 간략히 소개합니다. 이 흥미로운 업데이트에 대한 자세한 내용은 다음 블로그 포스팅을 기대해 주세요. 핵심적인 세부 사항을 알아보려면 모든 Spark 구성 요소에 대한 주요 기능 및 해결된 JIRA 티켓의 전체 목록이 포함된 포괄적인 Apache Spark 3.5 릴리스 노트를 살펴보는 것을 권장드립니다.

Spark Connect

Spark 3.4.0이 릴리스된 이후 Spark Connect 구현과 관련된 약 680개의 커밋이 있었습니다. 여기에서 변경 사항을 자유롭게 찾아보실 수 있습니다.

Spark 3.5 및 Spark Connect 구성 요소의 주요 결과물은 Spark Connect용 Scala 클라이언트(SPARK-42554)의 정식 버전 출시입니다. 클래스패스를 격리하는 클라이언트에 필요한 종속성 집합을 줄이기 위해 SQL 서브 모듈을 클라이언트(sql-api) 및 서버 호환(sql) 모듈로 분리하는 주요 리팩터링이 포함되어 있습니다(SPARK-44273).

Spark 3.5가 출시되기 전까지는 클라이언트 애플리케이션을 함께 배치해야 하는 Py4J 게이트웨이에 의존했기 때문에 Apache Spark의 MLlib를 Spark Connect와 함께 직접 사용할 수 없었습니다. Spark 3.5에서는 PyTorch 기반의 새로운 분산 실행 프레임워크(SPARK-42471)를 사용하여 Spark Connect를 사용하여 분산 학습 및 추론을 수행할 수 있는 기능을 도입했습니다. 현재 이 모듈은 로지스틱 회귀 분류기, 기본 특징 트랜스포머, 기본 모델 평가기, ML 파이프라인 및 교차 검증을 지원합니다. 이 프레임워크는 Spark의 벡터화된 Python UDF 프레임워크와 원활하게 통합되어 배리어 실행 모드를 사용해 UDF를 실행할 수 있는 기능으로 확장됩니다.

지난 릴리스에서는 Spark Connect(SPARK-42497)를 사용해 Spark에서 Pandas API의 동일성을 제공하기 위해 노력해왔으며, Python과 Scala(SPARK-49238)에서 구조화된 스트리밍 워크로드에 대한 Spark Connect 클라이언트의 호환성을 지속적으로 개선해왔습니다.

마지막으로, 커뮤니티는 별도의 리포지토리(https://github.com/apache/spark-connect-go)에서 개발 중인 Golang의 Spark Connect용 클라이언트(SPARK-43351) 작업을 시작했습니다.

PySpark 기능

이번 릴리스에서는 Arrow에 최적화된 Python 사용자 정의 함수(UDF), Python 사용자 정의 테이블 함수(UDTF), 개선된 오류 메시지, PySpark의 사용성, 성능 및 테스트 용이성을 크게 향상시키는 새로운 테스트 API 등 PySpark의 중요한 개선 사항이 도입되었습니다.

애로우-최적화된 파이썬 UDF(SPARK-40307): 다음 예제와 같이 spark.sql.execution.pythonUDF.arrow.enabled 구성이 True로 설정되거나 UDF 데코레이터를 사용하여 useArrowTrue로 설정하면 Python UDF가 Arrow 열 형식을 활용하여 성능을 개선합니다. 이 최적화를 통해 파이썬 UDF는 벡터화된 I/O를 활용하여 최신 CPU 아키텍처에서 피클된 파이썬 UDF보다 최대 2배 빠른 성능을 발휘할 수 있습니다.

spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
@udf("integer", useArrow=True)
def my_len_udf(s: str) -> int:
    return len(s)

PySpark

파이썬 사용자 정의 테이블 함수(SPARK-43798): 사용자 정의 테이블 함수(UDTF)는 단일 스칼라 결과 값 대신 전체 출력 테이블을 반환하는 사용자 정의 함수의 한 유형입니다. 이제 PySpark 사용자는 파이썬 로직을 통합한 자체 UDTF를 작성하여 PySpark 및 SQL에서 사용할 수 있습니다.

from pyspark.sql.functions import udtf

class MyHelloUDTF:
    def eval(self, *args):
        yield "hello", "world"  

# in PySpark
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

# in SQL
spark.udtf.register(name="test_udtf", f=test_udtf)
spark.sql("SELECT * FROM test_udtf()").show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

테스팅 API(SPARK-44042): Apache Spark™ 3.5에는 DataFrame 스키마와 DataFrame 내 데이터 간의 차이점을 명확하게 나타내는 자세한 색상 코드 테스트 오류 메시지를 포함하는 새로운 DataFrame 동일성 테스트 유틸리티 기능이 도입되었습니다. 이를 통해 개발자는 애플리케이션에 실행 가능한 결과를 생성하는 동일성 테스트를 쉽게 추가하여 생산성을 높일 수 있습니다. 새로운 API는 다음과 같습니다:

  • pyspark.testing.assertDataFrameEqual
  • pyspark.testing.assertPandasOnSparkEqual
  • pyspark.testing.assertSchemaEqual
pyspark.errors.exceptions.base.PySparkAssertError: [DIFFERENT_ROWS] Results do not match: ( 33.33333 % )
*** actual ***
  Row(name='Amy', languages=['C++', 'Rust'])
! Row(name='Jane', languages=['Scala', 'SQL', 'Java'])
  Row(name='John', languages=['Python', 'Java'])


*** expected ***
  Row(name='Amy', languages=['C++', 'Rust'])
! Row(name='Jane', languages=['Scala', 'Java'])
  Row(name='John', languages=['Python', 'Java'])

PySpark 오류 메시지 개선(SPARK-42986): 이전에는 Python Spark 드라이버에서 발생한 예외 세트가 Apache Spark™ 3.3에 도입된 오류 클래스를 활용하지 않았습니다. DataFrame 및 SQL의 모든 오류가 마이그레이션되었으며 적절한 오류 클래스 및 코드가 포함되어 있습니다.

SQL 기능

Apache Spark™ 3.5에는 많은 새로운 SQL 기능과 개선 사항이 추가되어 Spark에서 SQL/DataFrame API를 사용하여 쿼리를 더 쉽게 작성하고 다른 일반적인 데이터베이스에서 Spark로 마이그레이션할 수 있습니다.

배열을 조작하기 위한 새로운 내장 SQL 함수(SPARK-41231): Apache Spark™ 3.5에는 사용자가 배열 값을 쉽게 조작할 수 있도록 도와주는 새로운 내장 SQL 함수가 다수 포함되어 있습니다. 이 작업을 위해 기본 제공 함수를 사용하면 동일한 목적으로 사용자 정의 함수를 구성하는 것보다 더 쉽고 더 효율적입니다.

IDENTIFIER 절(SPARK-41231): 새로운 IDENTIFIER 절은 SQL 주입 공격의 위험 없이 새로운 SQL 쿼리 템플릿을 안전하게 구축할 수 있는 유연성을 제공합니다. 예를 들어 문자열 리터럴과 함께 IDENTIFIER 절을 사용하여 테이블/열/함수 이름을 지정하고 이전 Spark 릴리스에 추가된 쿼리 매개 변수 기능과 함께 사용하면 매우 강력합니다.

spark.sql(
  "CREATE TABLE IDENTIFIER(:tbl)(col INT) USING json",
  args = {
    "tbl": "my_schema.my_tbl"
  }
)

spark.sql(
  "SELECT IDENTIFIER(:col) FROM IDENTIFIER(:tbl)",
  args = {
    "col": "col",
    "tbl": "my_schema.my_tbl"
  }
).show()

Scala, Python 및 R API에 대한 확장된 SQL 함수 지원(SPARK-43907): Spark 3.5 이전에는 Scala, Python 또는 R DataFrame API에서 사용할 수 없는 SQL 함수가 많았습니다. 사용자가 자동 완성의 도움 없이 문자열 리터럴로 함수 이름을 입력해야 했기 때문에 DataFrames 내에서 함수를 호출하는 데 어려움이 있었습니다. Spark 3.5는 DataFrame API에서 150개 이상의 SQL 함수를 사용할 수 있도록 하여 이 문제를 해결합니다.

SQL 함수 호출에 대한 명명된 인수 지원(SPARK-44059): Python과 유사하게 Spark의 SQL 언어를 사용하면 이제 사용자가 값 앞에 매개변수 이름을 사용하여 함수를 호출할 수 있습니다. 이는 SQL 표준의 사양과 일치하며 함수에 매개변수가 많거나 일부 매개변수에 기본값이 있는 경우 더 명확하고 강력한 쿼리 언어를 제공합니다.

SELECT mask(
  'AbCD123-@$#',
  lowerChar => 'q',
  upperChar => 'Q',
  digitChar => 'd')

Apache Datasketches에 기반한 HyperLogLog 근사 집계를 위한 새로운 SQL 함수 지원(SPARK-16484): Apache Spark™ 3.5에는 중간 계산 결과를 스케치 버퍼에 저장하여 저장소에 영구적으로 저장하고 나중에 다시 로드할 수 있는 것을 포함하여 그룹 내에서 고유 값을 정확하고 효율적으로 계산하기 위한 새로운 SQL 함수가 포함되어 있습니다. 이러한 구현은 오픈 소스 커뮤니티와의 일관성 및 다른 도구와의 손쉬운 통합을 위해 Apache Datasketches 라이브러리를 사용합니다. 

사용 예:

> SELECT hll_sketch_estimate(
    hll_sketch_agg(col, 12))
  FROM VALUES (50), (60), (60), (60), (75), (100) tab(col);
  4

> SELECT hll_sketch_estimate(
    hll_sketch_agg(col))
  FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col);
  3

DeepSpeed Distributor

이번 릴리스에서는 사용자가 스파크 클러스터(SPARK-44264)에서 DeepSpeed로 분산 트레이닝을 간소화할 수 있도록 DeepspeedTorchDistributor 모듈이 PySpark 추가되었습니다. 이 모듈은  Apache Spark 3.4™에서 출시된 TorchDistributor 모듈의 확장입니다. 내부적으로 DeepspeedTorchDistributor는 DeepSpeed에 필요한 환경과 통신 채널을 초기화합니다. 이 모듈은 단일 노드 멀티 GPU 및 멀티 노드 GPU 클러스터 모두에서 트레이닝 작업 배포를 지원합니다. 다음은 사용 방법에 대한 코드 스니펫 예시입니다:

from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor

def train():
  # required boilerplate code
   import deepspeed
   parser = argparse.ArgumentParser(description="DeepSpeed Training")
   parser.add_argument('--deepspeed',
   '--ds',
   action='store_true',
  help='Enable DeepSpeed')
   parser.add_argument('--deepspeed_config',
   '--ds_config',
   type=str,
   help='DeepSpeed config file')
   args = parser.parse_args()

   device = int(os.environ["LOCAL_RANK"])

  # define the model
   model = build_model().to(device)
   model, *_ = deepspeed.initialize(args=args, model=model, 
 model_parameters=model.parameters())
  dataset = make_dataset() 
 loader = DataLoader(dataset)

 # run training
  output = run_training(model, loader, learning_rate=1e-3)
  return output

deepspeed_distributor = DeepspeedTorchDistributor(numGpus=2, nnodes=2, use_gpu=True, localMode=False, deepspeedConfig={...})
deepspeed_distributor.run(train)

자세한 내용과 노트북 예제는 https://docs.databricks.com/en/machine-learning/train-model/distributed-training/deepspeed.html 을 참조하세요.

스트리밍

Apache Spark™ 3.5는 다중 스테이트풀 연산자에 대한 지원 완료 및 RocksDB 스테이트 스토어 공급자 개선 등 스트리밍에 대한 다양한 개선 사항을 도입했습니다.

다중 상태 저장소 연산자 지원 완료(SPARK-42376): Apache Spark™ 3.4에서는 사용자가 연쇄된 시간 윈도우 집계를 포함하여 동일한 쿼리에서 여러 번 상태 저장소 연산(집계, 중복 제거, 스트림-스트림 조인 등)을 수행할 수 있습니다. 스트림-스트림 시간 간격 조인 후 다른 스테이트풀 연산자가 뒤따르는 스트림-스트림 조인은 Apache Spark™ 3.4에서는 지원되지 않았으나, Apache Spark™ 3.5에서는 마침내 이를 지원하여 광고 및 클릭 스트림 조인, 시간 윈도우 집계 등 보다 복잡한 워크로드를 처리할 수 있습니다.

RocksDB 상태 저장소 공급자에 대한 변경 로그 체크포인팅(SPARK-43421): Apache Spark™ 3.5는 상태의 변경 로그(업데이트)를 유지하는 "변경 로그 체크포인팅"이라는 이름의 RocksDB 상태 저장소 공급자에 대한 새로운 체크포인트 메커니즘을 도입합니다. 이렇게 하면 커밋 대기 시간이 크게 줄어들어 엔드 투 엔드 대기 시간도 크게 줄어듭니다. 이 기능을 활성화하려면 config spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 속성을 true로 설정하면 됩니다. 기존 체크포인트로도 이 기능을 활성화할 수 있습니다.

RocksDB 상태 저장소 공급자 메모리 관리 개선(SPARK-43311): RocksDB 상태 저장소 공급자는 상태의 메모리 문제를 해결하는 데 유용한 것으로 잘 알려져 있지만, 세분화된 메모리 관리가 없어 여전히 RocksDB에서 메모리 문제가 발생하고 있습니다. Apache Spark™ 3.5에서는 보다 세분화된 메모리 관리 기능을 도입하여 사용자가 동일한 익스큐터 프로세스에 있는 RocksDB 인스턴스의 총 메모리 사용량을 제한할 수 있어 익스큐터 프로세스별로 메모리 사용량을 추론하고 구성할 수 있게 되었습니다.

dropDuplicatesWithinWatermark 도입(SPARK-42931): 스트리밍 쿼리에 dropDuplicates()를 사용하면서 축적된 경험에 따라, Apache Spark™ 3.5는 이벤트 타임스탬프가 워터마크 지연에 맞을 만큼 충분히 가깝다면 이벤트 시간의 타임스탬프가 동일할 필요 없이 이벤트를 중복 제거해주는 새로운 API dropDuplicatesWithinWatermark()를 도입했습니다. 이 새로운 기능을 통해 사용자는 "중복으로 간주되는 이벤트라도 이벤트 시간의 타임스탬프가 다릴 수 있습니다"와 같은 경우를 처리할 수 있습니다. 예를 들어, 한 가지 실제적인 사례는 사용자가 멱등성 프로듀서 없이 Kafka를 수집하고 레코드의 자동 타임스탬프를 이벤트 시간으로 사용하는 경우입니다.

English SDK

Apache Spark용 English SDK는 프로그래밍 언어로 영어를 사용하여 데이터 엔지니어링 및 분석 워크플로우를 혁신하는 획기적인 도구입니다. 복잡한 작업을 간소화하도록 설계된 이 SDK는 코드 복잡성을 최소화하여 데이터에서 가치 있는 인사이트를 추출하는 데 집중할 수 있게 해줍니다.

일반 영어로 데이터프레임 변환하기

df.ai.transform()` 메서드를 사용하면 간단한 영어 구문을 사용하여 데이터 프레임을 조작할 수 있습니다.

사용 예:

transformed_df = revenue_df.ai.transform('What are the best-selling and the second best-selling products in every category?')

내부적으로 이 명령은 다음 SQL 쿼리로 변환된 후 실행되고 그 결과가 새 데이터 프레임에 저장됩니다:

WITH ranked_products AS (
  SELECT 
    product, 
    category, 
    revenue, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM spark_ai_temp_view_d566e4
)
SELECT product, category, revenue
FROM ranked_products
WHERE rank IN (1, 2)

일반 영어로 데이터 시각화

df.ai.plot()` 메서드는 데이터를 시각화하는 간단한 방법을 제공합니다. 플롯의 유형과 포함할 데이터를 모두 일반 영어로 지정할 수 있습니다.

사용 예:

auto_df.ai.plot('pie chart for US sales market shares, show the top 5 brands and the sum of others')

Visualize Data

추가 리소스

보다 심층적인 정보와 예제를 보려면 GitHub 리포지토리블로그 게시물을 참조하세요.

다른 Apache Spark™ 3.5 추가 기능

획기적인 기능에 스포트라이트가 집중되는 경우가 많지만, 지속적인 플랫폼의 진정한 특징은 사용성, 안정성, 점진적인 개선에 중점을 둔다는 점입니다. 이를 위해 Apache Spark 3.5는 198명이 넘는 기여자들의 협업 덕분에 놀라운 1324개의 문제를 해결하고 해결했습니다. 여기에는 개인뿐만 아니라 Databricks, Apple, Nvidia, Linkedin, UBS, Baidu 등과 같은 영향력 있는 회사의 팀도 포함되어 있습니다. 이 블로그 게시물에서는 SQL, Python 및 스트리밍의 헤드라인을 장식하는 발전 사항을 집중적으로 다루었지만, Spark 3.5는 여기에 설명되지 않은 수많은 다른 개선 사항도 제공합니다. 여기에는 SQL 캐시를 위한 적응형 쿼리 실행, 해제 개선 사항 및 새로운 DSV2 확장 기능 등이 포함됩니다. 이러한 추가 기능에 대한 자세한 설명은 릴리스 노트를 참조하세요.

Apache Spark

지금 Spark 3.5 시작하기

데이터브릭스 런타임 14.0에서 Apache Spark 3.5를 실험해보고 싶다면, 무료 Databricks 커뮤니티 에디션 또는 Databricks 체험판에 가입하면 쉽게 체험할 수 있습니다. 가입한 후에는 버전 "14.0"을 선택하기만 하면 Spark 3.5로 클러스터를 실행할 수 있습니다. 단 몇 분 만에 Spark 3.5가 제공하는 모든 기능을 탐색하면서 실행할 수 있습니다.

Databricks Runtime

Databricks 무료로 시작하기

관련 포스트

모든 엔지니어링 블로그 포스트 보기