Writer: Joonwon Jang
0. Prerequisites For Practice
•
Spark runs on Java 8, 11, or 17.
•
Install Apache Spark (Framework that powered the pyspark)
◦
Latest Version의 Pre-built for Apache Hadoop 3.3 and later download
•
Spark runs on Python 3.7+
•
Install the following libraries in Python
pip install pyspark findspark
conda install jupyter
JavaScript
복사
•
Launch jupyter lab in python
jupyter-lab
JavaScript
복사
1. Introduction to Spark
Spark
•
대규모 데이터 처리를 위한 분석 엔진 (unified analytics engine for large-scale data processing)
•
In-Memory (RAM) (Hadoop은 Disk에서 처리되기 때문에 여기서 속도차이가 발생합니다.)
◦
•
분산 병렬처리가 가능합니다.
•
사용하기 쉽다. (다양한 사용자 API를 통해 접근이 가능하다는 용이성이 있습니다.)
◦
Write applications quickly in Java, Scala, Python, R,and SQL.
MapReduce
•
Spark를 알기 위해서는 MapReduce연산에 대해서 간단히 알고 있으면 매우 좋습니다.
•
MapReduce는 대규모 데이터 세트를 처리하기 위한 프로그래밍 모델이자 구현 (i.e., distributed data processing)입니다. (물론 훨씬 더 깊은 원리가 그 안에 포함되어있겠지만, 데이터를 모델에 학습시키기 위해 처리하는 입장에서 간단히 이해한다고 하면,,, 아래만 이해해보자!)
•
이름에도 명시되어 있듯이, MapReduce은 두 가지 주요 단계, 즉 Map 단계와 Reduce 단계로 구성되어 있습니다.
1.
Map:
•
입력 데이터를 키-값 쌍으로 변환하고, 각 데이터 조각에 지정된 Map 함수를 적용합니다.
(예시)
◦
각 단어를 (단어, 1)의 키-값 쌍으로 변환
◦
"hello world hello"라는 텍스트가 있다면, 이는 [(hello, 1), (world, 1), (hello, 1)]으로 변환
2.
Reduce:
•
Map 단계에서 생성된 키-값 쌍을 키에 따라 그룹화 → 그룹화된 각 키에 대해 Reduce 함수를 적용하여 최종 결과를 생성합니다.
(예시)
◦
[(hello, 1), (hello, 1)] 및 [(world, 1)]로 그룹화 → hello는 2번, world는 1번 출현
•
왜 이런 MapReduce연산을 해야하는 걸까요?
◦
위와 같은 함수형으로 작성된 연산은 (상대적으로 적은 연산이 필요한 상황에서도) 병렬적으로 대용량 데이터의 연산처리를 가능하게 합니다. (위의 그림을 참고해보자)
#### Limitation of MapReduce in Hadoop & Improvement
•
이전에도 언급했듯이 MapReduce 모델은 하둡(Hadoop)과 같은 시스템에서 널리 사용된다. 데이터를 디스크에 저장하고, 중간 결과도 디스크에 쓰기 때문에 대량의 데이터 처리에 적합하지만, 입출력 작업으로 인해 속도가 느려진다는 단점이 존재합니다.
•
MapReduce는 YARN (Hadoop 2.0부터 도입된 리소스 관리 플랫폼으로, Hadoop 클러스터의 자원을 관리. YARN은 자원 할당과 작업 스케줄링을 담당하며, CPU, 메모리 등의 컴퓨팅 자원을 효율적으로 분배하여 여러 애플리케이션이 동시에 실행될 수 있도록 함)상에서 동작하는 분산 애플리케이션 중 하나며, 분산 시스템에서 데이터를 처리하는 데 사용됩니다.
•
또한, SQL 같은 쿼리 언어를 위해 Apache Hive라는 쿼리 엔진을 사용할 수 있는데, 이들은 입력한 쿼리를 자동으로 MapReduce으로 변환해주기도 하고, 아래 그림에서 확인할 수 있듯이 (위 2개의 애플리케이션 (Hadoop, YARN) 모두 대량의 데이터를 배치 처리에 적합하지만) 애드 훅 쿼리를 여러 번 실행해야하는 복잡한 쿼리를 처리하기에는 부적합하며 데이터 처리의 스테이지가 바꿜 때마다 약간의 대기 시간이 필요하다는 한계가 존재합니다. (이건 Hive on Tez같은 기술적인 해결책이 존재한다고 하기는 합니다..!)
⇒ 하지만 점점 RAM 가격이 저렴해졌고, (Disk IO로 빠질것을) In-Memory에서 돌아가는 SPARK 등장!
2. Spark (How Spark Work & Spark Session)
How Spark Work
•
Spark는 중앙관리자라고 볼 수 있는 (1) Master/Driver Node와 실질적인 연산작업을 하는 (2) Worker/Slaver Node로 이루어져 있으며, Driver와 Executor를 합쳐서 Spark application이라고 합니다.
참고: Hadoop의 HDFS 구조
•
(저도 처음에 이 개념을 파악하는데 굉장히 헷갈렸는데) 우선은 Master/Driver
Worker/Slaver가 있다는 사실만은 인지하고 Master/Driver의 Worker/Slaver가 launch되는 환경에 따라 Spark 동작 환경은 크게 3개로 나눌 수가 있다는 것만 이해하고 아래에서 자세히 설명하도록 하겠습니다.
Application Launch 환경 | Driver 실행 환경 | Worker 실행 환경 | 설명 | |
Submit 하는 머신 | Submit 하는 머신 | Submit 하는 머신 | Driver = Executor 모두 같은 JVM 내에 존재 | |
Submit 하는 머신 | Submit 하는 머신 (Driver 가 Submit 하는 머신의 리소스를 소모) | Cluster | Cluster: Standalone / Yarn / Kubernetes 등 | |
Submit 하는 머신 (컴퓨팅 없이 단순 요청 및 작업 종료까지 대기만 함) | Cluster | Cluster | Cluster: Standalone / Yarn / Kubernetes 등 |
◦
Submit 하는 머신이란, spark-submit 이 실행되는 machine입니다.
▪
Client Mode는 spark-submit 이 실행되는 machine과 Driver 실행 환경이 동일하다는 의미입니다.
▪
반면, Cluster Mode는 Driver가 Cluster내에서 실행된다는 의미이다. (아래에서 좀 더 자세히 설명해 보겠습니다.)
◦
설명에서 ‘Executor’라는 단어가 나오는데, 이는 Worker 노드에서 실행되는 ‘Process’로, application의 작업(Task)을 실제로 수행하는 역할을 합니다.
•
Local Mode, Client Mode, Cluster Mode에 대해서 설명을 하기 전에 위의 그림을 바탕으로 Spark의 동작 원리에 대해서 하고 가볼까 합니다.
Driver/Master
•
스크린샷 2024-10-04 오후 4.46.05.png 에는 보이지 않지만, Drive Node는 내부 프로세스를 통해 Spark Context라는 클래스 객체를 생성하며, 이 객체는 application과 cluster 간의 연결을 담당합니다. 이는 Spark 기능의 가장 Entry Point입니다. (Spark 1.X까지)
•
application의 실행은 제출된 코드를 기반으로 작업의 순서를 DAG(Directed Acyclic Graph) 형태로 구성해, 해당 작업을 Stage 단위로 나눕니다. 최종적으로 각 Stage는 작은 Task 단위로 분할됩니다.
•
그러나 Spark 2.0부터는 다양한 기능(예: SQL 쿼리, DataFrame 작업, 스트리밍 작업)을 하나의 인터페이스에서 사용할 수 있게 해주는 Spark Session이 도입되었습니다.
•
Spark Session은 Spark Context를 포함하고 있으며, SharedState, SessionState, SparkSessionExtensions와 같은 객체들도 함께 포함되어 있습니다. Spark Session은 이 하위 객체들이 생성된 후에 생성됩니다.
•
SparkSession은 SparkContext의 모든 기능을 포함하며, 더 높은 수준의 추상화(예: DataFrames, Datasets)를 제공하는 추가적인 기능을 포함합니다.
Worker/Slaver
•
Worker Node는 Executor 생성을 통해 실제로 데이터에 접근할 수 있고, 할당된 Task를 실질적으로 수행하는 Node입니다.
•
제가 가장 헷갈린 부분은 Worker Node 내 Executor가 1개만 존재해야하는가?라는 질문이었습니다.
◦
정답은 ‘아니다’. 입니다. (아래 Reference를 달겠지만)
◦
Worker Node를 AWS EC2 와 같이 사전 할당된 머신이라고 생각해보면 이해가 용이해집니다. Spark Executor는 JVM Process이기에 하나의 머신(Spark에서는 Worker Node)에는 당연히 여러 프로세스가 실행될 수 있습니다. 다만 여러개의 Process를 실행하면, 하나의 머신 내에 있는 여러 자원을 나눠써야겠죠.
◦
40코어 CPU, 480GB RAM인 머신에서 논리적으로 아래의 JOB이 가능한 것이죠.
spark.executor.instances=10 \
spark.executor.cores=4 \
spark.executor.memory=48g \
Python
복사
Cluster Manager
•
Untitled 에서 Client Mode나 Cluster Mode의 경우 Cluster manager를 통해서 Cluster의 머신에 접근할 수 있게 합니다.
1.
Driver 내의 Spark Context 는 Cluster Manager 와 통신해 (Standalone, Yarn, Kubernetes) Executor 를 할당 받습니다. (비유하자면, 누구에게 일시킬껀지 리스트를 받는 것이죠)
2.
Spark Driver는 Executor 에게 사용자가 작성한 코드 실행을 위해 Spark Context 내의 Jar, Python File 들을 전송합니다.
•
위에서 언급했듯이 Spark에서는 여러 Cluster Manager를 지원하지만 저는 초대형 RAM이 지원되는 환경 아래에서 X00GB 단위 데이터에 대한 처리를 수행했기 때문에 Standalone를 사용했습니다.
•
Standalone
◦
자체에 내장된 간단한 Cluster Manager로, 마스터는 start-master.sh 와 start-worker.sh 스크립트 실행을 통해 Driver와 Worker Node 생성합니다.
◦
jps 명령어를 입력하면 Driver, Worker 각각에 대한 정보가 나오는데, 이는 Drive Node와 Worker Node가 서로 다른 JVM Instance로 생성되어 있기 때문입니다.
◦
위의 그림을 보면, (Worker Node의 예시로 보면 될 듯한데) 다수의 언어가 Interface 형태로 지원되는 Spark는 실제로는 여러 Exector에 의해서 (관련설명: Worker Node를 AWS EC2 와 같이 사전 할당된 머신이라고 생각해보면 이해가 용이해집니다. Spark Executor는 JVM Process이기에 하나의 머신(Spark에서는 Worker Node)에는 당연히 여러 프로세스가 실행될 수 있습니다. 다만 여러개의 Process를 실행하면, 하나의 머신 내에 있는 여러 자원을 나눠써야겠죠. ) 요청한 Task가 처리되는 것을 알 수가 있습니다.
Mode of Spark
Local Mode
•
Driver Node가 Worker Node와 클러스터를 사용하지 않고 단일 JVM상에서 동작하는 형태입니다.
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
Python
복사
•
Local Mode에서는 Driver 1개와 Executor 1개를 생성해 작업을 진행합니다.
◦
--master local[60]
◦
--master local[*]
◦
와 같은 형태를 통해서 논리코어를 []큼 동원해 worker 스레드로 동원해 작업을 처리합니다.
(기존의 Node안에 Executor instance를 생성하는 방식이라는 차이가 있는듯합니다.)
•
위 그림처럼, Local Client JVM에 Driver 1개와 Executor 1개를 생성하는 형태입니다.
◦
Executor 내부에는 여러 개의 Core를 사용하여 태스크를 병렬로 실행할 수 있습니다.
•
저는 전체 JVM(Spark Session은 JVM으로 돌아감)이 하나의 큰 실행자(executor)로 볼 수 있으며, 여러 스레드(논리 코어로 선언한)가 데이터를 처리하는 구조다’라고 이해했습니다. (틀린점 있으면 지적해주세요)
•
당연하게도 단일 머신 환경을 구축하거나 간단한 테스트를 할 때 유용합니다.
Client Mode
•
Client mode는 Driver Node & Spark application와 Worker Node가 분리된 Mode라고 보시면 될 것 같습니다.
•
위의 그림을 보시면 Client mode에서 Driver Node는 Cluster에서 분리되어 있는 것을 볼 수 있습니다.
•
참고로 deploy-mode에 Client와 Cluster mode가 포함됩니다.
•
Driver Program과 Spark application은 모두 Client Process (Cluster와 독립된 공간에 존재)하니, Client 프로세스를 중지시키면, 당연히 Spark Context도 함께 종료되면서 수행 중이던 모든 스파크 job이 중지 됩니다.
#### bash
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master spark://207.184.161.138:7077\--executor-memory 20G\--total-executor-cores 100\
/path/to/examples.jar\
1000
Shell
복사
Cluster Mode
•
Cluster mode에서는 Driver Node와 Worker Node가 Cluster 의 물리적 노드 중 하나에서 실행됩니다.
•
그럼 왜 위와 같은 setting이 도움이 될까요? Cluster와 같은 네트워크 상에 위치하기 때문에, Cluster 리소스(executor 등)와의 통신 지연이 최소화됩니다.
◦
cluster mode는 특히 장기간 실행되는 작업, 대규모 데이터 처리 작업, 또는 클라이언트 머신의 성능이 제한적인 경우에 적합합니다.
◦
아래의 cluster 작업을 다시 상기해보면,
1.
spark-submit은 드라이버 프로그램을 실행하고 사용자가 정의한 main() 메소드를 호출
2.
Driver는 Cluster Manager에게 Executor 실행을 위한 리소스를 요청
3.
Cluster Manager는 Driver Program을 대신해 Executor 들을 실행
4.
Driver가 사용자 application을 통해 실행. 프로그램에 작성된 RDD의 트랜스포메이션과 액션에 기반하여 Driver는 작업 내역을 단위 작업 형태(Task)로 나눠 Executor들에게 보냄.
⇒ 해당 단위 작업의 반복이 일어날때, driver와 worker 사이의 통신이 잦아질 수 밖에 없고, Cluster내에 있는게 여러 측면에서 효율적일 수 있습니다.
#### bash
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
Shell
복사
3. RDDs (Resilient Distributed Datasets)
•
Resilient Distributed Datasets (RDDs)는 Apache Spark의 핵심 데이터 구조로서, 분산 환경에서 대규모 데이터셋의 효율적인 처리를 가능하게합니다.
•
SPARK라는 생태계의 가장 low-level의 데이터 단위라고 저는 이해했습니다.
(하지만 element(요소)단위로 실질적인 transformation이 적용되는거 같음)
About RDDs
•
RDD의 특성에 대해서 간단하게 소개를 하면 다음과 같습니다.
1.
Backbone of data processing in Spark
•
RDD는 Spark에서 데이터 처리 작업의 기본 단위로 사용합니다.
•
데이터는 클러스터 전체의 여러 Worker에 분산 저장되며, 각 노드는 할당된 데이터 부분에 대한 작업을 병렬로 수행할 수 있습니다.
2.
Distributed, fault-tolerant, parallelizable data structure, and in-memory
•
분산(Distributed): 데이터가 네트워크상의 여러 컴퓨터(Node/Machine)에 걸쳐 분산되어 저장됩니다. 해당 구조는 데이터를 여러 Node에 나누어 처리함으로써, 처리 속도를 높이고, 한 Node의 실패가 전체 시스템에 미치는 영향을 최소화합니다.
•
내결함성(Fault-tolerant): RDD는 데이터의 partition을 여러 노드에 복제하거나, 데이터의 메타데이터(예: tranformation 연산 기록)를 사용하여 실패한 노드의 데이터를 다시 계산할 수 있게 만들어 줍니다. 이를 통해 데이터 손실 없이 시스템의 장애를 극복할 수 있습니다.
•
병렬 처리 가능(Parallelizable): 데이터의 각 partition은 ‘독립적’으로 처리될 수 있어, 여러 처리 작업을 동시에 수행할 수 있습니다.
•
메모리 저장(In-Memory): RDD는 데이터를 메모리에 저장하고, 여러 연산을 메모리 상에서 직접 수행함으로써 데이터 접근 시간을 단축합니다.
3.
Efficiently processes large datasets across a cluster
•
RDD를 사용하면 매우 큰 데이터셋을 효율적으로 처리할 수 있습니다.. 데이터는 ‘partition’ 단위로 분할되고, 각 파티션은 클러스터의 다양한 노드에서 동시에 처리합니다.
4.
Key characteristics: immutable, distributed, resilient, lazily evaluated, fault-tolerant:
•
불변성(Immutable): 한 번 생성된 RDD는 변경할 수 없습니다. 데이터에 transformation을 가하려면, 새로운 RDD를 생성하는 transformation 연산을 적용해야 합니다. 이는 데이터의 일관성을 보장하고, 복잡한 데이터 파이프라인을 안정적으로 관리할 수 있게 합니다.
•
Distributed: 데이터를 여러 노드에 나누어 (partitioned) 분산처리되도록 합니다.
•
Lazy evaluated: RDD 연산은 실제로 액션(action)이 호출될 때까지 실행되지 않음. 불필요한 계산을 피하고 최적화된 방식으로 데이터를 처리합니다.
•
Fault-tolerant: map, filter, reduce, collect, count, save 등의 연산을 지원하며, 해당 연산들은 모두 내결함성을 갖습니다.
◦
예를 들어, 하나의 worker 노드가 위의 연산을 처리하는 도중에 문제가 발생해도 해당 노드의 처리가 필요한 RDD 파티션은 다른 노드에서 재처리가 가능합니다.
How to Make RDDs?
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = '/Users/name/App/Spark'
# SPARK_HOME 환경 변수를 설정하여, PySpark가 Spark이 설치된 위치를 알 수 있도록 설정
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
# PYSPARK_DRIVER_PYTHON 환경 변수는 PySpark 세션을 시작할 때 어떤 Python 인터페이스를 사용할지 지정
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# PySpark가 Jupyter Lab을 실행할 때 필요한 추가 옵션을 제공
os.environ['PYSPARK_PYTHON'] = 'python'
# PYSPARK_PYTHON 환경 변수는 클러스터의 모든 노드에서 PySpark 작업을 처리할 때 사용할 Python 인터프리터를
# 이 설정은 PySpark가 작업을 분산 처리할 때 일관된 Python 환경을 유지하도록 돕습니다.
Python
복사
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("RDD-Demo").getOrCreate()
Python
복사
'''
현재 Spark 세션이 어떤 마스터 설정을 사용하고 있는지를 출력
### LOCAL
- local[*]이면 로컬 모드에서 모든 사용 가능한 코어를 사용하고 있다는 것을 의미
### CLUSTER
spark://host:port 형식이면 특정 클러스터 매니저(Standalone, Mesos, YARN 등)에 연결되어 있다는 것을 의미
'''
spark = SparkSession.builder.appName("RDD-Demo").getOrCreate()
print(spark.sparkContext.master)
# local[*]
'''
로컬 환경에서 실행 중인 경우, 스레드의 수(즉, 병렬 처리 가능한 작업의 수)는 다음과 같이 확인
'''
import multiprocessing
num_cores = multiprocessing.cpu_count()
print("Number of available cores:", num_cores)
# Number of available cores: 10
Python
복사
# Data를 가용할 수 있는 core(CPU 코어들을 Worker 노드)중 5를 활용해 partitioning
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers,5)
num_partitions = rdd.getNumPartitions()
print("Number of partitions:", num_partitions)
# glom() 함수를 사용하면 각 파티션의 데이터를 배열로 변환하여 파티션별로 그룹화
partitioned_data = rdd.glom().collect()
# print
print("Data in each partition:")
for i, data in enumerate(partitioned_data):
print(f"Partition {i}: {data}")
# Data in each partition:
# Partition 0: [1]
# Partition 1: [2]
# Partition 2: [3]
# Partition 3: [4]
# Partition 4: [5]
Python
복사
rdd.collect()
# [1, 2, 3, 4, 5]
Python
복사
from pyspark.sql import SparkSession
# 클러스터 모드에서 Spark 세션 설정
spark = SparkSession.builder \
.master("spark://master_url:7077") \
.appName("RDD-Demo") \
.config("spark.executor.instances", "6") \ # 클러스터 전체에서 Worker 노드들에 걸쳐 분배
.config("spark.executor.memory", "2g") \
.config("spark.executor.cores", "2") \
.getOrCreate()
# RDD 생성 및 파티션 설정
numbers = list(range(1, 21))
rdd = spark.sparkContext.parallelize(numbers, 6) # 명시적으로 파티션 수를 6으로 설정
# 각 파티션의 데이터를 리스트로 변환
partitioned_data = rdd.glom().collect()
# 출력
print("Data in each partition:")
for i, data in enumerate(partitioned_data):
print(f"Partition {i}: {data}")
'''
(예상 출력 결과)
Data in each partition:
Partition 0: [1, 2, 3, 4]
Partition 1: [5, 6, 7, 8]
Partition 2: [9, 10, 11, 12]
Partition 3: [13, 14, 15, 16]
Partition 4: [17, 18]
Partition 5: [19, 20]
'''
Python
복사
Transformations vs. Actions
•
Transformation과 Action은 RDDs를 조작하고 결과를 도출하는 방법입니다.
•
RDD operation은 Spark의 low-level API 작업이기에 SparkSession을 통해 접근 가능한 SparkContext의 인스턴스(spark.sparkContext)를 사용합니다.
#### Transformations
•
개념: Transformations은 RDD에 적용되는 연산으로, 새로운 RDD를 생성. Transformations은 '지연 평가(lazy evaluation)' 모델을 따르기 때문에, 실제 연산은 관련된 동작(Action)이 호출될 때까지 실행X
•
특성: Transformations을 통해 생성된 새 RDD는 원본 RDD의 변경 불가능한(immutable) 특성을 유지하며, 원본 데이터를 수정하지 않고 새로운 데이터셋을 생성합니다.
•
예시: map, filter, flatMap, reduceByKey, sortBy, join → filter 변환은 조건에 맞는 데이터만을 포함하는 새 RDD를 생성합니다.
#### Actions
•
개념: Actions은 RDD에 적용되며, 변환된 데이터에 대해 계산을 실행하고 결과를 반환. Actions은 '적극적 평가(eager evaluation)'을 통해 즉시 결과를 도출합니다.
•
특성: Actions은 Spark의 연산 과정에서 lazy evaluation된 transformations들을 trigger하고, 최종 결과를 계산하기 위해 데이터를 Driver 프로그램으로 가져오거나 외부 시스템에 저장합니다.
•
예시: collect, count, first, take, save, foreach → collect 동작은 RDD의 모든 요소를 driver로 반환하여 사용하도록 합니다.
#### Examples
1.
Transformation:
#### Big Picture
filtered_customers = customers_rdd.filter(lambda x: x['age'] > 30)
Python
복사
•
즉시 실행 X (i.e., lazy evaluation). 대신, 필요한 계산을 정의하고, 실행 계획을 준비합니다.
•
action을 통해서 실행시켜줘야 결과물 확인 가능합니다.
### LOCAL 노드 환경 가정
# Create an RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)
# a. Map transformation: Convert name to uppercase
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))
result = mapped_rdd.collect()
print("rdd with uppercease name: ", result)
#rdd with uppercease name: [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]
# b. Filter transformation: Filter records where age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()
#[('Charlie', 35), ('Alice', 40)]
# c. ReduceByKey transformation: Calculate the total age for each name
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()
#[('Alice', 65), ('Bob', 30), ('Charlie', 35)]
# d. SortBy transformation: Sort the RDD by age in descending order
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()
#[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]
Python
복사
a.
map()
•
map() RDD의 각 요소에 주어진 함수를 적용, 그 결과로 새로운 RDD를 생성합니다.
b.
filter()
•
filter() 주어진 조건 함수를 만족하는 요소들만을 포함하는 새로운 RDD를 생성합니다.
•
예시 코드에서는 나이가 30 초과인 사람들만 필터링합니다.
c.
reduceByKey()
•
reduceByKey()키-값 쌍(pair)을 가진 RDD에 사용되며, 같은 키를 가진 값들을 주어진 reduce()함수로 합쳐서 새로운 RDD를 생성합니다.
•
예시 코드에서는 이름을 키로 하여 나이를 합산합니다.
d.
sortBy()
•
sortBy() 주어진 키 함수의 결과에 따라 요소를 정렬한 새로운 RDD를 생성핮니다.
•
예시 코드에서는 나이를 기준으로 내림차순 정렬합니다.
2.
Action:
#### Big Picture
result = filtered_customers.collect()
Python
복사
이제 collect() 동작이 호출되면서, filter 변환에 의해 정의된 모든 ‘tranformation’ 연산이 실행, 이후 그 결과가 반환합니다.
### LOCAL 노드 환경 가정
# Create an RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)
# a. Collect action: Retrieve all elements of the RDD
print("All elements of the rdd: ", rdd.collect())
# All elements of the rdd: [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]
# b. Count action: Count the number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)
# The total number of elements in rdd: 4
# c. First action: Retrieve the first element of the RDD
first_element = rdd.first()
print("The first element of the rdd: ", first_element)
# The first element of the rdd: ('Alice', 25)
# d. Take action: Retrieve the n elements of the RDD
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)
#The first two elements of the rdd: [('Alice', 25), ('Bob', 30)]
# e. Foreach action: Print each element of the RDD
rdd.foreach(lambda x: print(x))
# ('Charlie', 35)
# ('Alice', 25)
# ('Bob', 30)
# ('Alice', 40)
Python
복사
a.
collect()
•
collect() RDD에 포함된 모든 요소를 Driver 프로그램(여기서는 사용자의 Local 머신)으로 반환합니다.
•
collect()는 RDD 전체를 메모리에 로드하므로, 큰 데이터셋에서는 메모리 오버플로를 발생 가능성을 염두해 두어야 합니다.
b.
count()
•
count() 함수는 RDD에 포함된 요소의 총 수를 count해서 반환합니다.
•
예시 코드에서는 rdd.count()를 호출하여 데이터의 총 수를 계산하고 그 결과를 출력합니다.
c.
first()
•
first() 함수는 RDD의 첫 번째 요소를 반환합니다.
•
예시 코드에서는 rdd.first()를 사용하여 첫 번째 데이터 요소를 검색하고 그 결과를 출력합니다.
d.
take(n)
•
take(n) 함수는 RDD에서 처음 n개의 요소를 반환 (상위 n개 데이터 샘플을 보고자 할 때 사용합니다.
•
예시 코드에서는 rdd.take(2)를 호출하여 상위 2개 데이터 요소를 반환합니다.
e.
foreach()
•
foreach() 함수는 RDD의 각 요소에 대해 지정된 함수를 실행 (RDD의 각 요소에 대해 부작용(side effect)를 가진 작업을 수행할 때 사용 (예: 데이터베이스에 저장, 출력 등))합니다.
•
예시 코드에서는 rdd.foreach(lambda x: print(x))를 사용하여 RDD의 모든 요소를 출력합니다.
Additional Operations
•
saveAsTextFile()
◦
saveAsTextFile("output.txt") RDD의 내용을 외부 파일 시스템에 텍스트 파일 형식으로 저장합니다.
◦
로컬 파일 시스템, HDFS(Hadoop Distributed File System), S3 같은 클라우드 스토리지 등 다양한 파일 시스템을 지원합니다.
◦
output.txt라는 이름으로 저장하면, 실제로는 output.txt/part-00000, output.txt/part-00001 등의 형태로 여러 파일에 걸쳐 저장됩니다. (RDD가 분산되어 처리되기 때문이라고 합니다.)
•
textFile()
◦
spark.sparkContext.textFile("output.txt") 텍스트 파일로부터 새로운 RDD를 생성합니다. 파일의 각 라인을 RDD의 한 요소로 로드합니다.
# create rdd from text file
rdd_text = spark.sparkContext.textFile("output.txt")
rdd_text.collect()
# ["('Alice', 25)", "('Bob', 30)", "('Charlie', 35)", "('Alice', 40)"]
Python
복사
4. DataFrame in Spark
DataFrames in Apache Spark
•
Spark DataFrame은 분산 데이터 컬렉션으로, 구조화된 데이터를 테이블 형태로 저장합니다.
•
각 컬럼에는 이름과 데이터 타입이 정의되어 있어 SQL 데이터베이스의 테이블과 유사합니다.
•
RDD보다 high-level operation 수행가능, 직관적이어서 사용하기 편합니다.
#### DataFrame Structure
•
각 행은 데이터 레코드(데이터 인스턴스 1개)를 나타내며, 열은 해당 레코드의 특정 필드(속성)를 나타냅니다. (아시는 데이터프래임 그대로입니다.)
•
사실상 우리가 평소에 접하는 pandas dataframe가 유사, spark에서도 SQL 쿼리를 실행하는 것처럼, DataFrame을 사용하여 데이터에 대한 쿼리, 필터링, 집계 등을 수행 가능합니다.
#### Schema Information
•
스키마의 역할: DataFrame의 스키마는 각 컬럼의 이름과 데이터 타입 정보를 포함 (SQL배울때의 그 스키마와 사실상 동일합니다.)
•
스키마는 데이터를 읽고 쓰는 동안 타입 안정성을 보장하고, SQL 쿼리 및 데이터 처리 작업의 최적화된 데이터 프레임입니다.
#### Advantages of DataFrames
•
Optimized Execution
◦
Schema Information
▪
DataFrames의 형태이기에 각 컬럼의 데이터 유형을 미리 알고 있음. 따라서 더 효율적인 데이터 처리와 쿼리 최적화가 가능. 스키마 정보는 query planner에 의해 사용되어 더 빠르고 효율적인 실행 계획을 생성할 수 있습니다..
◦
Predicate Pushdown
▪
쿼리의 필터링 조건을 가능한 한 데이터 소스에 가깝게 적용하여 불필요한 데이터의 처리와 이동을 줄이는 최적화도 가능하게 합니다. 예를 들어, 데이터베이스에서 데이터를 가져올 때 필요한 데이터만 추출하여 전송량을 줄이고 처리 속도를 높인다고 합니다.
•
Ease of Use
◦
SQL-like Interface (사용편의성)
◦
Simplified API
▪
API 단순화: 복잡한 RDD 변환과 액션 대신, DataFrames API는 직관적이고 선언적인 데이터 조작을 지원 → 사실상 사용자 입장에서 가장 큰 장점 중 하나라고 볼 수 있습니다.
•
Integration with Ecosystem
◦
Seamless Integration
▪
Spark SQL, MLLib, GraphX 등 Spark의 다른 라이브러리들과의 원활한 통합을 통해, 복잡한 데이터 파이프라인을 구축할 때 일관된 API를 사용 가능 → 마찬가지로 사용자 입장에서 가장 큰 장점 중 하나라고 볼 수 있습니다.
•
Built-in Optimization
◦
Catalyst Optimizer
▪
Spark의 고급 최적화 엔진인 Catalyst는 실행 계획을 동적으로 컴파일하고 최적화하여 실행 성능을 크게 향상시킨다고 합니다.
•
Interoperability
◦
Data Format Conversion
▪
DataFrames는 다양한 데이터 소스와 포맷(Pandas DataFrames, Parquet, JSON 등)으로부터 쉽게 데이터를 읽고 쓸 수 있으며, 다른 데이터 처리 도구와의 연동이 용이합니다 → 전처리 하는 입장에서 가장 큰 장점 중 하나라고 볼 수 있죠!
Difference from Pandas Dataframes (Mutable vs. Immutable DataFrames)
•
많은 사람들이 처음 데이터를 접할때 배우는 Pandas의 Dataframe과 Spark의 DataFrame은 어떻게 다를까요?
•
pandas DataFrame
◦
pandas에서의 DataFrame은 mutable(가변)
◦
즉, 데이터 프레임 내의 데이터를 직접 변경할 수 있습니다. 대부분의 사용자가 경험해보았듯이 데이터를 조작하고 업데이트할 때 매우 유연한 구조를 가지고 있습니다.
◦
method에 inplace 매개변수를 제공하여, 원본 데이터 프레임을 직접 수정할지 여부를 사용자가 선택할 수 있습니다.
import pandas as pd
# pandas DataFrame 생성
df_pandas = pd.DataFrame({
'A': [1, 2, 3],
'B': [4, 5, 6]
})
# 새 컬럼 추가 (inplace가 필요 없는 연산)
df_pandas['C'] = df_pandas['A'] + df_pandas['B']
# 컬럼 이름 변경 (inplace 옵션 사용)
df_pandas.rename(columns={'A': 'Alpha'}, inplace=True)
# 결과 출력
print(df_pandas)
'''
Alpha B C
0 1 4 5
1 2 5 7
2 3 6 9
'''
Python
복사
•
Spark DataFrame
◦
Spark에서의 DataFrame은 immutable(불변)
◦
즉, 한 번 생성되면 그 내용을 변경할 수 없으며, 데이터에 변형을 가하고자 할 때는 새로운 DataFrame이 생성됩니다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# SparkSession 시작
spark = SparkSession.builder.appName("example").getOrCreate()
# Spark DataFrame 생성
df_spark = spark.createDataFrame([
(1, 4),
(2, 5),
(3, 6)
], ["A", "B"])
# 새 컬럼 추가
new_df_spark = df_spark.withColumn('C', col('A') + col('B'))
# 컬럼 이름 변경
final_df_spark = new_df_spark.withColumnRenamed('A', 'Alpha')
# 결과 출력
final_df_spark.show()
'''
+-----+---+---+
|Alpha| B| C|
+-----+---+---+
| 1| 4| 5|
| 2| 5| 7|
| 3| 6| 9|
+-----+---+---+
'''
Python
복사
Example (RDD vs DataFrame)
•
txt file에서 word count를 하고 most frequent word를 찾는 example
# Setting Environment ...
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-Demo").getOrCreate()
Python
복사
#### RDD
rdd = spark.sparkContext.textFile("./data/data.txt")
result_rdd = rdd.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], ascending=False)
Python
복사
•
spark.sparkContext.textFile: 파일 시스템에서 텍스트 파일을 읽어서 RDD를 생성. 각 줄은 RDD의 요소로 변환 (각 line을 index하나로 간주한다고 생각하면 좋을것 같습니다.)
•
flatMap(lambda line: line.split(" ")): 각 줄을 공백으로 분리하여 단어를 추출
•
flatMap은 각 입력 요소에 대해 여러 개의 출력을 생성할 수 있으며, 이 모든 출력을 단일 RDD로 평탄화합니다.
•
map(lambda word: (word, 1)): 각 단어를 (단어, 1)의 tuple 형식으로 매핑
•
reduceByKey(lambda a, b: a + b): 같은 키(단어)를 가진 값들을 합산 → 각 단어의 출현 횟수를 계산
•
sortBy(lambda x: x[1], ascending=False): 계산된 단어의 빈도 수에 따라 내림차순으로 정렬
#### DataFrame
df = spark.read.text("./data/data.txt")
result_df = df.selectExpr("explode(split(value, ' ')) as word") \
.groupBy("word") \
.count() \
.orderBy(desc("count"))
Python
복사
•
spark.read.text: 파일 시스템에서 텍스트 파일을 읽어 DataFrame을 생성. 각 line은 DataFrame의 'value'라는 이름의 컬럼에 저장
•
selectExpr("explode(split(value, ' ')) as word"): selectExpr은 SQL 표현식을 사용해 데이터를 변환 → 'value'를 공백으로 분리하고, explode 함수를 사용해 각 단어를 별도의 row로 확장
•
groupBy("word"): 'word' 컬럼의 값에 따라 데이터를 그룹화
•
count(): 각 그룹의 데이터 수를 세어 'count' 컬럼에 저장 → 각 단어의 출현 횟수를 계산
•
orderBy(desc("count")): 'count' 컬럼을 기준으로 내림차순으로 정렬
→ RDD와 마찬가지로 당연히 DataFrame도 Lazy Evaluation의 작동 원리에 따라 Action( count, collect, take ) 호출하는 순간, Spark는 필요한 데이터만을 계산하여 자원 사용과 처리 시간을 최적화됩니다.
Example (Create Spark DataFrame from CSV)
### Read CSV with header
# Read CSV file into DataFrame
csv_file_path = "./data/products.csv"
df = spark.read.csv(csv_file_path, header=True)
# Display schema of DataFrame
df.printSchema()
# Display content of DataFrame
df.show(5)
'''
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- category: string (nullable = true)
|-- quantity: string (nullable = true)
|-- price: string (nullable = true)
+---+--------------------+---------------+--------+------+
| id| name| category|quantity| price|
+---+--------------------+---------------+--------+------+
| 1| iPhone 12| Electronics| 10|899.99|
| 2| Nike Air Max 90| Clothing| 25|119.99|
| 3|KitchenAid Stand ...|Home Appliances| 5|299.99|
| 4| The Great Gatsby| Books| 50| 12.99|
| 5|L'Oreal Paris Mas...| Beauty| 100| 9.99|
+---+--------------------+---------------+--------+------+
'''
Python
복사
•
Spark로 csv file을 불러올 수 있으나 schema를 제대로 읽지 못하는 것을 확인할 수 있습니다.
### Read CSV with an explicit schema definition
# import necessary types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Define the schema
schema = StructType([
StructField(name="id", dataType=IntegerType(), nullable=True),
StructField(name="name", dataType=StringType(), nullable=True),
StructField(name="category", dataType=StringType(), nullable=True),
StructField(name="quantity", dataType=IntegerType(), nullable=True),
StructField(name="price", dataType=DoubleType(), nullable=True)
])
# Read CSV file into DataFrame with schema definition
csv_file_path = "./data/products.csv"
df = spark.read.csv(csv_file_path, header=True, schema=schema)
# Display schema of DataFrame
df.printSchema()
# Display content of DataFrame
df.show(5)
'''
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- category: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- price: double (nullable = true)
+---+--------------------+---------------+--------+------+
| id| name| category|quantity| price|
+---+--------------------+---------------+--------+------+
| 1| iPhone 12| Electronics| 10|899.99|
| 2| Nike Air Max 90| Clothing| 25|119.99|
| 3|KitchenAid Stand ...|Home Appliances| 5|299.99|
| 4| The Great Gatsby| Books| 50| 12.99|
| 5|L'Oreal Paris Mas...| Beauty| 100| 9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows
'''
Python
복사
•
명시적 schema를 제공함으로써 제대로된 데이터타입을 가질 수 있도록 할 수 있습니다.
### Read CSV file into DataFrame with inferSchema
csv_file_path = "./data/products.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
# Display schema of DataFrame
df.printSchema()
# Display content of DataFrame
df.show(5)
'''
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- category: string (nullable = true)
|-- quantity: integer (nullable = true)
|-- price: double (nullable = true)
+---+--------------------+---------------+--------+------+
| id| name| category|quantity| price|
+---+--------------------+---------------+--------+------+
| 1| iPhone 12| Electronics| 10|899.99|
| 2| Nike Air Max 90| Clothing| 25|119.99|
| 3|KitchenAid Stand ...|Home Appliances| 5|299.99|
| 4| The Great Gatsby| Books| 50| 12.99|
| 5|L'Oreal Paris Mas...| Beauty| 100| 9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows
'''
Python
복사
•
inferSchema=True를 추가해서 spark가 자동으로 datatype을 추적하도록 할 수 있습니다.
Transformations & Actions
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-Operations").getOrCreate()
# Load the synthetic data into a DataFrame
data_file_path = "./data/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)
# Select specific columns
selected_columns = df.select("id", "name", "price")
print("Selected Columns:")
selected_columns.show(10)
'''
Selected Columns:
+---+----------------+-------+
| id| name| price|
+---+----------------+-------+
| 1| iPhone| 899.99|
| 2| Macbook|1299.99|
| 3| iPad| 499.99|
| 4| Samsung TV| 799.99|
| 5| LG TV| 699.99|
| 6| Nike Shoes| 99.99|
| 7| Adidas Shoes| 89.99|
| 8| Sony Headphones| 149.99|
| 9|Beats Headphones| 199.99|
| 10| Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows
'''
Python
복사
•
select: 표현식 집합을 선택하고 새로운 DataFrame을 반환합니다.
# Filter rows based on a condition
filtered_data = df.filter(df.quantity > 20)
print("Filtered Data:", filtered_data.count())
filtered_data.show()
'''
Filtered Data: 12
+---+--------------+-----------+--------+-----+
| id| name| category|quantity|price|
+---+--------------+-----------+--------+-----+
| 6| Nike Shoes| Clothing| 30|99.99|
| 7| Adidas Shoes| Clothing| 25|89.99|
| 12| Apples| Food| 100| 0.5|
| 13| Bananas| Food| 150| 0.25|
| 14| Oranges| Food| 120| 0.75|
| 15|Chicken Breast| Food| 50| 3.99|
| 16| Salmon Fillet| Food| 30| 5.99|
| 24| Laptop Bag|Accessories| 25|29.99|
| 25| Backpack|Accessories| 30|24.99|
| 28| Jeans| Clothing| 30|59.99|
| 29| T-shirt| Clothing| 50|14.99|
| 30| Sneakers| Clothing| 40|79.99|
+---+--------------+-----------+--------+-----+
'''
Python
복사
•
filter: 주어진 조건을 만족하는 행만 필터링합니다.
# GroupBy and Aggregations
grouped_data = df.groupBy("category").agg({"quantity": "sum", "price": "avg"})
print("Grouped and Aggregated Data:")
grouped_data.show()
'''
Grouped and Aggregated Data:
+-----------+-------------+------------------+
| category|sum(quantity)| avg(price)|
+-----------+-------------+------------------+
| Food| 450|2.2960000000000003|
| Sports| 35| 34.99|
|Electronics| 98| 586.6566666666665|
| Clothing| 200| 99.2757142857143|
| Furniture| 41| 141.99|
|Accessories| 55| 27.49|
+-----------+-------------+------------------+
'''
Python
복사
•
groupBy: 지정된 열을 기준으로 DataFrame을 그룹화합니다.
•
agg: 그룹화된 데이터에 대해 집계 함수(sum, avg)를 수행합니다.
… 이외에도 join , orderBy 등의 transformation 적용 가능합니다.
5. Code Review - Deduplication Data with PySpark (NLP)
•
LLM Training을 위해서는 대용량 Corpus 수집 후 전처리가 필수적입니다.
(실제로 NLP Researcher들이 Spark를 가장 많이 활용할 부분이라고 사료됩니다.)
•
polyglot 모델 학습시에 Deduplication, 전처리코드 활용되었던
dps 레포지토리내의 Deduplication 코드를 상세하게 리뷰해보면서 PySpark가 어떻게 사용되는지 이해를 높혀보고자 합니다.

•
아래는 dps/spark/jobs/dedup_job.py 전문
"""
Run this from project root path
python bin/sparkapp.py dedup_job --config_path=./configs/dedup_job.yaml
"""
import random
from itertools import combinations
import yaml
from pyspark import SparkContext
from pyspark.rdd import RDD
from dps.spark.spark_session import spark_session
from dps.spark.utils.io_utils import read_line, to_json
from dps.spark.prep.dedup_prep import (
shingle_word,
generate_minhash,
jaccard_by_hashvalues,
)
def expand_instances_by_minhash(
data, expand_size: int, n_gram: int, seed: int = 1, char_level: bool = False
):
shingles = shingle_word(data["text"], n_gram=n_gram, char_level=char_level)
minhashes = generate_minhash(shingles, num_perm=expand_size, seed=seed)
for mh in minhashes.tolist():
yield (str(mh), [dict(**data, shingles=shingles, hashvalues=minhashes)])
def explore_dedup_instance(hash_groups, threshold: float = 0.8):
if len(hash_groups) <= 1:
return
group_represent_text = hash_groups[0][
"text"
] # not to remove all text instances in group.
pairs = combinations(hash_groups, 2)
for d_1, d_2 in pairs:
sim_score = jaccard_by_hashvalues(d_1["hashvalues"], d_2["hashvalues"])
if sim_score >= threshold:
dedup_text = [d_1["text"], d_2["text"]]
if group_represent_text in dedup_text:
yield dedup_text[0] if dedup_text[
0
] != group_represent_text else dedup_text[1]
else:
yield random.choice(dedup_text)
def dedup_job(config_path):
with open(config_path) as f:
conf = yaml.load(f, Loader=yaml.FullLoader)
input_paths = ",".join([f'{conf["base_dir"]}/{t}' for t in conf["targets"]])
with spark_session(f"") as spark:
sc: SparkContext = spark.sparkContext
proc_rdd: RDD = (
sc.textFile(input_paths)
.repartition(conf["n_dist"])
.flatMap(read_line)
.cache()
)
overlap_kv_rdd: RDD = (
proc_rdd.flatMap(
lambda x: expand_instances_by_minhash(
x,
expand_size=conf["num_expand"],
n_gram=conf["n_gram"],
seed=conf["seed"],
char_level=conf["char_level"],
)
)
.reduceByKey(lambda x, y: x + y)
.flatMap(
lambda x: explore_dedup_instance(x[1], threshold=conf["sim_threshold"])
)
.distinct()
.map(lambda x: (x, dict(text=x)))
.cache()
)
proc_rdd.map(lambda x: (x["text"], x)).subtractByKey(overlap_kv_rdd).map(
lambda x: x[1]
).repartition(conf["n_output"]).flatMap(to_json).saveAsTextFile(
conf["output_dir"]
)
Python
복사
#### Function (expand_instances_by_minhash()와 explore_dedup_instance()의 함수에 대한 많은 디테일은 생략되어 있습니다.)
def expand_instances_by_minhash(
data, expand_size: int, n_gram: int, seed: int = 1, char_level: bool = False
):
shingles = shingle_word(data["text"], n_gram=n_gram, char_level=char_level)
minhashes = generate_minhash(shingles, num_perm=expand_size, seed=seed)
for mh in minhashes.tolist():
yield (str(mh), [dict(**data, shingles=shingles, hashvalues=minhashes)])
Python
복사
•
shingle_word: data를 n-gram 또는 문자 단위로 분할합니다.
•
generate_minhash: 분할된 data로부터 MinHash 값을 생성합니다.
◦
실제로 저 함수는 hash_values = np.array([])라는 것을 반환합니다.
•
expand_instances_by_minhash: 를 거치면 각 데이터는 하나의 min_hash value로 mapping됨 (yield (str(mh), [dict(**data, shingles=shingles, hashvalues=minhashes)]))
⇒ Example
•
text: "안녕하세요 여러분 안녕하세요"
•
n_gram: 2,
•
shingle_word: ["안녕하세요_여러분", "여러분_안녕하세요"]
•
minhases: [11,23, … ]
def explore_dedup_instance(hash_groups, threshold: float = 0.8):
if len(hash_groups) <= 1:
return
group_represent_text = hash_groups[0]["text"]
pairs = combinations(hash_groups, 2)
for d_1, d_2 in pairs:
sim_score = jaccard_by_hashvalues(d_1["hashvalues"], d_2["hashvalues"])
if sim_score >= threshold:
dedup_text = [d_1["text"], d_2["text"]]
if group_represent_text in dedup_text:
yield dedup_text[0] if dedup_text[0] != group_represent_text else dedup_text[1]
else:
yield random.choice(dedup_text)
Python
복사
•
combinations: 가능한 모든 텍스트 쌍을 생성합니다.
◦
이때 expand_instances_by_minhash: 를 거치면 각 데이터는 하나의 min_hash value로 mapping됨 (yield (str(mh), [dict(**data, shingles=shingles, hashvalues=minhashes)])) 에서 생성한 str(mh)끼리만 combinations를 생성 (그룹 내의 모든 데이터 쌍의 조합을 생성)
•
jaccard_by_hashvalues: 위에서 생성한 combinations 텍스트 사이의 유사도를 계산합니다.
•
theshold가 높을 경우 두 instance 중 하나만 반환합니다.
#### Spark
proc_rdd: RDD = (
sc.textFile(input_paths)
.repartition(conf["n_dist"])
.flatMap(read_line)
.cache()
)
Python
복사
•
sc.textFile: 지정된 경로에서 텍스트 파일을 읽어 RDD를 생성합니다.
•
repartition: n_dist 설정에 따라 RDD의 파티션 수를 조정하여 데이터 분포를 최적화합니다
•
flatMap: read_line 함수를 각 입력 라인에 적용하여 각 줄의 데이터를 단일 요소로 읽습니다.
•
cache: RDD를 메모리에 캐시하여 다중 작업에서 성능을 향상합니다.
overlap_kv_rdd: RDD = (
proc_rdd.flatMap(
lambda x: expand_instances_by_minhash(
x,
expand_size=conf["num_expand"],
n_gram=conf["n_gram"],
seed=conf["seed"],
char_level=conf["char_level"],
)
)
.reduceByKey(lambda x, y: x + y)
.flatMap(
lambda x: explore_dedup_instance(x[1], threshold=conf["sim_threshold"])
)
.distinct()
.map(lambda x: (x, dict(text=x)))
.cache()
)
Python
복사
•
expand_instances_by_minhash: 각 데이터 인스턴스에 대해 def expand_instances_by_minhash(
data, expand_size: int, n_gram: int, seed: int = 1, char_level: bool = False
):
shingles = shingle_word(data["text"], n_gram=n_gram, char_level=char_level)
minhashes = generate_minhash(shingles, num_perm=expand_size, seed=seed)
for mh in minhashes.tolist():
yield (str(mh), [dict(**data, shingles=shingles, hashvalues=minhashes)])
적용합니다.
•
reduceByKey: 동일한 키를 가진 데이터를 합침 → 데이터를 그룹화가 일어나기 때문에 str(mh)별로 reduce됩니다.
•
explore_dedup_instance: 설정된 유사도 임계값을 기반으로 중복을 식별하고 제거합니다.
•
distinct: 중복된 결과를 제거합니다.
•
map: 각 결과에 대한 추가적인 데이터 구조를 생성합니다.
(예시: [("apple", {'text': 'apple'}), ("banana", {'text': 'banana'}), ("cherry", {'text': 'cherry'})])
⇒ 위 코드는 중복된 instance/RDD를 찾아내는 함수입니다.
proc_rdd.map(lambda x: (x["text"], x)).subtractByKey(overlap_kv_rdd).map(
lambda x: x[1]
).repartition(conf["n_output"]).flatMap(to_json).saveAsTextFile(
conf["output_dir"]
)
Python
복사
•
subtractByKey: 중복된 데이터를 제거, 즉 overlap_kv_rdd에 있는 키를 가진 데이터는 제거합니다.
•
repartition: 결과 데이터의 파티션 수를 조절합니다.
•
flatMap(to_json): 결과 데이터를 JSON 형식으로 변환하기 위해 넣은 함수입니다.
•
saveAsTextFile: 최종 결과를 텍스트 파일로 저장합니다.