[특화 프로젝트] Spark 배치 분석 및 결과 저장

2026. 3. 16. 11:52📚 빅데이터 분산

빅데이터 분산 처리 프로젝트 - Spark 배치 분석 및 결과 저장

목표

HDFS에 쌓인 로그 데이터를 Spark로 분석하여
"비슷한 여행자들이 선호하는 장소" 결과를 도출하고 PostgreSQL에 저장하는 것이 목표다.

전체 흐름

HDFS (사용자 로그 CSV)
    → Spark가 읽어서 분석
    → 국적/연령대/성별/여행목적/라이프스타일별 인기 장소 도출
    → PostgreSQL의 popular_places 테이블에 저장

1. Spark 작업 실행 방식: spark-submit

Spark 분석 작업은 Spring Boot 내부에서 실행하는 것이 아니라,
별도의 스크립트를 Spark 클러스터에 제출(submit)하는 방식이다.
프로젝트 기획서에도 "새벽 4시 기준 스케줄링 : 트리거 역할"이라고 명시되어 있다.

새벽 4시 (스케줄러 트리거)
    → spark-submit 명령어 실행
    → Spark가 HDFS에서 로그 읽기
    → 군집별 인기 장소 분석
    → 결과를 PostgreSQL에 저장

분석 스크립트는 PySpark(Python)로 작성했다.
Java로도 가능하지만 Python이 더 간결하고 데이터 분석에 적합하다.


2. 사전 준비

테스트 데이터 적재

분석을 위해 다양한 사용자 로그를 HDFS에 적재했다.

# 한국인 20대 남성 관광객 로그
curl -X POST http://localhost:8090/api/logs -H "Content-Type: application/json" \
  -d "{\"userId\":\"user001\",\"nationality\":\"KR\",\"age\":25,\"gender\":\"M\",\"travelPurpose\":\"SIGHTSEEING\",\"lifestyle\":\"ADVENTURE\",\"action\":\"VIEW\",\"placeId\":\"place_001\",\"timestamp\":\"2026-03-16T10:30:00\"}"

# 일본인 30대 여성 맛집 탐방객 로그
curl -X POST http://localhost:8090/api/logs -H "Content-Type: application/json" \
  -d "{\"userId\":\"user004\",\"nationality\":\"JP\",\"age\":32,\"gender\":\"F\",\"travelPurpose\":\"FOOD\",\"lifestyle\":\"RELAXATION\",\"action\":\"GO\",\"placeId\":\"place_005\",\"timestamp\":\"2026-03-16T12:00:00\"}"

# ... 총 6건의 로그를 적재

PostgreSQL JDBC 드라이버 준비

Spark가 분석 결과를 PostgreSQL에 저장하려면 JDBC 드라이버가 필요하다.

# spark-jobs 폴더에 드라이버 다운로드
curl -L -o spark-jobs/postgresql-42.7.4.jar https://jdbc.postgresql.org/download/postgresql-42.7.4.jar

docker-compose.yml 수정

Spark Master가 로컬의 스크립트 파일에 접근할 수 있도록 volumes를 추가했다.

  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    container_name: travel-spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      CORE_CONF_fs_defaultFS: hdfs://namenode:9000
    volumes:
      - ./spark-jobs:/spark-jobs

./spark-jobs:/spark-jobs는 호스트의 spark-jobs 폴더를 컨테이너의 /spark-jobs 경로에 마운트한다.

폴더 구조

travel-project/
├── docker-compose.yml
└── spark-jobs/
    ├── analyze_logs.py            ← PySpark 분석 스크립트
    └── postgresql-42.7.4.jar      ← PostgreSQL JDBC 드라이버

3. PySpark 분석 스크립트

analyze_logs.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, dense_rank, when, sum
from pyspark.sql.window import Window

# 1. Spark 세션 생성
spark = SparkSession.builder \
    .appName("TravelLogAnalysis") \
    .config("spark.jars", "/spark-jobs/postgresql-42.7.4.jar") \
    .getOrCreate()

# 2. HDFS에서 로그 읽기
df = spark.read.csv("hdfs://namenode:9000/user-logs/*/*", header=False)

# 3. 컬럼 이름 지정
df = df.toDF(
    "user_id", "nationality", "age", "gender",
    "travel_purpose", "lifestyle", "action", "place_id", "timestamp"
)

# 4. 나이를 숫자로 변환하고 연령대 생성
df = df.withColumn("age", col("age").cast("int"))
df = df.withColumn("age_group",
    when(col("age") < 20, "10s")
    .when(col("age") < 30, "20s")
    .when(col("age") < 40, "30s")
    .when(col("age") < 50, "40s")
    .otherwise("50s+")
)

# 5. 행동별 가중치 부여
df = df.withColumn("score",
    when(col("action") == "GO", 3)
    .when(col("action") == "RE_RECOMMEND", 2)
    .otherwise(1)  # VIEW
)

# 6. 군집별 장소 점수 합산
grouped = df.groupBy(
    "nationality", "age_group", "gender",
    "travel_purpose", "lifestyle", "place_id"
).agg(
    count("*").alias("visit_count"),
    sum("score").alias("total_score")
)

# 7. 군집 내에서 점수 높은 순으로 순위 매기기
window = Window.partitionBy(
    "nationality", "age_group", "gender",
    "travel_purpose", "lifestyle"
).orderBy(col("total_score").desc())

ranked = grouped.withColumn("rank", dense_rank().over(window))

# 8. 상위 5개 장소만 선택
top5 = ranked.filter(col("rank") <= 5)

# 9. 결과를 PostgreSQL에 저장
top5.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://travel-postgres:5432/travel_db") \
    .option("dbtable", "popular_places") \
    .option("user", "admin") \
    .option("password", "admin1234") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

print("=== 분석 완료! popular_places 테이블에 저장됨 ===")

spark.stop()

스크립트 단계별 설명

단계 내용 설명
1~3 HDFS에서 읽기 모든 날짜(/*/*)의 CSV 파일을 읽어서 컬럼명 부여
4 연령대 생성 나이를 10대/20대/30대 등으로 그룹화. 기획서의 "userId별로 하면 경우의 수가 많아지니까 남성, 20대, 국적 이렇게 분류" 요구사항 반영
5 가중치 부여 VIEW=1점, RE_RECOMMEND=2점, GO=3점. 기획서의 "각 로그별 스코어를 부여함 : 가중치를 위해" 요구사항 반영
6 군집별 합산 국적+연령대+성별+여행목적+라이프스타일 조합별로 장소 점수를 합산
7 순위 매기기 각 군집 내에서 total_score 기준 내림차순 순위 부여
8 상위 5개 군집별 인기 장소 Top 5만 추출
9 PostgreSQL 저장 결과를 popular_places 테이블에 overwrite 모드로 저장

PostgreSQL 접속 주소

스크립트에서 PostgreSQL 접속 URL이 localhost가 아니라 travel-postgres인 이유는,
Spark가 Docker 컨테이너 안에서 실행되기 때문이다.
Docker 네트워크에서는 컨테이너 이름으로 서로 접근할 수 있다.

호스트(내 PC)에서 접속할 때     → localhost:5432
Docker 컨테이너에서 접속할 때   → travel-postgres:5432

4. 실행

spark-submit 명령어

MSYS_NO_PATHCONV=1 docker exec -it travel-spark-master \
  /spark/bin/spark-submit \
  --jars /spark-jobs/postgresql-42.7.4.jar \
  /spark-jobs/analyze_logs.py

⚠️ Windows Git Bash 사용 시 주의

Git Bash는 /spark를 Windows 경로(C:/Program Files/Git/spark)로 자동 변환한다.
MSYS_NO_PATHCONV=1을 앞에 붙여서 경로 변환을 방지해야 한다.

성공 로그

실행 후 대량의 Spark 로그가 출력되다가 마지막에 다음 메시지가 나오면 성공이다.

=== 분석 완료! popular_places 테이블에 저장됨 ===

5. 결과 확인

CLI로 확인

docker exec -it travel-postgres psql -U admin -d travel_db -c "SELECT * FROM popular_places;"

결과

 nationality | age_group | gender | travel_purpose |  lifestyle  |  place_id  | visit_count | total_score | rank
-------------+-----------+--------+----------------+-------------+------------+-------------+-------------+------
 KR          | 20s       | M      | SIGHTSEEING    | ADVENTURE   | place_001  |           3 |           5 |    1
 KR          | 20s       | M      | SIGHTSEEING    | ADVENTURE   | place_003  |           2 |           4 |    2
 JP          | 20s       | F      | FOOD           | RELAXATION  | place_005  |           1 |           1 |    1
 JP          | 30s       | F      | FOOD           | RELAXATION  | place_005  |           1 |           3 |    1

결과 해석

  • 한국인 / 20대 / 남성 / 관광 / 모험형: place_001이 1위(점수 5), place_003이 2위(점수 4)
  • 일본인 / 여성 / 맛집 / 휴식형: 20대와 30대 모두 place_005를 선호

이 데이터가 "나와 비슷한 여행자들의 pick" 추천의 기반이 된다.
예를 들어 20대 한국인 남성 관광객이 앱에 접속하면, 이 테이블에서 해당 군집을 조회하여
place_001, place_003을 추천할 수 있다.

GUI로 확인 (DBeaver)

커맨드 라인 대신 DBeaver를 사용하면 테이블 데이터를 시각적으로 확인할 수 있다.

  1. dbeaver.io에서 Community Edition 설치
  2. 새 연결 → PostgreSQL 선택
  3. 접속 정보 입력: Host localhost, Port 5432, Database travel_db, User admin, Password admin1234
  4. travel_db > Schemas > public > Tables > popular_places 더블클릭

트러블슈팅

Git Bash 경로 변환 문제

exec: "C:/Program Files/Git/spark/bin/spark-submit": no such file or directory

Git Bash가 /spark를 Windows 경로로 변환해서 발생한다.

해결: 명령어 앞에 MSYS_NO_PATHCONV=1을 추가한다.