
Fix session_id assignment logic and develop Airflow DAG #5

Merged (11 commits, Jan 21, 2025)
15 changes: 15 additions & 0 deletions .gitignore
@@ -323,3 +323,18 @@ pyrightconfig.json
*.idea

# End of https://www.toptal.com/developers/gitignore/api/macos,pycharm,python

## Added .gitignore entries
airflow.cfg
airflow.db
airflow-webserver.pid
unittests.cfg
.idea
logs
venv
*.iml
.envrc
.direnv
dist
airflow.egg-info
build
44 changes: 44 additions & 0 deletions airflow/dags/hourly_batch_sessionization.py
@@ -0,0 +1,44 @@
from datetime import datetime, timedelta
from pendulum import timezone
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Default settings
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}

# Spark settings
spark_config = {
"application_args": [
"{{ ds }}", # 실행 날짜
"{{ logical_date.strftime('%H') }}" # 실행 시간
],
# Uncomment below to add extra settings if needed
# 'driver_memory': "3g",
# 'executor_cores': 4,
# 'executor_memory': '4g',
}

# DAG definition
with DAG(
dag_id='hourly_batch_sessionization',
default_args=default_args,
description='Add session_id column',
schedule_interval='@hourly', # run every hour
start_date=datetime(2019, 10, 10, tzinfo=timezone("Asia/Seoul")), # start date (pinned to KST)
catchup=False,
) as dag:

# Spark task definition
submit_main_task = SparkSubmitOperator(
task_id="sessionization",
conn_id="local_spark",
application="/Users/doyeonpyun/Desktop/Sessionization/src/main/Sessionization.py", # script path
**spark_config
)

submit_main_task
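
For context on the two templated application_args: Airflow renders the Jinja strings at run time and passes them to the job as positional arguments, which the Spark script reads from sys.argv. A minimal sketch, not part of this PR, with illustrative rendered values assumed for a logical date of 2019-10-10 04:00 KST:

import sys

# Sketch (assumed rendered values) of how the templated args arrive in the job:
#   "{{ ds }}"                          -> "2019-10-10"
#   "{{ logical_date.strftime('%H') }}" -> "04"  (zero-padded hour)
def parse_run_args(argv):
    """Return (event_date, event_hour) from the two positional arguments."""
    if len(argv) < 3:
        raise SystemExit("Usage: spark-submit <script> <event_date> <event_hour>")
    return argv[1], argv[2]

if __name__ == "__main__":
    event_date, event_hour = parse_run_args(sys.argv)
    print(f"event_date={event_date}, event_hour={event_hour}")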
49 changes: 49 additions & 0 deletions airflow/dags/hourly_batch_sessionization_qa.py
@@ -0,0 +1,49 @@
from datetime import datetime
from pendulum import timezone
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2019, 10, 10, tzinfo=timezone("Asia/Seoul")), # KST로 고정
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
}

spark_config = {
"application_args": [
"{{ ds }}",
"{{ logical_date.strftime('%H') }}"
],
# Extra settings (uncomment if needed)
# 'conf': { # add if needed
# "spark.yarn.maxAppAttempts": "1",
# "spark.yarn.executor.memoryOverhead": "5120"
# },
# 'driver_memory': "3g", # add if needed
# 'executor_cores': 10, # add if needed
# 'num_executors': 10, # add if needed
# 'executor_memory': '5g', # add if needed
# 'keytab': '/keytab/path', # add if Kerberos authentication is required
# 'principal': '{keytab.principal}', # add if Kerberos authentication is required
# 'java_class': '{main class contained in the JAR}' # required when submitting a JAR
}

with DAG(
dag_id='hourly_batch_sessionization_qa',
default_args=default_args,
description='QA for sessionization',
schedule_interval='@hourly', # run every hour
start_date=datetime(2019, 10, 1),
catchup=False,
) as dag:

submit_main_task = SparkSubmitOperator(
task_id="sessionization",
conn_id="local_spark",
application="/Users/doyeonpyun/Desktop/Sessionization/src/main/sessionization.py",
**spark_config)

submit_main_task
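
As a hedged illustration of the commented-out options above (every value is a placeholder, not tuned for this job), a cluster-oriented spark_config could look like the following; the keys mirror the ones already hinted at in this file:

# Hypothetical cluster-mode example; all values below are placeholders.
spark_config_cluster = {
    "application_args": [
        "{{ ds }}",
        "{{ logical_date.strftime('%H') }}",
    ],
    "conf": {
        "spark.yarn.maxAppAttempts": "1",
        "spark.yarn.executor.memoryOverhead": "5120",
    },
    "driver_memory": "3g",
    "executor_cores": 10,
    "num_executors": 10,
    "executor_memory": "5g",
    # "keytab": "/keytab/path",          # only if Kerberos authentication is required
    # "principal": "user@EXAMPLE.COM",   # placeholder principal, only with Kerberos
    # "java_class": "com.example.Main",  # placeholder, only when submitting a JAR
}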
117 changes: 89 additions & 28 deletions src/main/Sessionization.py
@@ -1,57 +1,118 @@
from pyspark.sql import SparkSession,DataFrame
import sys
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import col, unix_timestamp, lag, when, sha2, concat_ws, sum as spark_sum
from main.schemas.common_schemas import EVENT_SCHEMA
from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sha2, concat_ws, \
sum as spark_sum, min as spark_min, year, month, dayofmonth, hour
from common_schemas import EVENT_SCHEMA


def load_data(ss, path, schema) -> DataFrame:
return ss.read.option("header", "true").schema(schema).csv(path) # uses ss
print(f"Loading data from path: {path}") # print data path
return ss.read.option("header", "true").schema(schema).csv(path)


def pre_processing_data(ss, raw_data_path, prev_data_path, schema, session_timeout) -> DataFrame:
print(f"Processing raw data path: {raw_data_path}") # raw 데이터 경로 출력
print(f"Processing previous data path: {prev_data_path}") # 이전 데이터 경로 출력

raw_data = load_data(ss, raw_data_path, schema)
prev_data = load_data(ss, prev_data_path, schema)
# Merge data and cast the date column type

print(f"Raw data count: {raw_data.count()}") # raw 데이터 개수 출력
print(f"Previous data count: {prev_data.count()}") # 이전 데이터 개수 출력

df = raw_data.unionAll(prev_data)
df = df.withColumn("event_time", col("event_time").cast("timestamp"))
print(f"Combined data count: {df.count()}") # 병합된 데이터 개수 출력
return df


def assign_session_id(df, session_timeout) -> DataFrame:
def assign_session_id(df, SESSION_TIMEOUT) -> DataFrame:
window_spec = Window.partitionBy("user_id").orderBy("event_time")

# Create the required columns
# new_session: whether a new session starts at this event
# session_number: cumulative sum of new_session over a window, used to separate sessions
# session_id: final session_id produced by SHA hashing
df = (df.withColumn("prev_event_time",lag("event_time").over(window_spec))
.withColumn("time_diff",unix_timestamp("event_time") - unix_timestamp("prev_event_time"))
.withColumn("new_session",when(col("time_diff") > session_timeout, 1).otherwise(0))
.withColumn("session_number",spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)))
.withColumn("session_id",sha2(concat_ws("_", col("user_id"), col("session_number")), 256)))
df = (
df.withColumn("prev_event_time", lag("event_time").over(window_spec))
.withColumn("time_diff", unix_timestamp("event_time") - unix_timestamp("prev_event_time"))
.withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0))
.withColumn("session_number",spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)),)
)

session_window_spec = Window.partitionBy("user_id", "session_number")
df = df.withColumn("session_start_time", spark_min("event_time").over(session_window_spec))

session_id_expr = (
when(
col("session_id").isNotNull(),
col("session_id")
)
.when(
(col("time_diff").isNull()) | (col("time_diff") > SESSION_TIMEOUT),
sha2(concat_ws("_", col("session_start_time"), col("user_id")), 256)
)
.otherwise(None)
)

df = df.withColumn(
"session_id",
last(session_id_expr, True).over(window_spec.rowsBetween(Window.unboundedPreceding, 0))
)
return df
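
To make the window logic above concrete, here is a small self-contained sketch (toy data on a local SparkSession, not part of this PR) showing how the cumulative sum of new_session flags splits one user's events into sessions with a 30-minute timeout:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, unix_timestamp, when, sum as spark_sum

spark = SparkSession.builder.master("local[1]").appName("sessionization-sketch").getOrCreate()
SESSION_TIMEOUT = 1800  # 30 minutes

rows = [
    ("user_1", "2019-10-10 04:00:00"),  # first event: time_diff is null -> new_session = 0
    ("user_1", "2019-10-10 04:10:00"),  # 600 s gap  -> same session
    ("user_1", "2019-10-10 04:50:00"),  # 2400 s gap -> new_session = 1, session_number increments
]
df = (spark.createDataFrame(rows, ["user_id", "event_time"])
      .withColumn("event_time", col("event_time").cast("timestamp")))

w = Window.partitionBy("user_id").orderBy("event_time")
df = (df.withColumn("time_diff",
                    unix_timestamp("event_time") - unix_timestamp(lag("event_time").over(w)))
        .withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0))
        .withColumn("session_number",
                    spark_sum("new_session").over(w.rowsBetween(Window.unboundedPreceding, 0))))

df.show(truncate=False)  # session_number comes out as 0, 0, 1: two sessions for user_1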


if __name__ == "__main__":
def main():
# Read the date and hour from command-line arguments
if len(sys.argv) < 3:
print("Usage: spark-submit <script> <event_date> <event_hour>")
sys.exit(1)

event_date = sys.argv[1] # "{{ ds }}"
event_hour = sys.argv[2] # "{{ logical_date.strftime('%H') }}"

print(f"Event Date: {event_date}, Event Hour: {event_hour}") # 이벤트 날짜와 시간 출력

spark = SparkSession.builder \
.master("local[*]") \
.appName("Sessionization") \
.getOrCreate()

# Load the schema
schema = EVENT_SCHEMA
session_timeout = 1800

# Build dynamic paths
path = f"/Users/doyeonpyun/Downloads/input_data/year={event_date[:4]}/month={event_date[5:7]}/day={event_date[8:10]}/hour={event_hour}/*.csv"
prev_path = f"/Users/doyeonpyun/Downloads/input_data/year={event_date[:4]}/month={event_date[5:7]}/day={event_date[8:10]}/hour={int(event_hour) - 1:02d}/*.csv"
output_path = "/Users/doyeonpyun/Desktop/commercial_log"

print(f"Generated path: {path}")
print(f"Generated prev_path: {prev_path}")
print(f"Output path: {output_path}")

result_df = pre_processing_data(spark, path, prev_path, schema, session_timeout)

# Data paths
from_file = True
path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=4/*.csv" # to be made dynamic
prev_path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=5/*.csv" # to be made dynamic
# Print data schema and contents
print("Result DataFrame Schema:")
result_df.printSchema()
result_df.show(5) # show the top 5 rows

# Session timeout threshold
SESSION_TIMEOUT = 1800 # 30 minutes
# Assign session IDs and save the result
final_df = assign_session_id(result_df, session_timeout)

# Process data
result_df = pre_processing_data(spark, path, prev_path, schema, SESSION_TIMEOUT)
final_df = assign_session_id(result_df, SESSION_TIMEOUT)
final_df = (
final_df
.withColumn("year", year("event_time"))
.withColumn("month", month("event_time"))
.withColumn("day", dayofmonth("event_time"))
.withColumn("hour", hour("event_time"))
)

# Check the result
final_df.select('event_time','user_id','prev_event_time','time_diff','session_id').show(truncate=False,n = 1000)
final_df.write \
.mode("overwrite") \
.partitionBy("year", "month", "day", "hour") \
.option("header", "true") \
.csv(output_path)

print(f"Data processed and saved to: {output_path}") # 데이터 저장 완료 메시지 출력


if __name__ == "__main__":
main()
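
One detail worth noting in the path construction above: when event_hour is "00", int(event_hour) - 1 formats as -1, so prev_path points at an hour=-1 partition that will not exist. A hedged sketch, not part of this PR, of a rollover-safe variant using datetime arithmetic with the same base path:

from datetime import datetime, timedelta

def prev_hour_path(event_date, event_hour,
                   base="/Users/doyeonpyun/Downloads/input_data"):
    """Build the previous-hour partition path, rolling back to the previous day at hour 00."""
    dt = datetime.strptime(f"{event_date} {event_hour}", "%Y-%m-%d %H") - timedelta(hours=1)
    return (f"{base}/year={dt.year}/month={dt.month:02d}/"
            f"day={dt.day:02d}/hour={dt.hour:02d}/*.csv")

# prev_hour_path("2019-10-10", "00") -> ".../year=2019/month=10/day=09/hour=23/*.csv"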
@@ -9,5 +9,6 @@
StructField("category_code", StringType(), True),
StructField("brand", StringType(), True),
StructField("price", StringType(), True),
StructField("user_id", StringType(), False)
StructField("user_id", StringType(), False),
StructField("session_id", StringType(), True)
])
@@ -10,5 +10,6 @@ class SchemaProvider:
StructField("category_code", StringType(), True),
StructField("brand", StringType(), True),
StructField("price", StringType(), True),
StructField("user_id", StringType(), False)
StructField("user_id", StringType(), False),
StructField("session_id", StringType(), False)
])
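
For orientation, a hedged reconstruction of the full EVENT_SCHEMA implied by the nine-field test rows in SessionizationTest.py below; only the trailing fields are visible in this diff, so the leading field names are assumptions:

from pyspark.sql.types import StructType, StructField, StringType

# Hedged sketch; the first four field names are assumptions, the rest appear in the diff.
EVENT_SCHEMA_SKETCH = StructType([
    StructField("event_time", StringType(), True),   # assumed
    StructField("event_type", StringType(), True),   # assumed
    StructField("product_id", StringType(), True),   # assumed
    StructField("category_id", StringType(), True),  # assumed
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", StringType(), True),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), True),
])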
61 changes: 46 additions & 15 deletions src/test/SessionizationTest.py
@@ -1,7 +1,7 @@
import pytest
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql import SparkSession
from main.Sessionization import assign_session_id
from main.schemas.common_schemas import EVENT_SCHEMA
from main.common_schemas import EVENT_SCHEMA


# SparkSession fixture
@@ -19,39 +19,70 @@ def sample_data(spark):
schema = EVENT_SCHEMA
data = [
# Same user, same session (within 30 minutes)
("2019-10-10 04:00:00", "view", "product_111", "category_211", "brand1", "samsung", 257.15, "user_1"),
("2019-10-10 04:10:00", "view", "product_112", "category_212", None, "samsung", 41.19, "user_1"),
("2019-10-10 04:00:00", "view", "product_111", "category_211", "brand1", "samsung", "257.15", "user_1", None),
("2019-10-10 04:10:00", "view", "product_112", "category_212", None, "samsung", "41.19", "user_1", None),

# Same user, different sessions (more than 30 minutes apart)
("2019-10-10 04:00:00", "view", "product_113", "category_213", "brand2", "apple", 694.74, "user_2"),
("2019-10-10 04:45:00", "view", "product_114", "category_214", "brand3", "apple", 100.36, "user_2"),
("2019-10-10 04:00:00", "view", "product_113", "category_213", "brand2", "apple", "694.74", "user_2", None),
("2019-10-10 04:45:00", "view", "product_114", "category_214", "brand3", "apple", "100.36", "user_2", None),

# Different user
("2019-10-10 04:00:00", "view", "product_115", "category_215", None, None, 15.96, "user_3")
("2019-10-10 04:00:00", "view", "product_115", "category_215", None, None, "15.96", "user_3", None),

# Same user, same session (only event_time differs)
("2019-10-10 05:00:00", "view", "product_118", "category_218", "brand6", "sony", "200.00", "user_4", None),
("2019-10-10 05:10:00", "view", "product_119", "category_219", "brand7", "sony", "300.00", "user_4", None),

# Inactive-session simulation: gap longer than 30 minutes
("2019-10-10 05:00:00", "view", "product_120", "category_220", "brand8", "lg", "150.00", "user_5", None),
("2019-10-10 05:40:00", "view", "product_121", "category_221", "brand9", "lg", "250.00", "user_5", None),
]
return spark.createDataFrame(data, schema=schema)
return spark.createDataFrame(data, schema=EVENT_SCHEMA)


# assign_session_id test
def test_assign_session_id(spark, sample_data):
SESSION_TIMEOUT = 1800 # 30 minutes

# Run the function under test
result_df = assign_session_id(sample_data, SESSION_TIMEOUT)

# Select the required columns
result_df = result_df.select("user_id", "event_time", "prev_event_time", "time_diff", "session_id")
result_df = result_df.select(
"user_id", "event_time", "prev_event_time", "time_diff",
"new_session", "session_start_time", "session_id"
)

# Print the result
result_df.show(truncate=False)

# Test conditions
# Test conditions
# 1. Same-session check
user1_sessions = result_df.filter(result_df.user_id == "user_1").select("session_id").distinct().count()
user2_sessions = result_df.filter(result_df.user_id == "user_2").select("session_id").distinct().count()

# Same-session (user_1) check
assert user1_sessions == 1, f"user_1 should have 1 session, but found {user1_sessions}"

# Different-session (user_2) check
# 2. Different-session check
user2_sessions = result_df.filter(result_df.user_id == "user_2").select("session_id").distinct().count()
assert user2_sessions == 2, f"user_2 should have 2 sessions, but found {user2_sessions}"

# 3. Inactive-session check
user5_sessions = result_df.filter(result_df.user_id == "user_5").select("session_id").distinct().count()
assert user5_sessions == 2, f"user_5 should have 2 sessions, but found {user5_sessions}"

# 4. Validate session_start_time for each session
user4_start_times = result_df.filter(result_df.user_id == "user_4").select(
"session_start_time", "event_time"
).distinct().collect()
for row in user4_start_times:
assert row["session_start_time"] <= row["event_time"], \
f"session_start_time {row['session_start_time']} should be <= event_time {row['event_time']} for user_4"

# 5. Check session_id consistency within the same session
for user in ["user_1", "user_2", "user_4", "user_5"]:
grouped_df = result_df.filter(result_df.user_id == user).groupBy("session_id").count()
for row in grouped_df.collect():
assert row["count"] >= 1, f"session_id {row['session_id']} for {user} should group all events in the session"

# 6. Check that no session_id is null
null_session_ids = result_df.filter(result_df.session_id.isNull()).count()
assert null_session_ids == 0, f"There are {null_session_ids} null session_ids, but all session_ids should be assigned."
