Skip to content

Commit

Permalink
[INLONG-11400][Manager] Remove the HttpMethod class and modify the co…
Browse files Browse the repository at this point in the history
…de for exception handling
  • Loading branch information
ZKpLo committed Nov 18, 2024
1 parent b60b08c commit 8ac6865
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE;
import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED;
import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED;

/**
* Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient}
Expand Down Expand Up @@ -97,7 +103,7 @@ private void initConnection() throws Exception {
LOGGER.info("AirflowConnection registration response: {}", response.toString());
if (!response.isSuccess()) {
LOGGER.error("Initialization connection failed.");
throw new AirflowScheduleException("Initialization connection failed.");
throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed.");
}
}
}
Expand All @@ -108,12 +114,12 @@ private void switchOriginalDAG(boolean isPaused) {
AirflowResponse<DAG> response = serverClient.sendRequest(new DAGUpdater(dagId, isPaused));
LOGGER.info("Response to {} the original DAG : {}", isPaused ? "stop" : "start", response.toString());
if (!response.isSuccess()) {
throw new AirflowScheduleException(
throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
String.format("%s does not exist or failed to %s.", dagId, (isPaused ? "stop" : "start")));
}
} catch (Exception e) {
LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e);
throw new AirflowScheduleException(
throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage()));
}
}
Expand All @@ -125,7 +131,7 @@ private void switchAllTaskDAG(boolean isPaused) {
new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, isPaused));
LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : "start", response.toString());
if (!response.isSuccess()) {
throw new AirflowScheduleException(
throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
String.format("Failed to %s task DAGs.", isPaused ? "stop" : "start"));
}
if (!isPaused) {
Expand All @@ -137,7 +143,7 @@ private void switchAllTaskDAG(boolean isPaused) {
}
} catch (Exception e) {
LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e);
throw new AirflowScheduleException(
throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
String.format("Failed to %s task DAGs: %s", isPaused ? "stop" : "start", e.getMessage()));
}
}
Expand All @@ -150,8 +156,9 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
} catch (Exception e) {
LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.",
scheduleInfo.getInlongGroupId(), e);
throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " +
"register: %s", scheduleInfo.getInlongGroupId(), e.getMessage()));
throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED,
String.format("The Airflow scheduling task with Group ID %s failed to register: %s",
scheduleInfo.getInlongGroupId(), e.getMessage()));
}
}

Expand Down Expand Up @@ -200,14 +207,16 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
} catch (Exception e) {
LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.",
scheduleInfo.getInlongGroupId(), e);
throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " +
"update: %s", scheduleInfo.getInlongGroupId(), e.getMessage()));
throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED,
String.format("The Airflow scheduling task with Group ID %s failed to update: %s",
scheduleInfo.getInlongGroupId(), e.getMessage()));
}
}

public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) throws Exception {
if (isFirst && scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
throw new AirflowScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered");
throw new AirflowScheduleException(DAG_DUPLICATE,
String.format("Group %s is already registered", scheduleInfo.getInlongGroupId()));
}
DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder()
.inlongGroupId(scheduleInfo.getInlongGroupId())
Expand Down Expand Up @@ -242,7 +251,7 @@ public void stop() {
switchAllTaskDAG(true);
} catch (Exception e) {
LOGGER.error("Airflow Schedule Engine shutdown failed: ", e);
throw new AirflowScheduleException(
throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED,
String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.inlong.manager.schedule.airflow.api;

import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import okhttp3.Request;
import okhttp3.RequestBody;
import org.springframework.http.HttpMethod;

import java.util.Map;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.inlong.manager.schedule.airflow.api;

import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;
import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -26,10 +25,13 @@
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.springframework.http.HttpMethod;

import java.util.List;
import java.util.Map;

import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;

/**
* The basic implementation of Airflow API interface.
*
Expand Down Expand Up @@ -74,7 +76,7 @@ public RequestBody getRequestBody() {
objectMapper.writeValueAsString(requestBodyParams));
} catch (Exception e) {
log.error("Airflow request body construction failed: {}", e.getMessage(), e);
throw new AirflowScheduleException(
throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
String.format("Airflow request body construction failed: %s", e.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;
import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;

import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.springframework.http.HttpMethod;

import java.util.Map;

import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;

/**
* Build call for AirflowConnectionGetter<br>
* <table border="10">
Expand All @@ -51,7 +54,7 @@
* <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
* </table>
*/

@Slf4j
public class AirflowConnectionCreator extends BaseAirflowApi<AirflowConnection> {

AirflowConnection connection = null;
Expand All @@ -71,7 +74,9 @@ public RequestBody getRequestBody() {
return RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
objectMapper.writeValueAsString(connection));
} catch (Exception e) {
throw new AirflowScheduleException(e.getMessage(), e);
log.error("Airflow request body construction failed: {}", e.getMessage(), e);
throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
String.format("Airflow request body construction failed: %s", e.getMessage()));
}
}
return super.getRequestBody();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import org.springframework.http.HttpMethod;

/**
* Build call for AirflowConnectionGetter<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import org.springframework.http.HttpMethod;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import org.springframework.http.HttpMethod;

/**
* Build call for DAGDeleter< br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import org.springframework.http.HttpMethod;

/**
* Build call for DAGUpdater< br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod;

import org.springframework.http.HttpMethod;

import java.util.Map;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,45 @@
package org.apache.inlong.manager.schedule.exception;

/**
* Exceptions occur in the schedule procedure.
* */
* Represents exceptions specific to the Airflow scheduling process.
* Each exception is associated with a specific error code for better identification.
*/
public class AirflowScheduleException extends RuntimeException {

/**
* Enum to define all error codes associated with Airflow scheduling exceptions.
*/
public enum AirflowErrorCode {
INIT_CONNECTION_FAILED,
TASK_DAG_SWITCH_FAILED,
SCHEDULE_TASK_REGISTER_FAILED,
SCHEDULE_TASK_UPDATE_FAILED,
SCHEDULE_ENGINE_SHUTDOWN_FAILED,
BUILD_REQUEST_BODY_FAILED,
DAG_DUPLICATE
}

private AirflowErrorCode errorCode;

public AirflowScheduleException(String message) {
super(message);
}
public AirflowScheduleException(AirflowErrorCode errorCode, String message) {
super(message);
this.errorCode = errorCode;
}

public AirflowScheduleException(String message, Throwable cause) {
public AirflowScheduleException(AirflowErrorCode errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

public AirflowErrorCode getErrorCode() {
return errorCode;
}

@Override
public String toString() {
return String.format("ErrorCode: %s, Message: %s", errorCode, getMessage());
}
}

0 comments on commit 8ac6865

Please sign in to comment.