Skip to content

Commit

Permalink
fix: pyspark 코드 실행을 위한 SparkSubmitOperator로 수정
Browse files Browse the repository at this point in the history
  • Loading branch information
dodorip committed Jan 9, 2025
1 parent c469f70 commit c0d374b
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions airflow/dags/hourly_batch_sessionization.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
import sys
sys.path.append('/Users/doyeonpyun/Desktop/Sessionization/src') # 로컬 환경
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from main.Sessionization import main
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'retries': 0,
'retry_delay': timedelta(minutes=1),
}

# Spark 설정 변수
SPARK_JOB = {
"task_id": "sessionization",
"application": "/Users/doyeonpyun/Desktop/Sessionization/src/main/Sessionization.py",
"conn_id": "local_spark",
"application_args": [
"{{ ds }}",
"{{ logical_date.strftime('%H') }}"
],
# 추가 설정
# 'conf': { # 필요 시 추가
# "spark.yarn.maxAppAttempts": "1",
# "spark.yarn.executor.memoryOverhead": "5120"
# },
# 'driver_memory': "3g", # 필요 시 추가
# 'executor_cores': 10, # 필요 시 추가
# 'num_executors': 10, # 필요 시 추가
# 'executor_memory': '5g', # 필요 시 추가
# 'keytab': '/keytab/path', # Kerberos 인증 필요 시 추가
# 'principal': '{keytab.principal}', # Kerberos 인증 필요 시 추가
# 'java_class': '{jar파일 안에 포함된 main class}' # JAR 실행 시 필요
}

with DAG(
dag_id='hourly_batch_sessionization',
default_args=default_args,
description='add session_id column',
schedule=None, # 수동 실행
description='Add session_id column',
schedule_interval=None, # 수동 실행
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:

def run_main():
main()

run_main_task = PythonOperator(
task_id='run_sessionization_main',
python_callable=run_main,
)
submit_main_task = SparkSubmitOperator(**SPARK_JOB)

run_main_task
submit_main_task

0 comments on commit c0d374b

Please sign in to comment.