-
[스파크 스트리밍] Structured Streaming 구조적 스트리밍 개발하기 (기본)개발지식 아카이브/Data - Spark 2025. 2. 26. 07:00
스파크 구조적 스트리밍
Structured Streaming은 스파크 엔진에 구축되어, 대용량 데이터에 대한 스트리밍 애플리케이션과 파이프라인을 구축할 수 있도록 개발되었으며 다음과 같은 특징이 있다.
- DataFrame과 Dataset API 기반으로 설계되었다. 즉, Input Data를 정해진 스키마에 맞게 구조화하여 처리한다는 뜻이다.
- 이를 통해 SQL 쿼리나 DataFrame 연산을 스트리밍 데이터에도 동일하게 적용할 수 있다.
- 마이크로배치 + 연속 처리 모드를 지원.
- 마이크로배치는 일정 시간 단위로 데이터를 배치처럼 처리하고 연속 처리 모드를 사용하면 거의 실시간에 가까운 처리를 함
- End-to-End Exactly-once 보장
- 데이터 유실, 중복 없이 정확히 한 번만 처리
Structured Streaming은 Spark 2.X 이후로 표준으로 자리 잡았기 때문에, RDD 기반의 스파크 스트리밍과 비교하고 있다면, 가급적 Structured Streaming 을 선택하는 것을 추천한다.
구현 예시 (simple)
스트리밍 애플리케이션은 크게 두 단계로 나눌 수 있다.
1. 데이터를 스트림으로부터 읽어들이는 readStream
2. 처리한 후 외부 스토리지나 다른 시스템으로 출력하는 writeStream
예시에서 언어는 Scala를 사용하고, input format은 kafka && 파일로 상정하여 작성하겠다!
1. 스트림에 연결하기 (readStream 사용) - 파일 스트림 예시
스파크 세션을 초기화한 후,
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder .appName("KafkaStructuredStreamingExample") .getOrCreate()
파일 스트리밍을 시작한다. 배치에서는 read API를 사용하지만, 스트리밍에서는 readStream API를 사용한다.val fileStream = spark.readStream .format("json") .schema(schema) .option("mode", "DROPMALFORMED") .load("/tmp/test")
<위 코드 설명>
- format에 파일 기반 스트리밍 소스인 json을 지정하였다. 현재 지원되는 파일 기반 스트리밍 소스들은 다음과 같다.
- json
- orc
- parquet
- csv
- text
- textFile
- schema 메소드는 데이터 스키마에 대한 스키마를 제공할 수 있게 해 준다.
- 사용할 수 있는 다양한 옵션들이 있다.
- mode
- DROPMALFORMED => 예시 코드에서 사용한 옵션이다. 이 옵션은 JSON 형식을 준수하지 않거나 스키마와 다른 라인을 삭제하도록 해주는 것이다.
- PERMISSIVE => (default) 오류가 난 행은 모든 컬럼을 null으로 채우고, $_corrupt_record 컬럼에 원본 데이터를 넣어준다
- FAILFAST => 손상 레코드 발생시 예외를 발생시킨다. 다만 스트리밍 프로세스에서는 권장되는 방법은 아니다.
- maxFilesPerTrigger
- 한 번의 Trigger(마이크로 배치)마다 읽을 최대 파일 개수를 제한할 수 있다. 기본적으로 Spark는 새롭게 추가된 모든 파일을 읽으려 하지만, 이 옵션 사용시 지정된 개수만큼만 읽어와 리소스 과부하를 방지할 수 있습니다.
- 한 번의 Trigger(마이크로 배치)마다 읽을 최대 파일 개수를 제한할 수 있다. 기본적으로 Spark는 새롭게 추가된 모든 파일을 읽으려 하지만, 이 옵션 사용시 지정된 개수만큼만 읽어와 리소스 과부하를 방지할 수 있습니다.
- maxFileAge
- 파일 소스를 읽을 때, 특정 시간 이상된 파일은 처리하지 않도록 설정하는 옵션이다.
- 파일 소스를 읽을 때, 특정 시간 이상된 파일은 처리하지 않도록 설정하는 옵션이다.
- mode
- load 메소드는 제공된 경로를 모니터링하고, 경로에 생성된 새 파일을 읽어온다.
2. 스트림에 연결하기 (readStream 사용) - kafka 스트림 예시
2.1 스트림에 연결한다.
이번엔 카프카 스트림에 연결한다.val kafkaStream = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", topic) .option("startingOffsets", "earliest") // "latest" 옵션도 가능 .load()
format이 json 혹은 csv일 때는 schema 옵션을 통해 데이터를 구조화하였다.
그러나 kafka의 경우 key/value가 바이너리로 되어 있어 schema 옵션을 지원하지 않는다. 그래서 from_json으로 구조화를 따로 구현 할 것이다!
readStream 으로 생성된 결과물 DataFrame kafkaStream의 스키마를 보면 개발에 도움이 될 것이다. (key, value 타입이 binary로 되어있군요. offset도 들어있고, timestamp도 들어있네요)
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
startingOffsets 옵션은 쿼리가 처음 시작될 때만 사용된다. 이후부터는 저장된 체크포인트 정보가 사용된다. 만약 스트리밍 작업을 다시 시작하려면 체크포인트 내용을 제거해야 한다.
maxOffsetsPerTrigger 옵션도 많이 사용되는 옵션 중 하나인데, 카프카 소스에서 한번의 마이크로배치가 읽어올 최대 오프셋 수를 제한하는 옵션이다. (default: 없음) 이 옵션은 Kafka로부터 너무 많은 데이터를 한 번에 읽는 것을 제한하기 위해 사용할 수 있다. 이 옵션의 사용 목적은 스파크 클러스터 리소스에 과부하가 걸리지 않도록 보호하고, 처리 시간을 일정하게 유지하기 위함이다.
모든 카프카 옵션을 사용할 수 있는 것은 아니다. 다음과 같은 카프카 옵션들은 금지되어 있다.
금지된 옵션 이유 auto.offset.reset - 체크포인트 기반으로 오프셋을 관리하는 스파크 VS 카프카 컨슈머의 오프셋 관리 방식이 다름!!
- 둘이 충돌할 수 있음group.id - Spark는 각 스트리밍 쿼리마다 고유한 group.id를 자동 생성하여 사용.
- 직접 설정 시 여러 쿼리 간 간섭이 발생해 데이터 중복이나 누락 문제가 생길 수 있음.enable.auto.commit - Spark는 체크포인트를 통해 Exactly-once 처리를 보장.
- 이 옵션을 활성화하면, 데이터 유실이나 중복 처리가 발생할 수 있음.key.deserializer - Spark가 내부에서 카프카 데이터를 역직렬화하므로, 직접 지정시 스파크와 충돌할 수 있음 value.deserializer - 위와 상동
2.2 input 레코드 스키마를 정의(구조화)한다.3가지의 방법론이 있으나 예시로는 가장 권장되는 "case class" 작성방식을 사용하였다.
import java.sql.Timestamp case class MyRecord(id: Int, name: String, timestamp: Timestamp)
상황에 따라 다른 방식들도 사용할 수 있다.
1. case class로 스키마 작성
2. StructType, StructField 클래스를 이용해 직접 스키마 표현을 작성 (마치 json처럼 생김)
3. 데이터셋에서 추출.
- ex) sampleDF.schema
2.3 수신된 데이터를 구조화한다
구조화 = Structure (우리가 Structured streaming에 대해 논하고 있다는 거 기억하시겠죠..?)
// Kafka 데이터의 value는 바이너리 타입이므로, 문자열로 변환 val jsonStream = kafkaStream.select("$value").as[String] // JSON 데이터를 case class인 MyRecord로 변환 val recordsDS: Dataset[MyRecord] = jsonStream .select(from_json(col("json"), Encoders.product[MyRecord].schema).as("data")) .select("data.*") .as[MyRecord]
json으로 캐스팅한 후 recordsDS로 구조화하였다. 이후 필요하다면 Dataset [MyRecord]를 활용하여, 필요한 집계나 변환 로직 등 전처리를 구현할 수 있다.
3. 스트림 쓰기 (writeStream 사용)
배치에서는 write API를 사용하지만, 스트리밍에서는 writeStream API를 사용한다.
스트리밍 데이터의 처리가 완료되면, 결과를 출력해야 한다. 이를 위해 DataFrame의 writeStream 메서드를 사용한다.
이때 싱크(sink) 및 출력 모드(outputmode)를 지정해야 하는데, 이는 구조적 스트리밍의 중요한 두 가지 개념이다.
val query = recordsDS.writeStream .format("parquet") // Parquet 포맷 지정 .option("path", "/path/to/output/directory") // 출력 디렉토리 .option("checkpointLocation", "/path/to/checkpoint/dir") // 체크포인트 경로 지정 .outputMode("append") // Append 모드 사용 (주로 Parquet Sink에 적합) .trigger(ProcessingTime("10 seconds")) .start()
Sink란 스트리밍 처리 결과를 어디에 기록할지 결정하는 것을 말한다. 파일, Kafka, 콘솔, 메모리 등 다양한 옵션이 있다.
나는 parquet 파일로 저장하기 위해 format 옵션에 "parquet"를 부여하였다.
ex) csv, hive, json, orc, parquet, avro, text 등의 format은 파일 기반의 sink이다.
option / options 로 키-값 으로 conf 파라미터를 전달할 수 있다. option을 사용하든 options를 사용하든 큰 차이는 없으나, options는 주로 외부에서 받은 key value map을 부여하는데에 사용된다.
trigger 옵션(선택적)을 사용하면 결과 생성 빈도를 지정할 수 있다. trigger 옵션은 기본적으로 Trigger.ProcessingTime 메서드를 사용하며, String, int, long 타입을 모두 지원하는데, String으로 더 많이 사용된다. 숫자로 사용할 때는 조금 주의가 필요하다. int로 주입할 경우 Second 단위로 인식되고, long으로 주입할 경우 MilliSecond 단위로 인식되기 때문이다.
숫자로 사용하려면 위의 코드 예시처럼 시간단위 TimeUnit 파라미터를 추가로 넣어주자.
Output Mode는 스트리밍 쿼리의 결과를 어떻게 기록할지를 결정하는 방법론이다. 다음 옵션들이 있다.
- append: (default) 새로 생성된 최종 레코드를 추가
- 파일 Sink는 일반적으로 append 모드만 지원한다. 새 데이터가 계속 추가되는 형태로 파일 시스템에 저장된다.
- Kafka Sink 역시 일반적으로 append 모드가 주로 사용된다.
- complete: 전체 결과를 매번 기록
- update: 변경된 데이터만 기록
complete와 update는 집계쿼리에서 주로 사용된다.
(주의) WriteStream에서는 현재 위 3가지 기록 방법만 지원된다. 배치방식에서 지원하는 upsert는, 현재 스트림에서는 지원하지 않고 있다. Iceberg, Hudi 와 같은 테이블포맷에 stream으로 upsert 쓰기를 하고 싶다면 어떻게 해야할까?! 자... 다음으로 넘어가보자~
위에서 간단히 설명한 forEachBatch 메소드에 대해 조금 더 부연해보겠다.
forEachBatch는 각 스트리밍 배치를 일반 Spark 배치처럼 처리하기 때문에, Spark의 배치 API를 전부 사용할 수 있다.
따라서 각 마이크로 배치 단위로 파티셔닝, 병합, 필터링, 업로드 등 원하는 작업을 자유롭게 수행하는 것이 가능하다.
아래 예시는 Iceberg 테이블에 스트리밍으로 UPSERT를 하도록 구현해본 것이다. 스트림에서 레코드를 모아서 배치처럼 작동시켜, upsert를 구현한다.
def upsert_to_iceberg(batch_df, batch_id): batch_df.createOrReplaceTempView("batch_temp_view") spark.sql(""" MERGE INTO my_catalog.db.iceberg_table AS target USING batch_temp_view AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (source.id, source.name, source.value) """) query = data_df.writeStream \ .foreachBatch(upsert_to_iceberg) \ .outputMode("update") \ .option("checkpointLocation", "hdfs://path/to/checkpoint") \ .start()
(Hudi의 경우는, upsert를 내장 옵션으로 조절할 수 있으므로, 마이크로배치 메소드 없이 WriteStream으로 바로 써도 upsert가 적용된다. 공식 문서를 참고해보길 바란다...)
compression 옵션은 압축 형식을 지정할 수 있다.default 값은 None인데, parquet의 경우에만 snappy가 지정된다.
4. 주의할 점
스트림상에서는 다음 API 들이 직접적으로 지원되지 않는다.
- count
- show
- describe
- limit
- take(n)
- distinct
- foreach
- sort
꼭 적용해야한다면, window로 자르면 가능하다.
스트림에는 적용할 수 없으나, window로 1분 단위로 데이터를 자른 후 count()를 계산하는 것은 가능하다.
Reference
1. Official Docs
https://spark.apache.org/docs/4.0.0-preview2/streaming/index.htmlStructured Streaming Programming Guide - Spark 4.0.0-preview2 Documentation
spark.apache.org
https://iceberg.apache.org/docs/1.7.0/spark-structured-streaming/
Structured Streaming - Apache Iceberg™
Spark Structured Streaming Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions. Streaming Reads Iceberg supports processing incremental d
iceberg.apache.org
https://hudi.apache.org/docs/next/writing_tables_streaming_writes/?utm_source=chatgpt.com
Streaming Writes | Apache Hudi
Spark Streaming
hudi.apache.org
2. 스파크를 활용한 실시간 처리 (원제: Stream Processing with Apache Spark)
7~11장을 공부하고 작성하였습니다.
'개발지식 아카이브 > Data - Spark' 카테고리의 다른 글
Spark JAR에 대해 연구해보자... 너는 무엇을 하는 아이니 (1) 2024.12.12 스파크 - 성능 최적화하기, 리팩토링 practice (0) 2023.02.26 스파크는 무엇이고 왜 쓰는지? 스파크에 대해 알아보기 (0) 2023.02.26 - DataFrame과 Dataset API 기반으로 설계되었다. 즉, Input Data를 정해진 스키마에 맞게 구조화하여 처리한다는 뜻이다.