diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java new file mode 100644 index 00000000000..deb056ed4f7 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "Full representation of the connection.") +public class AirflowConnection { + + @JsonProperty("connection_id") + @ApiModelProperty("The connection ID.") + private String connectionId; + + @JsonProperty("conn_type") + @ApiModelProperty("The connection type.") + private String connType; + + @JsonProperty("description") + @ApiModelProperty("The description of the connection.") + private String description; + + @JsonProperty("host") + @ApiModelProperty("Host of the connection.") + private String host; + + @JsonProperty("login") + @ApiModelProperty("Login of the connection.") + private String login; + + @JsonProperty("schema") + @ApiModelProperty("Schema of the connection.") + private String schema; + + @JsonProperty("port") + @ApiModelProperty("Port of the connection.") + private Integer port; + + @JsonProperty("password") + @ApiModelProperty("Password of the connection.") + private String password; + + @JsonProperty("extra") + @ApiModelProperty("Additional information description of the connection.") + private String extra; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java new file mode 100644 index 00000000000..578eadb1514 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAG Description Information.") +public class DAG { + + @JsonProperty("dag_id") + @ApiModelProperty("The ID of the DAG.") + private String dagId; + + @JsonProperty("root_dag_id") + @ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. 
Otherwise, null.") + private String rootDagId; + + @JsonProperty("is_paused") + @ApiModelProperty("Whether the DAG is paused.") + private Boolean isPaused; + + @JsonProperty("is_active") + @ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).") + private Boolean isActive; + + @JsonProperty("description") + @ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.") + private String description; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java new file mode 100644 index 00000000000..7a52548f417 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "Collection of DAGs.") +public class DAGCollection { + + @JsonProperty("dags") + @ApiModelProperty("List of DAGs.") + private List<DAG> dags = null; + + @JsonProperty("total_entries") + @ApiModelProperty("The length of DAG list.") + private Integer totalEntries; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java new file mode 100644 index 00000000000..e9384c75da4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAGRun Description Information.") +public class DAGRun { + + @JsonProperty("conf") + @ApiModelProperty("JSON object describing additional configuration parameters.") + private Object conf; + + @JsonProperty("dag_id") + @ApiModelProperty("Airflow DAG id.") + private String dagId; + + @JsonProperty("dag_run_id") + @ApiModelProperty("Airflow DAGRun id (Nullable).") + private String dagRunId; + + @JsonProperty("end_date") + @ApiModelProperty("The end time of this DAGRun.") + private String endDate; + + @JsonProperty("start_date") + @ApiModelProperty("The start time of this DAGRun.") + private String startDate; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java new file mode 100644 index 00000000000..4154c2526ce --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "DAGRunConf Description Information.") +public class DAGRunConf { + + @JsonProperty("inlong_group_id") + @ApiModelProperty("Specify the Inlong group ID") + private String inlongGroupId; + + @JsonProperty("start_time") + @ApiModelProperty("The start time of DAG scheduling.") + private long startTime; + + @JsonProperty("end_time") + @ApiModelProperty("The end time of DAG scheduling.") + private long endTime; + + @JsonProperty("boundary_type") + @ApiModelProperty("The offline task boundary type.") + private String boundaryType; + + @JsonProperty("cron_expr") + @ApiModelProperty("Cron expression.") + private String cronExpr; + + @JsonProperty("seconds_interval") + @ApiModelProperty("Time interval (in seconds).") + private String secondsInterval; + + @JsonProperty("connection_id") + @ApiModelProperty("Airflow Connection Id of Inlong Manager.") + private String connectionId; + + @JsonProperty("timezone") + @ApiModelProperty("The timezone.") + private String timezone; +} diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java new file mode 100644 index 00000000000..3eb76fd6773 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.airflow; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.math.BigDecimal; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. 
") +public class Error { + + @JsonProperty("detail") + @ApiModelProperty("Error Details.") + private String detail; + + @JsonProperty("instance") + @ApiModelProperty("Error of the instance.") + private String instance; + + @JsonProperty("status") + @ApiModelProperty("Error of the status.") + private BigDecimal status; + + @JsonProperty("title") + @ApiModelProperty("Error of the title.") + private String title; + + @JsonProperty("type") + @ApiModelProperty("Error of the type.") + private String type; +} diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml index a9d9fb3e1ed..cccc72472f8 100644 --- a/inlong-manager/manager-schedule/pom.xml +++ b/inlong-manager/manager-schedule/pom.xml @@ -73,5 +73,29 @@ junit-jupiter test + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.springframework.boot + spring-boot-starter-test + + + com.vaadin.external.google + android-json + + + com.jayway.jsonpath + json-path + + + org.junit.jupiter + junit-jupiter-api + + + diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java index 71949ef7445..f8be589d0f9 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java @@ -23,6 +23,7 @@ public enum ScheduleEngineType { NONE("None"), + AIRFLOW("Airflow"), QUARTZ("Quartz"); private final String type; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java new file mode 100644 index 00000000000..e328d8fd0a0 --- /dev/null +++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +/** + * Contains constants for interacting with the Airflow API. 
+ */ +public class AirFlowAPIConstant { + + public static final String DEFAULT_TIMEZONE = "Asia/Shanghai"; + public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = "inlong_offline_task_"; + public static final String SUBMIT_OFFLINE_JOB_URI = "/inlong/manager/api/group/submitOfflineJob"; + + // AirflowConnection + public static final String LIST_CONNECTIONS_URI = "/api/v1/connections"; + public static final String GET_CONNECTION_URI = "/api/v1/connections/{connection_id}"; + + // DAG + public static final String LIST_DAGS_URI = "/api/v1/dags"; + public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}"; + + // DAGRun + public static final String TRIGGER_NEW_DAG_RUN_URI = "/api/v1/dags/{dag_id}/dagRuns"; + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java new file mode 100644 index 00000000000..bbb8e59149a --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineType; + +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * Built-in implementation of schedule engine client corresponding with {@link AirflowScheduleEngine}. + * AirflowScheduleClient simply invokes the {@link AirflowScheduleEngine} to register/unregister/update + * schedule info instead of calling a remote schedule service. + * */ +@Service +public class AirflowScheduleClient implements ScheduleEngineClient { + + @Resource + public AirflowScheduleEngine scheduleEngine; + + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.AIRFLOW.getType().equalsIgnoreCase(engineType); + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleRegister(scheduleInfo); + } + + @Override + public boolean unregister(String groupId) { + return scheduleEngine.handleUnregister(groupId); + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUpdate(scheduleInfo); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java new file mode 100644 index 00000000000..792307e6aeb --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.common.bounded.BoundaryType; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.pojo.schedule.airflow.DAG; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun; +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRunConf; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse; +import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionCreator; +import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionGetter; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGCollectionUpdater; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGDeletor; +import org.apache.inlong.manager.schedule.airflow.api.dag.DAGUpdater; +import org.apache.inlong.manager.schedule.airflow.api.dagruns.DAGRunsTrigger; +import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig; +import org.apache.inlong.manager.schedule.airflow.util.DAGUtil; +import org.apache.inlong.manager.schedule.airflow.util.DateUtil; +import 
org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import com.google.common.collect.ImmutableMap; +import org.apache.mina.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE; +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX; +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED; + +/** + * Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient} + */ +@Service +public class AirflowScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(AirflowScheduleEngine.class); + private final Set scheduledJobSet = new ConcurrentHashSet<>(); + private AirflowServerClient serverClient; + private AirflowConfig airflowConfig; + + public AirflowScheduleEngine(AirflowServerClient serverClient, AirflowConfig airflowConfig) { + this.serverClient = 
serverClient; + this.airflowConfig = airflowConfig; + start(); + } + + @Override + public void start() { + try { + // Create authentication information for the Inlong Manager API used by AirFlow + initConnection(); + // Check if DagCleaner and DagCreator exist and unpause them + switchOriginalDAG(false); + // Start all task DAGs and load all DAG ID(Group Id) into the local cache + switchAllTaskDAG(false); + LOGGER.info("Airflow initialization succeeded."); + } catch (Exception e) { + LOGGER.error("Airflow initialization failed.", e); + } + } + + private void initConnection() throws Exception { + LOGGER.info("Initializing Inlong Manager AirflowConnection for Airflow ... "); + // Check if Airflow has the Inlong AirflowConnection + AirflowResponse<AirflowConnection> response = serverClient.sendRequest( + new AirflowConnectionGetter(airflowConfig.getConnectionId())); + if (!response.isSuccess()) { + AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "", + airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, + airflowConfig.getPort(), airflowConfig.getInlongPassword(), ""); + response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); + LOGGER.info("AirflowConnection registration response: {}", response.toString()); + if (!response.isSuccess()) { + LOGGER.error("Initialization connection failed."); + throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed."); + } + } + } + + private void switchOriginalDAG(boolean isPaused) { + for (String dagId : Arrays.asList(airflowConfig.getDagCleanerId(), airflowConfig.getDagCreatorId())) { + try { + AirflowResponse<DAG> response = serverClient.sendRequest(new DAGUpdater(dagId, isPaused)); + LOGGER.info("Response to {} the original DAG : {}", isPaused ?
"stop" : "start", response.toString()); + if (!response.isSuccess()) { + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("%s does not exist or failed to %s.", dagId, (isPaused ? "stop" : "start"))); + } + } catch (Exception e) { + LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e); + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage())); + } + } + } + + private void switchAllTaskDAG(boolean isPaused) { + try { + AirflowResponse<DAGCollection> response = serverClient.sendRequest( + new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, isPaused)); + LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : "start", response.toString()); + if (!response.isSuccess()) { + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("Failed to %s task DAGs.", isPaused ? "stop" : "start")); + } + if (!isPaused) { + List<DAG> dagList = response.getData().getDags(); + if (dagList != null) { + dagList.forEach(dag -> scheduledJobSet + .add(dag.getDagId().substring(INLONG_OFFLINE_DAG_TASK_PREFIX.length() - 1))); + } + } + } catch (Exception e) { + LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e); + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, + String.format("Failed to %s task DAGs: %s", isPaused ?
"stop" : "start", e.getMessage())); + } + } + + @Override + public boolean handleRegister(ScheduleInfo scheduleInfo) { + try { + LOGGER.info("Registering DAG for {}", scheduleInfo.getInlongGroupId()); + return doRegister(scheduleInfo, true); + } catch (Exception e) { + LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.", + scheduleInfo.getInlongGroupId(), e); + throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to register: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); + } + } + + @Override + public boolean handleUnregister(String groupId) { + LOGGER.info("Unregistering Airflow Dag with GroupId {} ", groupId); + if (scheduledJobSet.contains(groupId)) { + try { + if (!completelyDelete(DAGUtil.buildDAGIdByGroupId(groupId))) { + return false; + } + } catch (Exception e) { + LOGGER.warn("May not be completely removed {}", groupId, e); + } + } + scheduledJobSet.remove(groupId); + LOGGER.info("Un-registered airflow schedule info for {}", groupId); + return true; + } + + private boolean completelyDelete(String groupId) throws Exception { + // Trigger the removal of the DAG file for the Cleaner DAG + DAGRunConf dagRunConf = DAGRunConf.builder() + .inlongGroupId(DAGUtil.buildDAGIdByGroupId(groupId)).build(); + AirflowResponse response = serverClient.sendRequest( + new DAGRunsTrigger(airflowConfig.getDagCleanerId(), ImmutableMap.of("conf", dagRunConf))); + LOGGER.info("Response to DAG file clearing: {}", response.toString()); + if (!response.isSuccess()) { + LOGGER.warn("Failed to delete DAG file corresponding to {}.", groupId); + return false; + } + // Delete DAG tasks that have been loaded into memory + AirflowResponse deleteResponse = serverClient.sendRequest(new DAGDeletor(groupId)); + LOGGER.info("Response to DAG scheduling instance clearing: {}", deleteResponse.toString()); + if (!deleteResponse.isSuccess()) { + LOGGER.warn("Failed to delete DAG 
instance corresponding to {}.", groupId); + } + return deleteResponse.isSuccess(); + } + + @Override + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + try { + LOGGER.info("Updating DAG for {}", scheduleInfo.getInlongGroupId()); + return doRegister(scheduleInfo, false); + } catch (Exception e) { + LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.", + scheduleInfo.getInlongGroupId(), e); + throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to update: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); + } + } + + public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) throws Exception { + if (isFirst && scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { + throw new AirflowScheduleException(DAG_DUPLICATE, + String.format("Group %s is already registered", scheduleInfo.getInlongGroupId())); + } + DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder() + .inlongGroupId(scheduleInfo.getInlongGroupId()) + .startTime(scheduleInfo.getStartTime().getTime()) + .endTime(scheduleInfo.getEndTime().getTime()) + .boundaryType(BoundaryType.TIME.getType()) + .connectionId(airflowConfig.getConnectionId()) + .timezone(DEFAULT_TIMEZONE); + if (scheduleInfo.getScheduleType() == 1) { + confBuilder = confBuilder.cronExpr(scheduleInfo.getCrontabExpression()); + } else { + confBuilder = confBuilder.secondsInterval(DateUtil.intervalToSeconds(scheduleInfo.getScheduleInterval(), + scheduleInfo.getScheduleUnit())) + .startTime(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()) == ScheduleUnit.ONE_ROUND + ? scheduleInfo.getEndTime().getTime() + : scheduleInfo.getStartTime().getTime()); + } + DAGRunConf dagRunConf = confBuilder.build(); + AirflowResponse response = serverClient.sendRequest( + new DAGRunsTrigger(airflowConfig.getDagCreatorId(), ImmutableMap.of("conf", dagRunConf))); + LOGGER.info("DAG {} response: {}", isFirst ? 
"registration" : "update", response.toString()); + if (response.isSuccess()) { + scheduledJobSet.add(scheduleInfo.getInlongGroupId()); + } + return response.isSuccess(); + } + + @Override + public void stop() { + try { + switchOriginalDAG(true); + switchAllTaskDAG(true); + } catch (Exception e) { + LOGGER.error("Airflow Schedule Engine shutdown failed: ", e); + throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED, + String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java new file mode 100644 index 00000000000..be67a364758 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.airflow.Error; +import org.apache.inlong.manager.schedule.airflow.api.AirflowApi; +import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse; +import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A unified class used for Airflow RESTful API processing. + */ +public class AirflowServerClient { + + private static final Logger logger = LoggerFactory.getLogger(AirflowServerClient.class); + private final OkHttpClient httpClient; + private final AirflowConfig config; + private final ObjectMapper objectMapper; + + public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) { + this.httpClient = httpClient; + this.config = config; + this.objectMapper = new ObjectMapper(); + } + + /** + * Send request and parse response + * + * @param apiEndpoint apiEndpoint + * @param Response to Generic Types + * @return Parsed response object + * @throws IOException Network request exception + */ + public AirflowResponse sendRequest(AirflowApi apiEndpoint) throws IOException { + Request request = apiEndpoint.buildRequest(config.getBaseUrl()); + try (Response response = httpClient.newCall(request).execute()) { + String responseBody = response.body().string(); + if (response.isSuccessful()) { + return new AirflowResponse<>(true, objectMapper.readValue(responseBody, apiEndpoint.getResponseType())); + } else { + logger.error("Airflow Web API Request failed, status code: {} , detail: {}", + response.code(), objectMapper.readValue(responseBody, Error.class).getDetail()); + return new AirflowResponse<>(false, null); + } + } + } +} diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java new file mode 100644 index 00000000000..4ff1a3284df --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api; + +import okhttp3.Request; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.Map; +/** + * Represents a generic interface for defining and constructing API requests to interact with Airflow. + * This interface provides methods for specifying HTTP methods, endpoint paths, parameters, + * request bodies, and constructing complete requests. + * @param the type of the response expected from the API, allowing flexibility for various response types. 
+ */ +public interface AirflowApi { + + /** + * Get HTTP Method + * @return HTTP Method + */ + HttpMethod getMethod(); + + /** + * Get the requested path (relative to baseUrl) + * @return Request path + */ + String getPath(); + + /** + * Get path parameters to replace placeholders in the path (e.g. : "/api/v1/dags/{dag_id}/dagRuns") + * @return Path parameter map + */ + Map getPathParams(); + + /** + * Get query parameters (e.g. "?Key=value") + * @return GET parameter map + */ + Map getQueryParams(); + + /** + * Get the request body (applicable to methods such as POST, PUT, etc.) + * @return Post RequestBody Object + */ + RequestBody getRequestBody(); + + /** + * Constructing a complete Request object + * @param baseUrl Base URL + * @return Constructed Request object + */ + Request buildRequest(String baseUrl); + + /** + * Returns the type of the response expected from this method. + * @return The expected response type. + */ + Class getResponseType(); +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java new file mode 100644 index 00000000000..60e0ef63668 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api; + +/** + * A generic response wrapper for handling responses from Airflow services. + * @param the type of data included in the response, allowing flexibility for various data types. + */ +public class AirflowResponse { + + private final boolean success; + private final T data; + + public AirflowResponse(boolean success, T data) { + this.success = success; + this.data = data; + } + + public boolean isSuccess() { + return success; + } + + public T getData() { + return data; + } + + @Override + public String toString() { + return "AirflowResponse{" + + "success=" + success + + ", data=" + data + + '}'; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java new file mode 100644 index 00000000000..18a1ed5206a --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api; + +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + +/** + * The basic implementation of Airflow API interface. + * + * @param the type of the response expected from the API, allowing flexibility for various response types. 
+ */ + +@Slf4j +public abstract class BaseAirflowApi implements AirflowApi { + + protected static final ObjectMapper objectMapper = new ObjectMapper(); + protected Map pathParams = Maps.newHashMap(); + protected Map queryParams = Maps.newHashMap(); + protected Map requestBodyParams = Maps.newHashMap(); + + @Override + public abstract HttpMethod getMethod(); + + @Override + public abstract String getPath(); + + @Override + public abstract Class getResponseType(); + + @Override + public Map getPathParams() { + return pathParams; + } + + @Override + public Map getQueryParams() { + return queryParams; + } + + /** + * Create JSON request body + * @return RequestBody Object + */ + @Override + public RequestBody getRequestBody() { + try { + return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), + objectMapper.writeValueAsString(requestBodyParams)); + } catch (Exception e) { + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, + String.format("Airflow request body construction failed: %s", e.getMessage())); + } + } + + @Override + public Request buildRequest(String baseUrl) { + // Build a complete URL + String path = buildPathParams(getPath(), getPathParams()); + String url = baseUrl + path; + + // Add query parameters + if (!getQueryParams().isEmpty()) { + String queryString = buildQueryString(getQueryParams()); + url += "?" 
+ queryString; + } + + // Build Request Builder + Request.Builder builder = new Request.Builder().url(url); + + // Set requests based on HTTP methods + switch (getMethod()) { + case GET: + builder.get(); + break; + case POST: + builder.post(getRequestBody()); + break; + case PATCH: + builder.patch(getRequestBody()); + break; + case PUT: + builder.put(getRequestBody()); + break; + case DELETE: + if (!requestBodyParams.isEmpty()) { + builder.delete(getRequestBody()); + } else { + builder.delete(); + } + break; + default: + throw new IllegalArgumentException("Unsupported HTTP method: " + getMethod()); + } + return builder.build(); + } + + private String buildPathParams(String path, Map pathParams) { + for (Map.Entry entry : pathParams.entrySet()) { + path = path.replace("{" + entry.getKey() + "}", entry.getValue()); + } + return path; + } + + private String buildQueryString(Map queryParams) { + StringBuilder sb = new StringBuilder(); + // Multiple values can be specified for the same parameter name in the Get parameter. + // (e.g. 
"?Key=value1&Key=value2") + queryParams.forEach((key, value) -> { + if (value instanceof List) { + ((List) value).forEach(item -> sb.append(key).append("=").append(item).append("&")); + } else { + sb.append(key).append("=").append(value).append("&"); + } + }); + if (sb.length() > 0) { + sb.setLength(sb.length() - 1); + } + return sb.toString(); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java new file mode 100644 index 00000000000..1e5b7d737cf --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.api.connection; + +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; + +import java.util.Map; + +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + +/** + * Build call for AirflowConnectionGetter
+ * + * + * + * + * + * + * + * + * + * + * + *
Request Body Param Description
connection_id The connection ID.
conn_type The connection type.
description The description of the connection.
host Host of the connection.
login Login of the connection.
schema Schema of the connection.
port Port of the connection.
password Password of the connection.
extra Other values that cannot be put into another field, e.g. RSA keys.(optional)
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
400 Client specified an invalid argument.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
+ */ +@Slf4j +public class AirflowConnectionCreator extends BaseAirflowApi { + + AirflowConnection connection = null; + + public AirflowConnectionCreator(AirflowConnection connection) { + this.connection = connection; + } + + public AirflowConnectionCreator(Map requestBodyParams) { + this.requestBodyParams = requestBodyParams; + } + + @Override + public RequestBody getRequestBody() { + if (connection != null) { + try { + return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), + objectMapper.writeValueAsString(connection)); + } catch (Exception e) { + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, + String.format("Airflow request body construction failed: %s", e.getMessage())); + } + } + return super.getRequestBody(); + } + + @Override + public Class getResponseType() { + return AirflowConnection.class; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.POST; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.LIST_CONNECTIONS_URI; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java new file mode 100644 index 00000000000..7dc278ae0b3 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.api.connection; + +import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for AirflowConnectionGetter
+ * + * + * + *
Path Param Description
connection_id The connection ID.
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
404 A specified resource is not found.
+ */ +public class AirflowConnectionGetter extends BaseAirflowApi { + + public AirflowConnectionGetter(String connectionId) { + pathParams.put("connection_id", connectionId); + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.GET; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.GET_CONNECTION_URI; + } + + @Override + public Class getResponseType() { + return AirflowConnection.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java new file mode 100644 index 00000000000..039a18d9d84 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +import java.util.Map; + +/** + * Build call for DAGCollectionUpdater< br> + * + * + * + * + * + * + * + * + *
GET Param Description
limit The numbers of items to return. (optional, default to 100)
offset The number of items to skip before starting to collect the result set. (optional)
tags List of tags to filter results.(optional)
update_mask The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional)
only_active Only filter active DAGs. (optional, default to true)
dag_id_pattern If set, only return DAGs with dag_ids matching this pattern. (required)
+ * + * + * + * + *
Request Body Param Description
is_paused Whether the DAG is paused.
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
404 A specified resource is not found.
+ */ +public class DAGCollectionUpdater extends BaseAirflowApi { + + public DAGCollectionUpdater(String dagIdPattern, boolean isPaused) { + this.queryParams.put("dag_id_pattern", dagIdPattern); + this.requestBodyParams.put("is_paused", isPaused); + } + + public DAGCollectionUpdater(Map queryParams, Map requestBodyParams) { + this.queryParams = queryParams; + this.requestBodyParams = requestBodyParams; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.PATCH; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.LIST_DAGS_URI; + } + + @Override + public Class getResponseType() { + return DAGCollection.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java new file mode 100644 index 00000000000..23a348d766a --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for DAGDeleter< br> + * + * + * + *
Path Param Description
dag_id The DAG ID.
+ * + * + * + * + *
GET Param Description
update_mask The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional)
+ * + * + * + * + *
Request Body Param Description
is_paused Whether the DAG is paused.
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
404 A specified resource is not found.
+ */ +public class DAGDeletor extends BaseAirflowApi { + + public DAGDeletor(String dagId) { + this.pathParams.put("dag_id", dagId); + } + @Override + public HttpMethod getMethod() { + return HttpMethod.DELETE; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.UPDATE_DAG_URI; + } + + @Override + public Class getResponseType() { + return Object.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java new file mode 100644 index 00000000000..be8313f1b15 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.api.dag; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAG; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +/** + * Build call for DAGUpdater< br> + * + * + * + *
Path Param Description
dag_id The DAG ID.
+ * + * + * + * + *
GET Param Description
update_mask The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional)
+ * + * + * + * + *
Request Body Param Description
is_paused Whether the DAG is paused.
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
404 A specified resource is not found.
+ */ +public class DAGUpdater extends BaseAirflowApi { + + public DAGUpdater(String dagId, boolean isPaused) { + this.pathParams.put("dag_id", dagId); + this.requestBodyParams.put("is_paused", isPaused); + } + + public DAGUpdater(String dagId, String updateMask, boolean isPaused) { + this.pathParams.put("dag_id", dagId); + this.queryParams.put("update_mask", updateMask); + this.requestBodyParams.put("is_paused", isPaused); + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.PATCH; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.UPDATE_DAG_URI; + } + + @Override + public Class getResponseType() { + return DAG.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java new file mode 100644 index 00000000000..b9fe7b22600 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.api.dagruns; + +import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun; +import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; +import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; + +import org.springframework.http.HttpMethod; + +import java.util.Map; + +/** + * Build call for DAGRunsTrigger
+ * + * + * + *
Path Param Description
dag_id The DAG ID.
+ * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Request Body Param Description
conf + * JSON object describing additional configuration parameters.
+ * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.
+ *
dag_run_id Run ID.
+ * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.
+ * If not provided, a value will be generated based on execution_date.
+ * If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error.
+ * This together with DAG_ID are a unique key.
+ *
data_interval_end The end of the interval the DAG run covers.
data_interval_start The beginning of the interval the DAG run covers.
logical_date + * The logical date (previously called execution date). This is the time or interval covered by this DAG run, according to the DAG definition.
+ * The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.
+ * This together with DAG_ID are a unique key.
+ *
note Contains manually entered notes by the user about the DagRun.
+ * + * @http.response.details + * + * + * + * + * + *
Status Code Description
200 Success.
401 Request not authenticated due to missing, invalid, authentication info.
403 Client does not have sufficient permission.
404 A specified resource is not found.
+ */ + +public class DAGRunsTrigger extends BaseAirflowApi { + + public DAGRunsTrigger(String dagId) { + this.pathParams.put("dag_id", dagId); + } + + public DAGRunsTrigger(String dagId, Map requestBodyParams) { + this.pathParams.put("dag_id", dagId); + this.requestBodyParams = requestBodyParams; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.POST; + } + + @Override + public String getPath() { + return AirFlowAPIConstant.TRIGGER_NEW_DAG_RUN_URI; + } + + @Override + public Class getResponseType() { + return DAGRun.class; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java new file mode 100644 index 00000000000..60bb6673cec --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Spring configuration holding all Airflow-scheduler settings and exposing the
 * {@link OkHttpClient} and {@link AirflowServerClient} beans used to talk to the
 * Airflow REST API.
 *
 * <p>Extends {@link ClientConfiguration}; the connect/read/write timeouts and the
 * time unit used when building the HTTP client are presumably inherited from it —
 * confirm against {@code ClientConfiguration}.
 */
@Data
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class AirflowConfig extends ClientConfiguration {

    // Host of the InLong manager that the generated Airflow DAGs call back into.
    @Value("${schedule.engine.airflow.inlong.manager.host:127.0.0.1}")
    private String host;

    // Port of the InLong manager (defaults to the manager's own server.port).
    @Value("${server.port:8083}")
    private int port;

    // InLong manager credentials embedded in the Airflow connection for callbacks.
    @Value("${default.admin.user:admin}")
    private String inlongUsername;

    @Value("${default.admin.password:inlong}")
    private String inlongPassword;

    // ID of the Airflow connection that stores the InLong manager endpoint.
    @Value("${schedule.engine.airflow.connection.id:inlong_connection}")
    private String connectionId;

    // DAG IDs of the two helper DAGs: one removes task DAGs, one creates them.
    @Value("${schedule.engine.airflow.cleaner.id:dag_cleaner}")
    private String dagCleanerId;

    @Value("${schedule.engine.airflow.creator.id:dag_creator}")
    private String dagCreatorId;

    // Credentials for the Airflow REST API (HTTP Basic auth).
    @Value("${schedule.engine.airflow.username:airflow}")
    private String airflowUsername;

    @Value("${schedule.engine.airflow.password:airflow}")
    private String airflowPassword;

    // Base URL of the Airflow webserver.
    @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
    private String baseUrl;

    /**
     * HTTP client used for all Airflow API calls: adds Basic-auth and logging
     * interceptors and applies the timeouts inherited from {@link ClientConfiguration}.
     */
    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder()
                .addInterceptor(new AirflowAuthInterceptor(this.getAirflowUsername(), this.getAirflowPassword()))
                .addInterceptor(new LoggingInterceptor())
                .connectTimeout(this.getConnectTimeout(), this.getTimeUnit())
                .readTimeout(this.getReadTimeout(), this.getTimeUnit())
                .writeTimeout(this.getWriteTimeout(), this.getTimeUnit())
                .retryOnConnectionFailure(true)
                .build();
    }

    /** Typed client wrapping the OkHttp client plus this configuration. */
    @Bean
    public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
        return new AirflowServerClient(okHttpClient, airflowConfig);
    }
}
+ */ + +package org.apache.inlong.manager.schedule.airflow.interceptor; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.Interceptor; +import okhttp3.Request; +import okhttp3.Response; + +import java.io.IOException; +import java.util.Base64; + +/** + * AirflowAuthInterceptor + * Before okhttp call a request, uniformly encapsulate the relevant parameters of authentication + */ +@Slf4j +public class AirflowAuthInterceptor implements Interceptor { + + // Airflow Authentication Header + private final String authHeader; + + public AirflowAuthInterceptor(String username, String password) { + String credentials = username + ":" + password; + this.authHeader = "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes()); + } + + @Override + public Response intercept(Chain chain) throws IOException { + Request originalRequest = chain.request(); + Request.Builder requestBuilder = originalRequest + .newBuilder() + .header("Authorization", authHeader); + return chain.proceed(requestBuilder.build()); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java new file mode 100644 index 00000000000..c3028385b18 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.interceptor; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.Interceptor; +import okhttp3.Request; +import okhttp3.Response; + +import java.io.IOException; + +/** + * LoggingInterceptor + * Provide unified logging for okhttp + */ +@Slf4j +public class LoggingInterceptor implements Interceptor { + + @Override + public Response intercept(Chain chain) throws IOException { + Request request = chain.request(); + Response response = chain.proceed(request); + log.info("Airflow API request information - Address: {}, URI: {}, Request method: {}, Response status code: {}", + request.url(), request.url().uri(), request.method(), response.code()); + return response; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java new file mode 100644 index 00000000000..fad05f2116c --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow.util; + +import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX; + +public class DAGUtil { + + public static String buildDAGIdByGroupId(String groupId) { + return INLONG_OFFLINE_DAG_TASK_PREFIX.concat(groupId); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java new file mode 100644 index 00000000000..950e334921c --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.airflow.util; + +import org.apache.inlong.manager.schedule.ScheduleUnit; + +import java.math.BigInteger; +import java.util.Objects; + +public class DateUtil { + + public static String intervalToSeconds(long interval, String timeUnit) { + BigInteger seconds = new BigInteger(String.valueOf(interval)); + String intervalStr = ""; + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(timeUnit))) { + case SECOND: + intervalStr = "1"; + break; + case MINUTE: + intervalStr = "60"; + break; + case HOUR: + intervalStr = "3600"; + break; + case DAY: + intervalStr = "86400"; + break; + case WEEK: + intervalStr = "604800"; + break; + case MONTH: + intervalStr = "2592000"; + break; + case YEAR: + intervalStr = "31536000"; + break; + default: + throw new IllegalArgumentException("Unsupported time unit"); + } + return seconds.multiply(new BigInteger(intervalStr)).toString(); + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java new file mode 100644 index 00000000000..6b6830fb304 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Represents exceptions specific to the Airflow scheduling process.
 * Each exception is associated with a specific error code for better identification.
 */
public class AirflowScheduleException extends RuntimeException {

    /**
     * Enum to define all error codes associated with Airflow scheduling exceptions.
     */
    public enum AirflowErrorCode {
        INIT_CONNECTION_FAILED,
        TASK_DAG_SWITCH_FAILED,
        SCHEDULE_TASK_REGISTER_FAILED,
        SCHEDULE_TASK_UPDATE_FAILED,
        SCHEDULE_ENGINE_SHUTDOWN_FAILED,
        BUILD_REQUEST_BODY_FAILED,
        DAG_DUPLICATE
    }

    // Immutable after construction; null when the message-only constructor is used.
    private final AirflowErrorCode errorCode;

    /** Creates an exception with a message only (no error code). */
    public AirflowScheduleException(String message) {
        super(message);
        this.errorCode = null;
    }

    /** Creates an exception carrying an error code and a message. */
    public AirflowScheduleException(AirflowErrorCode errorCode, String message) {
        super(message);
        this.errorCode = errorCode;
    }

    /** Creates an exception carrying an error code, a message, and the underlying cause. */
    public AirflowScheduleException(AirflowErrorCode errorCode, String message, Throwable cause) {
        super(message, cause);
        this.errorCode = errorCode;
    }

    /** @return the associated error code, or {@code null} if none was supplied */
    public AirflowErrorCode getErrorCode() {
        return errorCode;
    }

    @Override
    public String toString() {
        return String.format("ErrorCode: %s, Message: %s", errorCode, getMessage());
    }
}
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.testcontainers.containers.ContainerState; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +@Slf4j +public class AirflowContainerEnv { + + public static String BASE_URL = "http://localhost:8080"; + public static String AIRFLOW_USERNAME = "airflow"; + public static String AIRFLOW_PASSWORD = "airflow"; + public static String NORMAL_POSTFIX = "_normal"; + public static String CORN_POSTFIX = "_cron"; + public static String AIRFLOW_SCHEDULER_CONTAINER_NAME = "airflow-scheduler_1"; + public static String DOCKER_COMPOSE_YAML_PATH = "src/test/resources/airflow/docker-compose.yaml"; + public static String 
DEFAULT_DAGS_PATH = "/opt/airflow/dags/"; + + private static DockerComposeContainer environment; + private static OkHttpClient httpClient = new OkHttpClient(); + + public static void setUp() { + // Step 1: Start only the airflow-init service + environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH)) + .withServices("airflow-init") + .withEnv("AIRFLOW_UID", "$(id -u)"); + // Start the environment + environment.start(); + // Step 2: Wait until the "airflow-init" service has completed initialization + // Once initialized, stop the init-only environment and start the full environment + environment.stop(); + // Step 3: Start all services in detached mode after initialization + environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH)) + .withEnv("AIRFLOW_UID", "0") + .withEnv("AIRFLOW__CORE__LOAD_EXAMPLES", "false") + .withEnv("AIRFLOW__API__AUTH_BACKEND", + "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth"); + environment.start(); + copyTestDAGs(); + waitForDAGsLoad("dag_cleaner"); + log.info("Airflow runtime environment created successfully."); + } + + private static void copyTestDAGs() { + // After the DAG file is created, the scheduler will regularly scan the DAG file directory and + // then load it into memory for scheduling. In order to quickly test the update and unregister, two + // test DAGs need to be loaded at the beginning. 
+ Optional container = environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME); + if (container.isPresent()) { + ContainerState airflowScheduler = container.get(); + Path dagPath1 = Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath(); + Path dagPath2 = Paths.get("src/test/resources/airflow/dag_creator.py").toAbsolutePath(); + Path dagPath3 = Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath(); + Path dagPath4 = Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath(); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1), + DEFAULT_DAGS_PATH.concat("dag_cleaner.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2), + DEFAULT_DAGS_PATH.concat("dag_creator.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3), + DEFAULT_DAGS_PATH.concat("testGroup_cron.py")); + airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4), + DEFAULT_DAGS_PATH.concat("testGroup_normal.py")); + try { + String result = + airflowScheduler.execInContainer("bash", "-c", "ls ".concat(DEFAULT_DAGS_PATH)).getStdout(); + log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result); + } catch (Exception e) { + log.warn(String.format( + "Copying the test DAG file may have failed. Docker Container command(\"%s\") execution failed.", + "ls ".contains(DEFAULT_DAGS_PATH)), e); + } + } else { + log.error(String.format("Copying test DAG file failed. 
Airflow scheduler container(%s) does not exist.", + AIRFLOW_SCHEDULER_CONTAINER_NAME)); + throw new AirflowScheduleException("Copying test DAG file failed."); + } + log.info("Copy test DAG file successfully."); + } + + public static void waitForDAGsLoad(String dagId) { + int total = 10; + // Waiting for Airflow to load the initial DAG + while (total > 0) { + String credential = okhttp3.Credentials.basic(AIRFLOW_USERNAME, AIRFLOW_PASSWORD); + Request request = new Request.Builder() + .url(BASE_URL + "/api/v1/dags/" + dagId + "/details") + .header("Authorization", credential) + .build(); + try (Response response = httpClient.newCall(request).execute()) { + if (response.code() == 200) { + break; + } + } catch (Exception e) { + log.error("The request to check if the original DAG exists failed: {}", e.getMessage(), e); + } + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + total--; + } + log.info("DAG successfully loaded."); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java new file mode 100644 index 00000000000..2e4213f56fe --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.airflow; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.BaseScheduleTest; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; + +import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.CORN_POSTFIX; +import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Slf4j +@EnableConfigurationProperties +@ComponentScan(basePackages = "org.apache.inlong.manager") +@SpringBootTest(classes = AirflowScheduleEngineTest.class) +public class AirflowScheduleEngineTest { + + @Autowired + private AirflowScheduleEngine scheduleEngine; + private static BaseScheduleTest baseScheduleTest = new BaseScheduleTest(); + + @BeforeAll + public static void initScheduleEngine() { + try { + AirflowContainerEnv.setUp(); + } catch (Exception e) { + log.error("Airflow runtime environment creation failed.", e); + throw new RuntimeException( + String.format("Airflow runtime environment creation failed: %s", e.getMessage())); + } + } + + @Test + @Order(1) + public void testRegisterScheduleInfo() { + 
// 1. test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX + System.currentTimeMillis(); + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleRegister(scheduleInfo)); + + // 2. test for cron schedule + scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX + System.currentTimeMillis(); + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleRegister(scheduleInfo)); + } + + @Test + @Order(2) + public void testUpdateScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUpdate(scheduleInfo)); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUpdate(scheduleInfo)); + } + + @Test + @Order(3) + public void testUnRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo(); + String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId())); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? 
+ scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo(); + groupId = scheduleInfo.getInlongGroupId() + CORN_POSTFIX; + scheduleInfo.setInlongGroupId(groupId); + assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId())); + } +} diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py new file mode 100644 index 00000000000..be20fe1bb15 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from airflow.models import Variable +from airflow.utils.dates import days_ago +from datetime import datetime +import os +import logging +import pytz +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +from airflow import configuration + +DAG_PATH = configuration.get('core', 'dags_folder') + "/" + + +def clean_expired_dags(**context): + original_time = context.get('execution_date') + target_timezone = pytz.timezone("Asia/Shanghai") + utc_time = original_time.astimezone(target_timezone) + current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + logging.info(f"Current time: {current_time}") + for dag_file in os.listdir(DAG_PATH): + if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"): + with open(DAG_PATH + dag_file, "r") as file: + line = file.readline() + while line and "end_offset_datetime_str" not in line: + line = file.readline() + end_date_str = None + if len(line.split("=")) > 1: + end_date_str = line.split("=")[1].strip().strip("\"") + logging.info(f"DAG end time: {end_date_str}") + if end_date_str: + try: + if str(current_time) > str(end_date_str): + dag_file_path = os.path.join(DAG_PATH, dag_file) + os.remove(dag_file_path) + # Optionally, delete the end_date variable + logging.info(f"Deleted expired DAG: {dag_file}") + except ValueError: + logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}") + + +default_args = { + 'owner': 'airflow', + 'start_date': datetime.now() - timedelta(minutes=5), + 'catchup': False, + 'tags': ["inlong"] +} + +dag = DAG( + 'dag_cleaner', + default_args=default_args, + schedule_interval="*/20 * * * *", + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id='clean_expired_dags', + python_callable=clean_expired_dags, + provide_context=True, + dag=dag, +) diff --git 
a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py new file mode 100644 index 00000000000..4034cf467c9 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago +from airflow.models import Variable +import os +from datetime import datetime +from airflow.hooks.base_hook import BaseHook +from airflow import configuration + +DAG_PATH = configuration.get('core', 'dags_folder') + "/" +DAG_PREFIX = 'inlong_offline_task_' + +def create_dag_file(**context): + conf = context.get('dag_run').conf + print('conf: ', conf) + groupId = conf.get('inlong_group_id') + task_name = DAG_PREFIX + groupId + timezone = conf.get('timezone') + boundaryType = str(conf.get('boundary_type')) + start_time = int(conf.get('start_time')) + end_time = int(conf.get('end_time')) + cron_expr = conf.get('cron_expr') + seconds_interval = conf.get('seconds_interval') + schedule_interval = cron_expr + if cron_expr is None or len(cron_expr) == 0: + schedule_interval = f'timedelta(seconds={seconds_interval})' + else: + schedule_interval = '"' + cron_expr + '"' + connectionId = conf.get('connection_id') + dag_content = f'''from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "{timezone}" +start_offset_datetime_str = {start_time} +end_offset_datetime_str = {end_time} +schedule_interval = {schedule_interval} # Or put cron expression +dag_id = "{task_name}" +groupId = "{groupId}" +connectionId = "{connectionId}" +boundaryType = "{boundaryType}" + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = 
f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}" + params = {{ + "username": conn.login, + "password": conn.password + }} + print("params", params) + headers = {{ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Referer": "http://192.168.101.2:8083/", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Origin": "http://192.168.101.2", + "Connection": "close", + "Priority": "u=0" + }} + time_interval = get_time_interval(context) + data = {{ + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": str(int(time_interval[0])), + "upperBoundary": str(int(int(time_interval[1]))) + }} + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + + +def get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = {{ + 'owner': 'inlong', + 'start_date': start_date, + 'end_date': end_date, + 'catchup': False, +}} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) + ''' + dag_file_path = 
os.path.join(DAG_PATH, f'{task_name}.py') + with open(dag_file_path, 'w') as f: + f.write(dag_content) + print(f'Generated DAG file: {dag_file_path}') +default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'catchup': False} +dag = DAG('dag_creator', default_args=default_args, schedule_interval=None, is_paused_upon_creation=False) +create_dag_task = PythonOperator(task_id='create_dag_file', python_callable=create_dag_file, provide_context=True, dag=dag) \ No newline at end of file diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml new file mode 100644 index 00000000000..c97195c03f7 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml @@ -0,0 +1,292 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. 
+# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.6.0 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +version: '3.8' +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0} + # build: . 
+ environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + # For backward compatibility, with Airflow <2.3 + AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'true' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for quick checks + # for other purposes (development, test and especially production usage) build/extend Airflow image. 
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + image: redis:latest + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + 
condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + function ver() { + printf "%04d%04d%04d%04d" $${1//./ } + } + airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) + airflow_version_comparable=$$(ver $${airflow_version}) + min_airflow_version=2.2.0 + min_airflow_version_comparable=$$(ver $${min_airflow_version}) + if (( airflow_version_comparable < min_airflow_version_comparable )); then + echo + echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" + echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" + echo + exit 1 + fi + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." 
+ echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. 
You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. 
+ # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + +volumes: + postgres-db-volume: diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py new file mode 100644 index 00000000000..b753eb7587c --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "Asia/Shanghai" +start_offset_datetime_str = 1731072908243 +end_offset_datetime_str = 1731142800000 +schedule_interval = "*/1 * * * *" # Or put cron expression +dag_id = "inlong_offline_task_testGroup_cron" +groupId = "test_offline_1" +connectionId = "inlong_connection" +boundaryType = str("time") + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = f"http://{conn.host}:{conn.port}/{conn.schema}" + params = { + "username": conn.login, + "password": conn.password + } + print("params", params) + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Referer": "http://192.168.101.2:8083/", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Origin": "http://192.168.101.2", + "Connection": "close", + "Priority": "u=0" + } + time_interval = get_time_interval(context) + data = { + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": time_interval[0], + "upperBoundary": time_interval[1] + } + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + + +def 
get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = { + 'owner': 'inlong', + 'start_date': start_date, + 'end_date': end_date, + 'catchup': False, +} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py new file mode 100644 index 00000000000..5666f9f471a --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import DAG +from datetime import datetime, timedelta +from airflow.operators.python_operator import PythonOperator +from datetime import datetime +from croniter import croniter +from airflow.hooks.base_hook import BaseHook +import requests +import pytz + +timezone = "Asia/Shanghai" +start_offset_datetime_str = 1731072908243 +end_offset_datetime_str = 1731142800000 +schedule_interval = "*/1 * * * *" # Or put cron expression +dag_id = "inlong_offline_task_testGroup_normal" +groupId = "test_offline_1" +connectionId = "inlong_connection" +boundaryType = str("time") + +target_timezone = pytz.timezone(timezone) + +start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone) +end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone) + + +def taskFunction(**context): + print("#########################") + conn = BaseHook.get_connection(connectionId) + url = f"http://{conn.host}:{conn.port}/{conn.schema}" + params = { + "username": conn.login, + "password": conn.password + } + print("params", params) + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0", + "Accept": "application/json", + "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/json;charset=UTF-8", + "tenant": "public", + "Connection": "close", + "Priority": "u=0" + } + time_interval = get_time_interval(context) + data = { + "boundaryType": boundaryType, + "groupId": groupId, + "lowerBoundary": time_interval[0], + "upperBoundary": time_interval[1] + } + print("Request Body: ", data) + response = requests.post(url, params=params, headers=headers, json=data) + if response.status_code == 200: + print(response.json()) + else: + print(response.text) + print("#########################") + 
+ +def get_time_interval(context): + execution_date = context.get('execution_date') + execution_date = execution_date.astimezone(target_timezone) + dag = context.get('dag') + schedule_interval = dag.schedule_interval + if isinstance(schedule_interval, timedelta): + return execution_date.timestamp(), (execution_date + schedule_interval).timestamp() + else: + cron_expr = dag.schedule_interval + cron = croniter(cron_expr, execution_date) + next_run = cron.get_next(datetime) + return execution_date.timestamp(), next_run.timestamp() + + +default_args = { + 'owner': 'inlong', + 'start_date': start_date, + 'end_date': end_date, + 'catchup': False, +} + +dag = DAG( + dag_id, + default_args=default_args, + schedule_interval=schedule_interval, + is_paused_upon_creation=False +) + +clean_task = PythonOperator( + task_id=dag_id, + python_callable=taskFunction, + provide_context=True, + dag=dag, +) diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 2bad5f801f7..c3b70017415 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -103,11 +103,18 @@ agent.install.temp.path=inlong/agent-installer-temp/ # The primary key id of the default agent module used default.module.id=1 -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Please confirm if it is a loopback 
address +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 040c868bcf8..b8d11df3393 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -95,11 +95,17 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none - dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Please confirm if it is a loopback address +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 393eef6b05a..9f408a9d494 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -96,11 +96,17 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 -# schedule engine type -# support none(no scheduler) and quartz(quartz 
scheduler), default is none -inlong.schedule.engine=none - dirty.log.clean.enabled=false dirty.log.clean.interval.minutes=5 dirty.dirty.retention.minutes=10 -dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg \ No newline at end of file +dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg + +# Airflow configuration +schedule.engine.airflow.baseUrl= +schedule.engine.airflow.username= +schedule.engine.airflow.password= +schedule.engine.airflow.connection.id= +# Please confirm if it is a loopback address +schedule.engine.airflow.inlong.manager.host= +schedule.engine.airflow.cleaner.id= +schedule.engine.airflow.creator.id= \ No newline at end of file