2026. 3. 16. 11:52ㆍ📚 빅데이터 분산
빅데이터 분산 처리 프로젝트 - Spark 배치 분석 및 결과 저장
목표
HDFS에 쌓인 로그 데이터를 Spark로 분석하여
"비슷한 여행자들이 선호하는 장소" 결과를 도출하고 PostgreSQL에 저장하는 것이 목표다.
전체 흐름
HDFS (사용자 로그 CSV)
→ Spark가 읽어서 분석
→ 국적/연령대/성별/여행목적/라이프스타일별 인기 장소 도출
→ PostgreSQL의 popular_places 테이블에 저장1. Spark 작업 실행 방식: spark-submit
Spark 분석 작업은 Spring Boot 내부에서 실행하는 것이 아니라,
별도의 스크립트를 Spark 클러스터에 제출(submit)하는 방식이다.
프로젝트 기획서에도 "새벽 4시 기준 스케줄링 : 트리거 역할"이라고 명시되어 있다.
새벽 4시 (스케줄러 트리거)
→ spark-submit 명령어 실행
→ Spark가 HDFS에서 로그 읽기
→ 군집별 인기 장소 분석
→ 결과를 PostgreSQL에 저장분석 스크립트는 PySpark(Python)로 작성했다.
Java로도 가능하지만 Python이 더 간결하고 데이터 분석에 적합하다.
2. 사전 준비
테스트 데이터 적재
분석을 위해 다양한 사용자 로그를 HDFS에 적재했다.
# 한국인 20대 남성 관광객 로그
curl -X POST http://localhost:8090/api/logs -H "Content-Type: application/json" \
-d "{\"userId\":\"user001\",\"nationality\":\"KR\",\"age\":25,\"gender\":\"M\",\"travelPurpose\":\"SIGHTSEEING\",\"lifestyle\":\"ADVENTURE\",\"action\":\"VIEW\",\"placeId\":\"place_001\",\"timestamp\":\"2026-03-16T10:30:00\"}"
# 일본인 30대 여성 맛집 탐방객 로그
curl -X POST http://localhost:8090/api/logs -H "Content-Type: application/json" \
-d "{\"userId\":\"user004\",\"nationality\":\"JP\",\"age\":32,\"gender\":\"F\",\"travelPurpose\":\"FOOD\",\"lifestyle\":\"RELAXATION\",\"action\":\"GO\",\"placeId\":\"place_005\",\"timestamp\":\"2026-03-16T12:00:00\"}"
# ... 총 6건의 로그를 적재
PostgreSQL JDBC 드라이버 준비
Spark가 분석 결과를 PostgreSQL에 저장하려면 JDBC 드라이버가 필요하다.
# spark-jobs 폴더에 드라이버 다운로드
curl -L -o spark-jobs/postgresql-42.7.4.jar https://jdbc.postgresql.org/download/postgresql-42.7.4.jar
docker-compose.yml 수정
Spark Master가 로컬의 스크립트 파일에 접근할 수 있도록 volumes를 추가했다.
spark-master:
image: bde2020/spark-master:3.1.1-hadoop3.2
container_name: travel-spark-master
ports:
- "8080:8080"
- "7077:7077"
environment:
CORE_CONF_fs_defaultFS: hdfs://namenode:9000
volumes:
- ./spark-jobs:/spark-jobs
./spark-jobs:/spark-jobs는 호스트의 spark-jobs 폴더를 컨테이너의 /spark-jobs 경로에 마운트한다.
폴더 구조
travel-project/
├── docker-compose.yml
└── spark-jobs/
├── analyze_logs.py ← PySpark 분석 스크립트
└── postgresql-42.7.4.jar ← PostgreSQL JDBC 드라이버3. PySpark 분석 스크립트
analyze_logs.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, dense_rank, when, sum
from pyspark.sql.window import Window
# 1. Spark 세션 생성
spark = SparkSession.builder \
.appName("TravelLogAnalysis") \
.config("spark.jars", "/spark-jobs/postgresql-42.7.4.jar") \
.getOrCreate()
# 2. HDFS에서 로그 읽기
df = spark.read.csv("hdfs://namenode:9000/user-logs/*/*", header=False)
# 3. 컬럼 이름 지정
df = df.toDF(
"user_id", "nationality", "age", "gender",
"travel_purpose", "lifestyle", "action", "place_id", "timestamp"
)
# 4. 나이를 숫자로 변환하고 연령대 생성
df = df.withColumn("age", col("age").cast("int"))
df = df.withColumn("age_group",
when(col("age") < 20, "10s")
.when(col("age") < 30, "20s")
.when(col("age") < 40, "30s")
.when(col("age") < 50, "40s")
.otherwise("50s+")
)
# 5. 행동별 가중치 부여
df = df.withColumn("score",
when(col("action") == "GO", 3)
.when(col("action") == "RE_RECOMMEND", 2)
.otherwise(1) # VIEW
)
# 6. 군집별 장소 점수 합산
grouped = df.groupBy(
"nationality", "age_group", "gender",
"travel_purpose", "lifestyle", "place_id"
).agg(
count("*").alias("visit_count"),
sum("score").alias("total_score")
)
# 7. 군집 내에서 점수 높은 순으로 순위 매기기
window = Window.partitionBy(
"nationality", "age_group", "gender",
"travel_purpose", "lifestyle"
).orderBy(col("total_score").desc())
ranked = grouped.withColumn("rank", dense_rank().over(window))
# 8. 상위 5개 장소만 선택
top5 = ranked.filter(col("rank") <= 5)
# 9. 결과를 PostgreSQL에 저장
top5.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://travel-postgres:5432/travel_db") \
.option("dbtable", "popular_places") \
.option("user", "admin") \
.option("password", "admin1234") \
.option("driver", "org.postgresql.Driver") \
.mode("overwrite") \
.save()
print("=== 분석 완료! popular_places 테이블에 저장됨 ===")
spark.stop()
스크립트 단계별 설명
| 단계 | 내용 | 설명 |
|---|---|---|
| 1~3 | HDFS에서 읽기 | 모든 날짜(/*/*)의 CSV 파일을 읽어서 컬럼명 부여 |
| 4 | 연령대 생성 | 나이를 10대/20대/30대 등으로 그룹화. 기획서의 "userId별로 하면 경우의 수가 많아지니까 남성, 20대, 국적 이렇게 분류" 요구사항 반영 |
| 5 | 가중치 부여 | VIEW=1점, RE_RECOMMEND=2점, GO=3점. 기획서의 "각 로그별 스코어를 부여함 : 가중치를 위해" 요구사항 반영 |
| 6 | 군집별 합산 | 국적+연령대+성별+여행목적+라이프스타일 조합별로 장소 점수를 합산 |
| 7 | 순위 매기기 | 각 군집 내에서 total_score 기준 내림차순 순위 부여 |
| 8 | 상위 5개 | 군집별 인기 장소 Top 5만 추출 |
| 9 | PostgreSQL 저장 | 결과를 popular_places 테이블에 overwrite 모드로 저장 |
PostgreSQL 접속 주소
스크립트에서 PostgreSQL 접속 URL이 localhost가 아니라 travel-postgres인 이유는,
Spark가 Docker 컨테이너 안에서 실행되기 때문이다.
Docker 네트워크에서는 컨테이너 이름으로 서로 접근할 수 있다.
호스트(내 PC)에서 접속할 때 → localhost:5432
Docker 컨테이너에서 접속할 때 → travel-postgres:54324. 실행
spark-submit 명령어
MSYS_NO_PATHCONV=1 docker exec -it travel-spark-master \
/spark/bin/spark-submit \
--jars /spark-jobs/postgresql-42.7.4.jar \
/spark-jobs/analyze_logs.py
⚠️ Windows Git Bash 사용 시 주의
Git Bash는
/spark를 Windows 경로(C:/Program Files/Git/spark)로 자동 변환한다.MSYS_NO_PATHCONV=1을 앞에 붙여서 경로 변환을 방지해야 한다.
성공 로그
실행 후 대량의 Spark 로그가 출력되다가 마지막에 다음 메시지가 나오면 성공이다.
=== 분석 완료! popular_places 테이블에 저장됨 ===5. 결과 확인
CLI로 확인
docker exec -it travel-postgres psql -U admin -d travel_db -c "SELECT * FROM popular_places;"
결과
nationality | age_group | gender | travel_purpose | lifestyle | place_id | visit_count | total_score | rank
-------------+-----------+--------+----------------+-------------+------------+-------------+-------------+------
KR | 20s | M | SIGHTSEEING | ADVENTURE | place_001 | 3 | 5 | 1
KR | 20s | M | SIGHTSEEING | ADVENTURE | place_003 | 2 | 4 | 2
JP | 20s | F | FOOD | RELAXATION | place_005 | 1 | 1 | 1
JP | 30s | F | FOOD | RELAXATION | place_005 | 1 | 3 | 1결과 해석
- 한국인 / 20대 / 남성 / 관광 / 모험형: place_001이 1위(점수 5), place_003이 2위(점수 4)
- 일본인 / 여성 / 맛집 / 휴식형: 20대와 30대 모두 place_005를 선호
이 데이터가 "나와 비슷한 여행자들의 pick" 추천의 기반이 된다.
예를 들어 20대 한국인 남성 관광객이 앱에 접속하면, 이 테이블에서 해당 군집을 조회하여
place_001, place_003을 추천할 수 있다.
GUI로 확인 (DBeaver)
커맨드 라인 대신 DBeaver를 사용하면 테이블 데이터를 시각적으로 확인할 수 있다.
- dbeaver.io에서 Community Edition 설치
- 새 연결 → PostgreSQL 선택
- 접속 정보 입력: Host
localhost, Port5432, Databasetravel_db, Useradmin, Passwordadmin1234 travel_db > Schemas > public > Tables > popular_places더블클릭
트러블슈팅
Git Bash 경로 변환 문제
exec: "C:/Program Files/Git/spark/bin/spark-submit": no such file or directoryGit Bash가 /spark를 Windows 경로로 변환해서 발생한다.
해결: 명령어 앞에 MSYS_NO_PATHCONV=1을 추가한다.
'📚 빅데이터 분산' 카테고리의 다른 글
| [특화 프로젝트] 추천 장소 조회 API 개발 (3) | 2026.03.16 |
|---|---|
| [특화 프로젝트] HDFS에 사용자 로그 적재 (0) | 2026.03.16 |
| [특화 프로젝트] Spring Boot 프로젝트 생성 및 PostgreSQL 연동 (2) | 2026.03.16 |
| [특화 프로젝트]Docker 인프라 세팅하는 법 (2) | 2026.03.16 |