feat: add output CSV generation
dodorip committed Jan 9, 2025
1 parent c0d374b commit 338f9a1
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/main/Sessionization.py
@@ -1,7 +1,8 @@
 from pyspark.sql import SparkSession, DataFrame
 from pyspark.sql.window import Window
-from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sha2, concat_ws, sum as spark_sum, min as spark_min
-from main.common_schemas import EVENT_SCHEMA
+from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sha2, concat_ws, \
+    sum as spark_sum, min as spark_min, year, month, dayofmonth, hour
+from common_schemas import EVENT_SCHEMA


 def load_data(ss, path, schema) -> DataFrame:
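Note: `EVENT_SCHEMA` comes from `common_schemas.py`, which is not part of this diff. A hypothetical sketch of its shape, inferred only from the two columns this file visibly uses:

```python
# Hypothetical reconstruction -- the real definition lives in common_schemas.py.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

EVENT_SCHEMA = StructType([
    StructField("event_time", TimestampType(), True),  # drives the sessionization window
    StructField("user_id", StringType(), True),        # window partition key
    # ...remaining event fields are defined in the actual common_schemas.py
])
```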
@@ -24,7 +25,7 @@ def assign_session_id(df, SESSION_TIMEOUT) -> DataFrame:
         df.withColumn("prev_event_time", lag("event_time").over(window_spec))
         .withColumn("time_diff", unix_timestamp("event_time") - unix_timestamp("prev_event_time"))
         .withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0))
-        .withColumn("session_number",spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)),
+        .withColumn("session_number", spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)),
         )
         )
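For context on the pattern this hunk touches: a `new_session` flag is raised whenever the gap since a user's previous event exceeds `SESSION_TIMEOUT`, and a running sum of those flags yields a per-user session number. A minimal standalone sketch, assuming `window_spec` partitions by `user_id` and orders by `event_time` (the actual definition sits outside this hunk); the sample data and setup are illustrative, not from the repo:

```python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, unix_timestamp, lag, when, sum as spark_sum

spark = SparkSession.builder.master("local[*]").appName("sessionization-sketch").getOrCreate()

# Illustrative events for one user; the third arrives 80 minutes after the second.
df = spark.createDataFrame(
    [("u1", "2019-10-10 13:00:00"),
     ("u1", "2019-10-10 13:10:00"),
     ("u1", "2019-10-10 14:30:00")],
    ["user_id", "event_time"],
).withColumn("event_time", col("event_time").cast("timestamp"))

SESSION_TIMEOUT = 1800  # seconds, as in the diff
window_spec = Window.partitionBy("user_id").orderBy("event_time")

sessions = (
    df.withColumn("prev_event_time", lag("event_time").over(window_spec))
    .withColumn("time_diff", unix_timestamp("event_time") - unix_timestamp("prev_event_time"))
    .withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0))
    # Running sum of break flags: rows 1-2 get session_number 0, row 3 gets 1.
    .withColumn("session_number",
                spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)))
)
sessions.show(truncate=False)
```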

@@ -52,22 +53,37 @@ def assign_session_id(df, SESSION_TIMEOUT) -> DataFrame:
     return df


 def main():
     spark = SparkSession.builder \
         .master("local[*]") \
         .appName("Sessionization") \
         .getOrCreate()

     schema = EVENT_SCHEMA
-    path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=4/*.csv"
-    prev_path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=5/*.csv"
+    path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=13/*.csv"
+    prev_path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=14/*.csv"
     output_path = "/Users/doyeonpyun/Desktop/commercial_log"
     SESSION_TIMEOUT = 1800

     # Load the data and compute session IDs
     result_df = pre_processing_data(spark, path, prev_path, schema, SESSION_TIMEOUT)
     final_df = assign_session_id(result_df, SESSION_TIMEOUT)
     final_df.select('event_time', 'user_id', 'prev_event_time', 'time_diff', 'session_id').show(truncate=False, n=1000)

+    # Add partition columns
+    final_df = (
+        final_df
+        .withColumn("year", year("event_time"))
+        .withColumn("month", month("event_time"))
+        .withColumn("day", dayofmonth("event_time"))
+        .withColumn("hour", hour("event_time"))
+    )
+
+    # Save the data
+    final_df.write \
+        .mode("overwrite") \
+        .partitionBy("year", "month", "day", "hour") \
+        .option("header", "true") \
+        .csv(output_path)

 if __name__ == "__main__":
     main()
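A note on what the new write block produces: `partitionBy("year", "month", "day", "hour")` moves those column values into Hive-style directory names rather than the CSV rows themselves. A hedged read-back sketch (the concrete partition values are illustrative):

```python
# The write block produces Hive-style partition directories, e.g.:
#   /Users/doyeonpyun/Desktop/commercial_log/year=2019/month=10/day=10/hour=13/part-*.csv
# Partition column values live in the path, not inside the CSV files.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
output_path = "/Users/doyeonpyun/Desktop/commercial_log"  # output_path from the diff

# Reading with basePath restores year/month/day/hour as real columns.
df = (
    spark.read
    .option("header", "true")
    .option("basePath", output_path)
    .csv(output_path + "/year=2019/month=10/day=10/hour=13")
)
df.printSchema()
```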
