PySpark 간단 소개
빅데이터와 머신러닝 환경에서는 방대한 데이터를 빠르게 처리하고, 효율적으로 분산 연산을 수행하는 것이 핵심이다.
Spark는 In-Memory 분산 처리 기능을 제공하여, 기존 Hadoop의 MapReduce보다 최대 100배 빠른 속도로 데이터를 처리할 수 있다.
또한, 병렬 처리 기반의 데이터 분산 저장 및 계산 기능을 제공하여 대규모 데이터셋을 활용한 머신러닝 모델 학습을 더욱 효율적으로 수행할 수 있다.
PySpark는 Spark의 Python API로, Pandas, NumPy, Scikit-learn 등 Python의 데이터 분석 라이브러리와의 연동이 가능하다.
머신러닝 프로젝트에서 PySpark를 사용하면 분산 컴퓨팅을 활용해 모델 학습 속도를 크게 향상시킬 수 있으며, MLlib 같은 Spark의 내장 라이브러리를 통해 대규모 데이터에 최적화된 머신러닝 모델을 쉽게 구현할 수 있다.
PySpark 설치
Hadop binary 설치
Spark가 Windows에서 작동하려면 winutils.exe라는 도구가 필요하다.
https://github.com/steveloughran/winutils?tab=readme-ov-file 해당 링크에서 hadoop-3.0.0/bin을 다운받고 C:\hadoop에 위치해주자
이후 [내 컴퓨터 → 속성 → 고급 시스템 설정 → 환경 변수]에 들어가 시스템 변수와 PATH 를 추가해준다.
- 시스템 변수 이름: HADOOP_HOME
- 변수 값: C:\hadoop (winutils.exe가 있는 디렉터리)
- PATH 변수에 추가: C:\hadoop\bin
PySpark와 PostgreSQL 드라이버 설정
PySpark는 pip로 설치해주면 된다.
pip install pyspark
이후 spark에서 PostgreSQL DB에 접근하여 데이터를 가져올 수 있도록 jdbc driver를 다운받는다. https://jdbc.postgresql.org/
본인은 postgresql-42.7.5.jar를 다운받았다.
PySpark 세션 생성
from pyspark.sql import SparkSession
import os
# 환경변수 에러가 발생하면 아래 두 줄 명시해주기
os.environ['HADOOP_HOME'] = r"C:\hadoop"
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['HADOOP_HOME'], 'bin')
# PostgreSQL JDBC 드라이버 경로 설정
jdbc_driver_path = r"B:\workspace\TradingAI\postgresql-42.7.5.jar"
# Spark 세션 생성
spark = SparkSession.builder \
.appName("PostgreSQL-Spark") \
.master("local[12]") \
.config("spark.jars", jdbc_driver_path) \
.config("spark.driver.memory", "24g") \
.config("spark.executor.memory", "24g") \
.config("spark.sql.shuffle.partitions", "12") \
.config("spark.default.parallelism", "24") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
pyspark 세션은 SparkSession.builder를 통해 생성한다.
local 모드 설정: master("local[12]")
단일 머신에서 작업할 경우 master 옵션에 "local[12]"를 지정하여 12코어를 모두 사용하도록 한다.
클러스터 자원 설정: config("spark.driver.memory", "24g"), .config("spark.executor.memory", "24g")
spark는 driver와 executor로 어플리케이션을 구성한다.
Driver는 한 개의 노드에서 실행되며 애플리케이션의 제어, 작업 스케줄링, 메타데이터 관리 등을 담당한다.
Executor는 실제 데이터 처리, 계산, 작업 실행을 담당한다. 메모리와 코어 수를 적절히 설정해 작업의 병렬성을 부여해야한다.
로컬 모드에서는 executor 프로세스를 여러개 생성하지 않고 단일 프로세스 내에서 지정한 스레드(12) 수를 활용하여 병렬처리를 수행한다. 또한 spark.executor.cores 옵션을 지정하지 않으면 기본적으로 1코어가 할당된다.
메모리를 너무 작게 설정하면 연산 도중 OOM이 발생할 수 있다.
파티션 수 최적화
Shuffle 파티션: .config("spark.sql.shuffle.partitions", "12")
기본값은 200이지만, 데이터 크기가 7백만 row 정도라면 파티션 수를 줄이는 것이 오버헤드를 줄일 수 있다.
셔플(데이터 재분배) 작업 후에 생성되는 파티션의 수를 코어수에 맞게 12개로 제한하여 균등하게 분배되도록 했다.
Parallelism: .config("spark.default.parallelism", "24")
명시적으로 파티션 수를 지정하지 않은 RDD 연산(예: parallelize()로 생성된 RDD나 일부 변환 연산)에서 사용할 기본 파티션 수를 결정. 너무 많은 태스크가 생성되면 task scheduling overhead가 발생할 수 있다.
직렬화 방식 개선
Kryo Serializer 사용: .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
기본 Java serializer보다 성능이 우수한 Kryo를 사용하면 데이터 직렬화/역직렬화 과정에서 속도 이점을 볼 수 있다.
생성된 세션에 대한 정보는 다음 명령어로 확인할 수 있다.
spark.sparkContext.getConf().getAll()
아니면 localhost:4040 에 접속하면 다음과 같은 spark 대시보드에 접속 가능하고 Environment에서 확인할 수 있다.
PostgreSQL 데이터 읽어오기
# PostgreSQL 연결 정보
db_url = "jdbc:postgresql://<DB_HOST>/postgres"
db_properties = {
"user": "postgres",
"password": "password",
"driver": "org.postgresql.Driver"
}
# table_name = "public.stock_chart"
table_query = """
(select * from steadybucks.stock_chart sc
join stock s on sc.stock_code = s.code
where s."section" = '주권') AS subquery
"""
df = spark.read.jdbc(
url=db_url,
table=table_query,
properties=db_properties,
)
# 데이터 출력
df.head()
PostgreSQL에 접근해 데이터를 읽어오는 방법은 다음과 같다.
- DB 접속 정보 선언 (DB 주소, DB 접속 권한)
- SQL 쿼리 작성
- spark.read.jdbc()를 통해 DB에 접속하고 query 결과 반환
Row(datetime=datetime.datetime(2025, 1, 21, 0, 0), stock_code='A000020', close_price=Decimal('6250.00'), high_price=Decimal('6380.00'), low_price=Decimal('6200.00'), open_price=Decimal('6370.00'), volume=Decimal('79700.00'), isspac=False, section_code=1, code='A000020', industry_code='KGS05P', type='한국주식', industry='제약', market='KOSPI', section='주권', name='동화약품')
'AI, 머신러닝' 카테고리의 다른 글
LLM의 효율적인 fine-tuning을 위한 PEFT 기법 - LoRA 알아보기 (0) | 2025.02.07 |
---|---|
RAG 시스템 강화하기 - LangChain으로 Multi-Query, Reranker 적용한 증권 뉴스 RAG 구현 (1) | 2025.01.30 |
네이버 증권뉴스 크롤링으로 RAG 데이터셋 만들기 (python, BeautifulSoup4) (0) | 2025.01.30 |
RAG 시스템 강화하기 - Multi-Query, Self-Query, Reranker 알아보기 (0) | 2025.01.29 |
OpenAI API와 LangChain으로 만드는 고성능 RAG 시스템 구현하기 (0) | 2025.01.28 |