Skip to content

Commit

Permalink
[INLONG-11400][Manager] Modify logging issues and fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Nov 15, 2024
1 parent 8b242da commit b60b08c
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI;

/**
* Response for processing the register/unregister/update requests from {@link AirflowScheduleClient}
* Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient}
*/
@Service
public class AirflowScheduleEngine implements ScheduleEngine {
Expand Down Expand Up @@ -96,6 +96,7 @@ private void initConnection() throws Exception {
response = serverClient.sendRequest(new AirflowConnectionCreator(newConn));
LOGGER.info("AirflowConnection registration response: {}", response.toString());
if (!response.isSuccess()) {
LOGGER.error("Initialization connection failed.");
throw new AirflowScheduleException("Initialization connection failed.");
}
}
Expand All @@ -112,7 +113,8 @@ private void switchOriginalDAG(boolean isPaused) {
}
} catch (Exception e) {
LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e);
throw new RuntimeException(e);
throw new AirflowScheduleException(
String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage()));
}
}
}
Expand All @@ -135,7 +137,8 @@ private void switchAllTaskDAG(boolean isPaused) {
}
} catch (Exception e) {
LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e);
throw new RuntimeException(e);
throw new AirflowScheduleException(
String.format("Failed to %s task DAGs: %s", isPaused ? "stop" : "start", e.getMessage()));
}
}

Expand All @@ -147,7 +150,8 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
} catch (Exception e) {
LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.",
scheduleInfo.getInlongGroupId(), e);
throw new AirflowScheduleException(e.getMessage(), e);
throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " +
"register: %s", scheduleInfo.getInlongGroupId(), e.getMessage()));
}
}

Expand Down Expand Up @@ -196,7 +200,8 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
} catch (Exception e) {
LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.",
scheduleInfo.getInlongGroupId(), e);
throw new AirflowScheduleException(e.getMessage(), e);
throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " +
"update: %s", scheduleInfo.getInlongGroupId(), e.getMessage()));
}
}

Expand Down Expand Up @@ -237,7 +242,8 @@ public void stop() {
switchAllTaskDAG(true);
} catch (Exception e) {
LOGGER.error("Airflow Schedule Engine shutdown failed: ", e);
throw new AirflowScheduleException(e.getMessage(), e);
throw new AirflowScheduleException(
String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public class AirflowServerClient {
private final AirflowConfig config;
private final ObjectMapper objectMapper;

/**
 * Creates a client for talking to the Airflow REST API.
 *
 * <p>The JSON mapper is now created internally rather than injected, so callers
 * only need to supply the HTTP client and the Airflow connection configuration.
 *
 * @param httpClient OkHttp client used to execute all Airflow REST requests
 * @param config     Airflow server configuration (base URL, credentials, etc.)
 */
public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) {
    this.httpClient = httpClient;
    this.config = config;
    // Internally-owned mapper; the previous constructor parameter was removed.
    this.objectMapper = new ObjectMapper();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,9 @@ public interface AirflowApi<T> {
*/
Request buildRequest(String baseUrl);

/**
* Returns the type of the response expected from this method.
* @return The expected response type.
*/
Class<T> getResponseType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;
import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.Request;
Expand All @@ -32,13 +32,14 @@

/**
* The basic implementation of Airflow API interface.
*
* @param <T> the type of the response expected from the API, allowing flexibility for various response types.
*/

@Slf4j
public abstract class BaseAirflowApi<T> implements AirflowApi<T> {

protected static final Gson gson = new Gson();
protected static final ObjectMapper objectMapper = new ObjectMapper();
protected Map<String, String> pathParams = Maps.newHashMap();
protected Map<String, Object> queryParams = Maps.newHashMap();
protected Map<String, Object> requestBodyParams = Maps.newHashMap();
Expand Down Expand Up @@ -70,9 +71,11 @@ public Map<String, Object> getQueryParams() {
public RequestBody getRequestBody() {
try {
return RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
gson.toJson(requestBodyParams));
objectMapper.writeValueAsString(requestBodyParams));
} catch (Exception e) {
throw new AirflowScheduleException(e.getMessage(), e);
log.error("Airflow request body construction failed: {}", e.getMessage(), e);
throw new AirflowScheduleException(
String.format("Airflow request body construction failed: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -127,6 +130,8 @@ private String buildPathParams(String path, Map<String, String> pathParams) {

private String buildQueryString(Map<String, Object> queryParams) {
StringBuilder sb = new StringBuilder();
// Multiple values can be specified for the same parameter name in the Get parameter.
// (e.g. "?Key=value1&Key=value2")
queryParams.forEach((key, value) -> {
if (value instanceof List) {
((List) value).forEach(item -> sb.append(key).append("=").append(item).append("&"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public RequestBody getRequestBody() {
if (connection != null) {
try {
return RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
gson.toJson(connection));
objectMapper.writeValueAsString(connection));
} catch (Exception e) {
throw new AirflowScheduleException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor;
import org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -81,8 +80,7 @@ public OkHttpClient okHttpClient() {
.build();
}
@Bean
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig,
ObjectMapper objectMapper) {
return new AirflowServerClient(okHttpClient, airflowConfig, objectMapper);
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
return new AirflowServerClient(okHttpClient, airflowConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.schedule.airflow;

import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;

import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -38,12 +40,14 @@ public class AirflowContainerEnv {
public static String AIRFLOW_PASSWORD = "airflow";
public static String NORMAL_POSTFIX = "_normal";
public static String CORN_POSTFIX = "_cron";
public static String AIRFLOW_SCHEDULER_CONTAINER_NAME = "airflow-scheduler_1";
public static String DOCKER_COMPOSE_YAML_PATH = "src/test/resources/airflow/docker-compose.yaml";
public static String DEFAULT_DAGS_PATH = "/opt/airflow/dags/";

private static DockerComposeContainer<?> environment;
private static OkHttpClient httpClient = new OkHttpClient();

public static void setUp() throws Exception {
public static void setUp() {
// Step 1: Start only the airflow-init service
environment = new DockerComposeContainer<>(new File(DOCKER_COMPOSE_YAML_PATH))
.withServices("airflow-init")
Expand All @@ -60,43 +64,48 @@ public static void setUp() throws Exception {
.withEnv("AIRFLOW__API__AUTH_BACKEND",
"airflow.providers.fab.auth_manager.api.auth.backend.basic_auth");
environment.start();
log.info("All services are up and running.");
loadTestDAG();
waitForDAGLoad("dag_cleaner");
copyTestDAGs();
waitForDAGsLoad("dag_cleaner");
log.info("Airflow runtime environment created successfully.");
}

private static void loadTestDAG() {
private static void copyTestDAGs() {
// After the DAG file is created, the scheduler will regularly scan the DAG file directory and
// then load it into memory for scheduling. In order to quickly test the update and unregister, two
// test DAGs need to be loaded at the beginning.
String airflowSchedulerName = "airflow-scheduler_1";
log.info("airflow-scheduler name : " + airflowSchedulerName);
Optional<ContainerState> container = environment.getContainerByServiceName(airflowSchedulerName);
log.info("container : " + container.isPresent());
Optional<ContainerState> container = environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME);
if (container.isPresent()) {
ContainerState airflowScheduler = container.get();
Path dagPath1 = Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath();
Path dagPath2 = Paths.get("src/test/resources/airflow/dag_creator.py").toAbsolutePath();
Path dagPath3 = Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath();
Path dagPath4 = Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath();
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1),
"/opt/airflow/dags/dag_cleaner.py");
DEFAULT_DAGS_PATH.concat("dag_cleaner.py"));
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2),
"/opt/airflow/dags/dag_creator.py");
DEFAULT_DAGS_PATH.concat("dag_creator.py"));
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3),
"/opt/airflow/dags/testGroup_cron.py");
DEFAULT_DAGS_PATH.concat("testGroup_cron.py"));
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4),
"/opt/airflow/dags/testGroup_normal.py");
DEFAULT_DAGS_PATH.concat("testGroup_normal.py"));
try {
String result = airflowScheduler.execInContainer("bash", "-c", "ls /opt/airflow/dags/").getStdout();
log.info("/opt/airflow/dags/ has file: {}", result);
String result =
airflowScheduler.execInContainer("bash", "-c", "ls ".concat(DEFAULT_DAGS_PATH)).getStdout();
log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result);
} catch (Exception e) {
log.error("Docker Container command execution failed: ", e);
log.warn(String.format(
"Copying the test DAG file may have failed. Docker Container command(\"%s\") execution failed.",
"ls ".contains(DEFAULT_DAGS_PATH)), e);
}
} else {
log.error(String.format("Copying test DAG file failed. Airflow scheduler container(%s) does not exist.",
AIRFLOW_SCHEDULER_CONTAINER_NAME));
throw new AirflowScheduleException("Copying test DAG file failed.");
}
log.info("Copy test DAG file successfully.");
}

public static void waitForDAGLoad(String dagId) {
public static void waitForDAGsLoad(String dagId) {
int total = 10;
// Waiting for Airflow to load the initial DAG
while (total > 0) {
Expand All @@ -110,7 +119,7 @@ public static void waitForDAGLoad(String dagId) {
break;
}
} catch (Exception e) {
log.error(e.getMessage(), e);
log.error("The request to check if the original DAG exists failed: {}", e.getMessage(), e);
}
try {
Thread.sleep(30000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.schedule.BaseScheduleTest;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -32,6 +33,7 @@
import static org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Slf4j
@EnableConfigurationProperties
@ComponentScan(basePackages = "org.apache.inlong.manager")
@SpringBootTest(classes = AirflowScheduleEngineTest.class)
Expand All @@ -40,14 +42,18 @@ public class AirflowScheduleEngineTest {
@Autowired
private AirflowScheduleEngine scheduleEngine;
private static BaseScheduleTest baseScheduleTest = new BaseScheduleTest();

@BeforeAll
public static void initScheduleEngine() {
    try {
        // Spin up the Airflow docker-compose environment before any test runs.
        AirflowContainerEnv.setUp();
    } catch (Exception e) {
        log.error("Airflow runtime environment creation failed.", e);
        // Pass e as the cause: wrapping only e.getMessage() would discard the
        // original stack trace, making environment failures hard to diagnose.
        throw new RuntimeException(
                String.format("Airflow runtime environment creation failed: %s", e.getMessage()), e);
    }
}

@Test
@Order(1)
public void testRegisterScheduleInfo() {
Expand All @@ -63,6 +69,7 @@ public void testRegisterScheduleInfo() {
scheduleInfo.setInlongGroupId(groupId);
assertTrue(scheduleEngine.handleRegister(scheduleInfo));
}

@Test
@Order(2)
public void testUpdateScheduleInfo() {
Expand Down

0 comments on commit b60b08c

Please sign in to comment.