diff --git a/src/main/Sessionization.py b/src/main/Sessionization.py index 9e15005..a7112fb 100644 --- a/src/main/Sessionization.py +++ b/src/main/Sessionization.py @@ -1,7 +1,8 @@ from pyspark.sql import SparkSession, DataFrame from pyspark.sql.window import Window -from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sha2, concat_ws, sum as spark_sum, min as spark_min -from main.common_schemas import EVENT_SCHEMA +from pyspark.sql.functions import col, unix_timestamp, lag, when, last, sha2, concat_ws, \ + sum as spark_sum, min as spark_min, year, month, dayofmonth, hour +from common_schemas import EVENT_SCHEMA def load_data(ss, path, schema) -> DataFrame: @@ -24,7 +25,7 @@ def assign_session_id(df, SESSION_TIMEOUT) -> DataFrame: df.withColumn("prev_event_time", lag("event_time").over(window_spec)) .withColumn("time_diff", unix_timestamp("event_time") - unix_timestamp("prev_event_time")) .withColumn("new_session", when(col("time_diff") > SESSION_TIMEOUT, 1).otherwise(0)) - .withColumn("session_number",spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)), + .withColumn("session_number", spark_sum("new_session").over(window_spec.rowsBetween(Window.unboundedPreceding, 0)), ) ) @@ -52,7 +53,6 @@ def assign_session_id(df, SESSION_TIMEOUT) -> DataFrame: return df - def main(): spark = SparkSession.builder \ .master("local[*]") \ @@ -60,14 +60,30 @@ def main(): .getOrCreate() schema = EVENT_SCHEMA - path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=4/*.csv" - prev_path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=5/*.csv" + path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=13/*.csv" + prev_path = "/Users/doyeonpyun/Downloads/input_data/year=2019/month=10/day=10/hour=14/*.csv" + output_path = "/Users/doyeonpyun/Desktop/commercial_log" SESSION_TIMEOUT = 1800 + # 데이터 로드 및 세션 ID 계산 result_df = pre_processing_data(spark, path, prev_path, schema, SESSION_TIMEOUT) final_df = assign_session_id(result_df, SESSION_TIMEOUT) - final_df.select('event_time', 'user_id', 'prev_event_time', 'time_diff', 'session_id').show(truncate=False, n=1000) + # 파티션 컬럼 추가 + final_df = ( + final_df + .withColumn("year", year("event_time")) + .withColumn("month", month("event_time")) + .withColumn("day", dayofmonth("event_time")) + .withColumn("hour", hour("event_time")) + ) + + # 데이터 저장 + final_df.write \ + .mode("overwrite") \ + .partitionBy("year", "month", "day", "hour") \ + .option("header", "true") \ + .csv(output_path) if __name__ == "__main__": main()