diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java index 5469b4a66e..bbb03cec00 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -53,7 +53,7 @@ import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI; /** - * Response for processing the register/unregister/update requests from {@link AirflowScheduleClient} + * Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient} */ @Service public class AirflowScheduleEngine implements ScheduleEngine { @@ -96,6 +96,7 @@ private void initConnection() throws Exception { response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); LOGGER.info("AirflowConnection registration response: {}", response.toString()); if (!response.isSuccess()) { + LOGGER.error("Initialization connection failed."); throw new AirflowScheduleException("Initialization connection failed."); } } @@ -112,7 +113,8 @@ private void switchOriginalDAG(boolean isPaused) { } } catch (Exception e) { LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e); - throw new RuntimeException(e); + throw new AirflowScheduleException( + String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage())); } } } @@ -135,7 +137,8 @@ private void switchAllTaskDAG(boolean isPaused) { } } catch (Exception e) { LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e); - throw new RuntimeException(e); + throw new AirflowScheduleException( + String.format("Failed to %s task DAGs: %s", isPaused ? "stop" : "start", e.getMessage())); } } @@ -147,7 +150,8 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) { } catch (Exception e) { LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.", scheduleInfo.getInlongGroupId(), e); - throw new AirflowScheduleException(e.getMessage(), e); + throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " + + "register: %s", scheduleInfo.getInlongGroupId(), e.getMessage())); } } @@ -196,7 +200,8 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) { } catch (Exception e) { LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.", scheduleInfo.getInlongGroupId(), e); - throw new AirflowScheduleException(e.getMessage(), e); + throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " + + "update: %s", scheduleInfo.getInlongGroupId(), e.getMessage())); } } @@ -237,7 +242,8 @@ public void stop() { switchAllTaskDAG(true); } catch (Exception e) { LOGGER.error("Airflow Schedule Engine shutdown failed: ", e); - throw new AirflowScheduleException(e.getMessage(), e); + throw new AirflowScheduleException( + String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage())); } } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java index 8451fcfbce..be67a36475 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java @@ -41,10 +41,10 @@ public class AirflowServerClient { private final AirflowConfig config; private final ObjectMapper objectMapper; - public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config, ObjectMapper objectMapper) { + public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) { this.httpClient = httpClient; this.config = config; - this.objectMapper = objectMapper; + this.objectMapper = new ObjectMapper(); } /** diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java index 8829e9a52a..1b0fab55a0 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java @@ -68,5 +68,9 @@ public interface AirflowApi { */ Request buildRequest(String baseUrl); + /** + * Returns the type of the response expected from this method. + * @return The expected response type. + */ Class getResponseType(); } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java index 3f0979110d..697c9bc1fc 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java @@ -20,8 +20,8 @@ import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; -import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import okhttp3.Request; @@ -32,13 +32,14 @@ /** * The basic implementation of Airflow API interface. + * * @param the type of the response expected from the API, allowing flexibility for various response types. */ @Slf4j public abstract class BaseAirflowApi implements AirflowApi { - protected static final Gson gson = new Gson(); + protected static final ObjectMapper objectMapper = new ObjectMapper(); protected Map pathParams = Maps.newHashMap(); protected Map queryParams = Maps.newHashMap(); protected Map requestBodyParams = Maps.newHashMap(); @@ -70,9 +71,11 @@ public Map getQueryParams() { public RequestBody getRequestBody() { try { return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), - gson.toJson(requestBodyParams)); + objectMapper.writeValueAsString(requestBodyParams)); } catch (Exception e) { - throw new AirflowScheduleException(e.getMessage(), e); + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException( + String.format("Airflow request body construction failed: %s", e.getMessage())); } } @@ -127,6 +130,8 @@ private String buildPathParams(String path, Map pathParams) { private String buildQueryString(Map queryParams) { StringBuilder sb = new StringBuilder(); + // Multiple values can be specified for the same parameter name in the Get parameter. + // (e.g. "?Key=value1&Key=value2") queryParams.forEach((key, value) -> { if (value instanceof List) { ((List) value).forEach(item -> sb.append(key).append("=").append(item).append("&")); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java index f613367140..3c6d7d330f 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java @@ -69,7 +69,7 @@ public RequestBody getRequestBody() { if (connection != null) { try { return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), - gson.toJson(connection)); + objectMapper.writeValueAsString(connection)); } catch (Exception e) { throw new AirflowScheduleException(e.getMessage(), e); } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java index c79bcada45..cb90be33db 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -22,7 +22,6 @@ import org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor; import org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -81,8 +80,7 @@ public OkHttpClient okHttpClient() { .build(); } @Bean - public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig, - ObjectMapper objectMapper) { - return new AirflowServerClient(okHttpClient, airflowConfig, objectMapper); + public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) { + return new AirflowServerClient(okHttpClient, airflowConfig); } } diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java index 7dc42010ee..2e7d467581 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.schedule.airflow; +import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; + import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -38,12 +40,14 @@ public class AirflowContainerEnv { public static String AIRFLOW_PASSWORD = "airflow"; public static String NORMAL_POSTFIX = "_normal"; public static String CORN_POSTFIX = "_cron"; + public static String AIRFLOW_SCHEDULER_CONTAINER_NAME = "airflow-scheduler_1"; public static String DOCKER_COMPOSE_YAML_PATH = "src/test/resources/airflow/docker-compose.yaml"; + public static String DEFAULT_DAGS_PATH = "/opt/airflow/dags/"; private static DockerComposeContainer environment; private static OkHttpClient httpClient = new OkHttpClient(); - public static void setUp() throws Exception { + public static void setUp() { // Step 1: Start only the airflow-init service environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH)) .withServices("airflow-init") @@ -60,19 +64,16 @@ public static void setUp() throws Exception { .withEnv("AIRFLOW__API__AUTH_BACKEND", "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth"); environment.start(); - log.info("All services are up and running."); - loadTestDAG(); - waitForDAGLoad("dag_cleaner"); + copyTestDAGs(); + waitForDAGsLoad("dag_cleaner"); + log.info("Airflow runtime environment created successfully."); } - private static void loadTestDAG() { + private static void copyTestDAGs() { // After the DAG file is created, the scheduler will regularly scan the DAG file directory and // then load it into memory for scheduling. In order to quickly test the update and unregister, two // test DAGs need to be loaded at the beginning. - String airflowSchedulerName = "airflow-scheduler_1"; - log.info("airflow-scheduler name : " + airflowSchedulerName); - Optional container = environment.getContainerByServiceName(airflowSchedulerName); - log.info("container : " + container.isPresent()); + Optional container = environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME); if (container.isPresent()) { ContainerState airflowScheduler = container.get(); Path dagPath1 = Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath(); @@ -80,23 +81,31 @@ private static void loadTestDAG() { Path dagPath3 = Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath(); Path dagPath4 = Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath(); airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1), - "/opt/airflow/dags/dag_cleaner.py"); + DEFAULT_DAGS_PATH.concat("dag_cleaner.py")); airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2), - "/opt/airflow/dags/dag_creator.py"); + DEFAULT_DAGS_PATH.concat("dag_creator.py")); airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3), - "/opt/airflow/dags/testGroup_cron.py"); + DEFAULT_DAGS_PATH.concat("testGroup_cron.py")); airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4), - "/opt/airflow/dags/testGroup_normal.py"); + DEFAULT_DAGS_PATH.concat("testGroup_normal.py")); try { - String result = airflowScheduler.execInContainer("bash", "-c", "ls /opt/airflow/dags/").getStdout(); - log.info("/opt/airflow/dags/ has file: {}", result); + String result = + airflowScheduler.execInContainer("bash", "-c", "ls ".concat(DEFAULT_DAGS_PATH)).getStdout(); + log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result); } catch (Exception e) { - log.error("Docker Container command execution failed: ", e); + log.warn(String.format( + "Copying the test DAG file may have failed. Docker Container command(\"%s\") execution failed.", + "ls ".contains(DEFAULT_DAGS_PATH)), e); } + } else { + log.error(String.format("Copying test DAG file failed. Airflow scheduler container(%s) does not exist.", + AIRFLOW_SCHEDULER_CONTAINER_NAME)); + throw new AirflowScheduleException("Copying test DAG file failed."); } + log.info("Copy test DAG file successfully."); } - public static void waitForDAGLoad(String dagId) { + public static void waitForDAGsLoad(String dagId) { int total = 10; // Waiting for Airflow to load the initial DAG while (total > 0) { @@ -110,7 +119,7 @@ public static void waitForDAGLoad(String dagId) { break; } } catch (Exception e) { - log.error(e.getMessage(), e); + log.error("The request to check if the original DAG exists failed: {}", e.getMessage(), e); } try { Thread.sleep(30000); diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java index 59f395cfb4..2e4213f56f 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.schedule.BaseScheduleTest; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -32,6 +33,7 @@ import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX; import static org.junit.jupiter.api.Assertions.assertTrue; +@Slf4j @EnableConfigurationProperties @ComponentScan(basePackages = "org.apache.inlong.manager") @SpringBootTest(classes = AirflowScheduleEngineTest.class) @@ -40,14 +42,18 @@ public class AirflowScheduleEngineTest { @Autowired private AirflowScheduleEngine scheduleEngine; private static BaseScheduleTest baseScheduleTest = new BaseScheduleTest(); + @BeforeAll public static void initScheduleEngine() { try { AirflowContainerEnv.setUp(); } catch (Exception e) { - throw new RuntimeException(e); + log.error("Airflow runtime environment creation failed.", e); + throw new RuntimeException( + String.format("Airflow runtime environment creation failed: %s", e.getMessage())); } } + @Test @Order(1) public void testRegisterScheduleInfo() { @@ -63,6 +69,7 @@ public void testRegisterScheduleInfo() { scheduleInfo.setInlongGroupId(groupId); assertTrue(scheduleEngine.handleRegister(scheduleInfo)); } + @Test @Order(2) public void testUpdateScheduleInfo() {