
[INLONG-11400][Manager] Support Airflow schedule engine #11479

Merged
2 commits merged into apache:master on Nov 20, 2024
Conversation

@Zkplo Zkplo commented Nov 10, 2024

Fixes #11400

Motivation

Modifications

Need to know about Airflow

  1. By default, Airflow rejects all REST API requests, so the API must be configured as described in the official documentation (see the setup sketch after this list).
  2. Airflow Connections are used to store credentials for connecting to other systems, keeping those credentials secure. For details see https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html.
  3. Airflow provides no API for creating DAGs, so integrating with InLong requires pre-deploying two original DAGs (dag_cleaner and dag_creator below) that create and remove the offline task DAG files on demand.
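For reference, a minimal setup sketch (the exact option name varies by Airflow version, and all values here are placeholders, so check the official docs): enable basic-auth access to the REST API in airflow.cfg, then store the InLong Manager credentials as an Airflow Connection via the CLI. The generated task DAGs below read host, port, path, and credentials from this Connection.

# airflow.cfg: allow authenticated REST API requests (rejected by default)
[api]
auth_backend = airflow.api.auth.backend.basic_auth

# Store InLong Manager connection info as an Airflow Connection. The generated
# DAG builds its request URL as http://{host}:{port}/{schema}, so the API path
# goes into --conn-schema; all values below are placeholders.
airflow connections add 'inlong_connection' \
    --conn-type 'http' \
    --conn-host '127.0.0.1' \
    --conn-port '8083' \
    --conn-schema 'inlong/manager/api/...' \
    --conn-login 'admin' \
    --conn-password 'inlong'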
dag_cleaner.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import DAG
from airflow import configuration
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import logging
import os
import pytz

DAG_PATH = configuration.get('core', 'dags_folder') + "/"


def clean_expired_dags(**context):
    # Convert the execution date to the target timezone and format it to
    # match the timestamp format written into each generated DAG file.
    execution_date = context.get('execution_date')
    target_timezone = pytz.timezone("Asia/Shanghai")
    local_time = execution_date.astimezone(target_timezone)
    current_time = local_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    logging.info(f"Current time: {current_time}")
    for dag_file in os.listdir(DAG_PATH):
        if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
            with open(DAG_PATH + dag_file, "r") as file:
                line = file.readline()
                while line and "end_offset_datetime_str" not in line:
                    line = file.readline()
                end_date_str = None
                if len(line.split("=")) > 1:
                    end_date_str = line.split("=")[1].strip().strip("\"")
                logging.info(f"DAG end time: {end_date_str}")
                if end_date_str:
                    # ISO-8601 timestamps compare chronologically as plain strings.
                    if str(current_time) > str(end_date_str):
                        dag_file_path = os.path.join(DAG_PATH, dag_file)
                        os.remove(dag_file_path)
                        logging.info(f"Deleted expired DAG: {dag_file}")


default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(minutes=5),
}

# 'catchup' and 'tags' are DAG-level arguments, so pass them to the DAG
# constructor instead of default_args, where Airflow would ignore them.
dag = DAG(
    'dag_cleaner',
    default_args=default_args,
    schedule_interval="*/20 * * * *",
    is_paused_upon_creation=False,
    catchup=False,
    tags=["inlong"]
)

clean_task = PythonOperator(
    task_id='clean_expired_dags',
    python_callable=clean_expired_dags,
    provide_context=True,
    dag=dag,
)
dag_creator.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import DAG
from airflow import configuration
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import os

DAG_PATH = configuration.get('core', 'dags_folder') + "/"
DAG_PREFIX = 'inlong_offline_task_'

def create_dag_file(**context):
    # Parameters are passed from the InLong Manager via the dag_run conf.
    conf = context.get('dag_run').conf
    print('conf: ', conf)
    groupId = conf.get('inlong_group_id')
    task_name = DAG_PREFIX + groupId
    timezone = conf.get('timezone')
    boundaryType = str(conf.get('boundary_type'))
    start_time = int(conf.get('start_time'))
    end_time = int(conf.get('end_time'))
    cron_expr = conf.get('cron_expr')
    seconds_interval = conf.get('seconds_interval')
    # Embed either a quoted cron string or a timedelta expression
    # into the generated DAG file.
    if cron_expr is None or len(cron_expr) == 0:
        schedule_interval = f'timedelta(seconds={seconds_interval})'
    else:
        schedule_interval = '"' + cron_expr + '"'
    connectionId = conf.get('connection_id')
    dag_content = f'''from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from croniter import croniter
from datetime import datetime, timedelta
import pytz
import requests

timezone = "{timezone}"
start_offset_datetime_str = {start_time}
end_offset_datetime_str = {end_time}
schedule_interval = {schedule_interval}  # Or put cron expression
dag_id = "{task_name}"
groupId = "{groupId}"
connectionId = "{connectionId}"
boundaryType = "{boundaryType}"

target_timezone = pytz.timezone(timezone)  

start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)

def taskFunction(**context):
    print("#########################")
    conn = BaseHook.get_connection(connectionId)
    url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}"
    params = {{
        "username": conn.login,
        "password": conn.password
    }}
    print("params", params)
    headers = {{
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
        "Accept": "application/json",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
        "Accept-Encoding": "gzip, deflate",
        "Referer": "http://192.168.101.2:8083/",
        "Content-Type": "application/json;charset=UTF-8",
        "tenant": "public",
        "Origin": "http://192.168.101.2",
        "Connection": "close",
        "Priority": "u=0"
    }}
    time_interval = get_time_interval(context)
    data = {{
        "boundaryType": boundaryType,
        "groupId": groupId,
        "lowerBoundary": str(int(time_interval[0])),
        "upperBoundary": str(int(int(time_interval[1])))
    }}
    print("Request Body: ", data)
    response = requests.post(url, params=params, headers=headers, json=data)
    if response.status_code == 200:
        print(response.json()) 
    else:
        print(response.text)
    print("#########################")


def get_time_interval(context):
    execution_date = context.get('execution_date')
    execution_date = execution_date.astimezone(target_timezone)
    dag = context.get('dag')
    schedule_interval = dag.schedule_interval
    if isinstance(schedule_interval, timedelta):
        return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
    else:
        cron_expr = dag.schedule_interval
        cron = croniter(cron_expr, execution_date)
        next_run = cron.get_next(datetime)
        return execution_date.timestamp(), next_run.timestamp()


default_args = {{
    'owner': 'inlong',
    'start_date': start_date,
    'end_date': end_date,
}}

# 'catchup' is a DAG-level argument, so pass it to the DAG constructor
# instead of default_args, where Airflow would ignore it.
dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval=schedule_interval,
    is_paused_upon_creation=False,
    catchup=False
)

offline_task = PythonOperator(
    task_id=dag_id,
    python_callable=taskFunction,
    provide_context=True,
    dag=dag,
)
    '''
    dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
    with open(dag_file_path, 'w') as f:
        f.write(dag_content)
    print(f'Generated DAG file: {dag_file_path}')
default_args = {'owner': 'airflow', 'start_date': days_ago(1)}
dag = DAG('dag_creator', default_args=default_args, schedule_interval=None,
          is_paused_upon_creation=False, catchup=False)
create_dag_task = PythonOperator(task_id='create_dag_file', python_callable=create_dag_file,
                                 provide_context=True, dag=dag)

System design:

  1. To simplify future maintenance and extension of the Airflow API support, the AirflowApi interface and the BaseAirflowApi abstract class are introduced; subsequent endpoints only need to extend this base.
  2. A unified request client, AirflowServerClient, is implemented for calling these interfaces.
  3. Two interceptors are added to the OkHttpClient: AirflowAuthInterceptor performs unified authorization of requests, and LoggingInterceptor records request logs (a rough sketch of this behavior follows the list).
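Since the Manager implementation itself is Java/OkHttp, the following is only a minimal Python sketch of the behavior the two interceptors add around every call; the credentials and endpoint are placeholders:

import logging
import requests

logging.basicConfig(level=logging.INFO)

def call_airflow(method, url, **kwargs):
    # AirflowAuthInterceptor equivalent: attach the same Basic auth to every request.
    kwargs.setdefault('auth', ('airflow_user', 'airflow_password'))  # placeholders
    response = requests.request(method, url, **kwargs)
    # LoggingInterceptor equivalent: record address, method, and status code.
    logging.info("Airflow API request - Address: %s, Request method: %s, "
                 "Response status code: %s", url, method, response.status_code)
    return response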

Results:

When we issue a scheduled task, Airflow's dag_creator receives the information from the InLong Manager and creates an offline task DAG based on it, as shown in the figure below.
[screenshot: offline task DAG created in the Airflow Web UI]
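The trigger itself is a standard Airflow REST API dagRun request, as the LoggingInterceptor output below also shows. A minimal sketch (host, credentials, and all conf values are placeholders; the conf keys match those read by create_dag_file above):

import requests

url = "http://localhost:8080/api/v1/dags/dag_creator/dagRuns"  # placeholder host
payload = {
    "conf": {
        "inlong_group_id": "test_offline_1",
        "timezone": "Asia/Shanghai",
        "boundary_type": "TIME",              # assumed value for illustration
        "start_time": 1731038400000,          # epoch milliseconds
        "end_time": 1731124800000,
        "cron_expr": "*/10 * * * *",          # empty/None falls back to seconds_interval
        "seconds_interval": 600,
        "connection_id": "inlong_connection"  # placeholder Connection id
    }
}
response = requests.post(url, json=payload, auth=("airflow_user", "airflow_password"))
print(response.status_code, response.json())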

Inlong Manager Log
[ ] 2024-11-08 12:38:22.667 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 100 to 101 for groupId=test_offline_1 
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] .a.i.m.s.ScheduleClientFactory:51 - Get schedule engine client success for Airflow 
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0] .i.m.s.a.AirflowScheduleEngine:138 - Registering DAG for test_offline_1 
[ ] 2024-11-08 12:38:23.120 - INFO [inlong-workflow-0] a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: POST, Response status code: 200 
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 101 to 102 for groupId=test_offline_1 
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] a.i.m.s.s.ScheduleOperatorImpl:150 - Register schedule info success for group test_offline_1 
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0] .GroupScheduleResourceListener:82 - success to process schedule resource for group=test_offline_1 
[ ] 2024-11-08 12:38:23.163 - INFO [inlong-workflow-0] .l.g.InitGroupCompleteListener:78 - begin to execute InitGroupCompleteListener for groupId=test_offline_1 
[ ] 2024-11-08 12:38:23.164 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:540 - begin to update group status to [130] for groupId=test_offline_1 by user=admin 
[ ] 2024-11-08 12:38:23.168 - INFO [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:558 - success to update group status to [130] for groupId=test_offline_1 by user=admin 
[ ] 2024-11-08 12:38:23.188 - WARN [inlong-workflow-0] i.m.s.g.InlongGroupServiceImpl:249 - start time is less than current time, re-set to current time for groupId=test_offline_1, startTime=2024-11-08T12:34:47.000+0000, newStartTime=2024-11-08T12:38:23.188+0000 
[ ] 2024-11-08 12:38:23.197 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:111 - success to update schedule info for groupId=test_offline_1 
[ ] 2024-11-08 12:38:23.202 - INFO [inlong-workflow-0] .a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 102 to 103 for groupId=test_offline_1 
[ ] 2024-11-08 12:38:23.203 - INFO [inlong-workflow-0] .i.m.s.a.AirflowScheduleEngine:203 - Updating DAG for test_offline_1 
[ ] 2024-11-08 12:38:23.463 - INFO [inlong-workflow-0] a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI: http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method: POST, Response status code: 200 
After the task has run for some time, you can see that each run's interval spans from the last execution to the next one, which meets expectations.

[screenshot: task run intervals matching the schedule]
Now I will modify the execution interval of this task through the InLong Dashboard.

[screenshot: modifying the execution interval in the InLong Dashboard]
The modification succeeds; the Last Run field of the Web UI is not updated immediately, but the new schedule is already visible in Run After.

[screenshot: updated schedule shown in Run After on the Airflow Web UI]
Next, change the scheduling period of this offline task to a Cron expression.

[screenshot: changing the schedule to a cron expression in the InLong Dashboard]
From the figure below we can see that the modification has been successful.

[screenshot: cron-based schedule applied in the Airflow Web UI]
The execution results of each scheduled task are as follows:

[screenshot: execution results of the scheduled task runs]

There are two ways to delete DAG files. First, dag_cleaner periodically scans the DAG directory for files matching the naming rule and checks whether their end time has passed the current time. Second, the InLong Manager triggers dag_cleaner through a parameterized API call, and dag_cleaner deletes the DAG file directly; in this case, the Manager also deletes the DAG instance already loaded into memory through another interface. A sketch of the latter call sequence follows the screenshot below.

[screenshot: DAG file deletion triggered via dag_cleaner]
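A rough sketch of the Manager-triggered deletion path (endpoint, credentials, and the conf key are assumptions based on the description above; DELETE /api/v1/dags/{dag_id} is the stable Airflow REST API call for removing a loaded DAG):

import requests

BASE = "http://localhost:8080/api/v1"        # placeholder Airflow endpoint
AUTH = ("airflow_user", "airflow_password")  # placeholder credentials
group_id = "test_offline_1"

# 1. Trigger dag_cleaner with parameters so it deletes the DAG file directly.
requests.post(f"{BASE}/dags/dag_cleaner/dagRuns",
              json={"conf": {"inlong_group_id": group_id}}, auth=AUTH)

# 2. Remove the DAG instance Airflow has already loaded into memory.
requests.delete(f"{BASE}/dags/inlong_offline_task_{group_id}", auth=AUTH)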

Verifying this change

(Please pick either of the following options)

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

    (example:)

    • Added integration tests for end-to-end deployment with large payloads (10MB)
    • Extended integration test for recovery after broker failure

aloyszhang
aloyszhang previously approved these changes Nov 11, 2024
LGTM

dockerzhang
dockerzhang previously approved these changes Nov 12, 2024
@Zkplo Zkplo force-pushed the INLONG-11400 branch 2 times, most recently from 14ab207 to 8b242da on November 14, 2024 06:07
fuweng11
fuweng11 previously approved these changes Nov 19, 2024
aloyszhang
aloyszhang previously approved these changes Nov 19, 2024
dockerzhang
dockerzhang previously approved these changes Nov 20, 2024
aloyszhang
aloyszhang previously approved these changes Nov 20, 2024
fuweng11
fuweng11 previously approved these changes Nov 20, 2024
@dockerzhang dockerzhang dismissed stale reviews from fuweng11, aloyszhang, and themself via e42edb6 November 20, 2024 02:32
dockerzhang
dockerzhang previously approved these changes Nov 20, 2024
aloyszhang
aloyszhang previously approved these changes Nov 20, 2024
vernedeng
vernedeng previously approved these changes Nov 20, 2024
@dockerzhang dockerzhang merged commit befe172 into apache:master Nov 20, 2024
8 checks passed