From 8ac6865b253b078d93709290520b0ff1c51635f5 Mon Sep 17 00:00:00 2001 From: ZKpLo <14148880+zkplo@user.noreply.gitee.com> Date: Mon, 18 Nov 2024 14:04:33 +0800 Subject: [PATCH] [INLONG-11400][Manager] Remove the HttpMethod class and modify the code for exception handling --- .../airflow/AirflowScheduleEngine.java | 31 ++++++++++------ .../schedule/airflow/api/AirflowApi.java | 3 +- .../schedule/airflow/api/BaseAirflowApi.java | 6 ++-- .../connection/AirflowConnectionCreator.java | 11 ++++-- .../connection/AirflowConnectionGetter.java | 3 +- .../airflow/api/dag/DAGCollectionUpdater.java | 3 +- .../schedule/airflow/api/dag/DAGDeletor.java | 3 +- .../schedule/airflow/api/dag/DAGUpdater.java | 3 +- .../airflow/api/dagruns/DAGRunsTrigger.java | 3 +- .../schedule/airflow/enums/HttpMethod.java | 29 --------------- .../exception/AirflowScheduleException.java | 35 +++++++++++++++++-- 11 files changed, 75 insertions(+), 55 deletions(-) delete mode 100644 inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/enums/HttpMethod.java diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java index bbb03cec002..792307e6aeb 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -51,6 +51,12 @@ import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE; import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX; import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED; /** * Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient} @@ -97,7 +103,7 @@ private void initConnection() throws Exception { LOGGER.info("AirflowConnection registration response: {}", response.toString()); if (!response.isSuccess()) { LOGGER.error("Initialization connection failed."); - throw new AirflowScheduleException("Initialization connection failed."); + throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed."); } } } @@ -108,12 +114,12 @@ private void switchOriginalDAG(boolean isPaused) { AirflowResponse response = serverClient.sendRequest(new DAGUpdater(dagId, isPaused)); LOGGER.info("Response to {} the original DAG : {}", isPaused ? "stop" : "start", response.toString()); if (!response.isSuccess()) { - throw new AirflowScheduleException( + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, String.format("%s does not exist or failed to %s.", dagId, (isPaused ? "stop" : "start"))); } } catch (Exception e) { LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e); - throw new AirflowScheduleException( + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage())); } } @@ -125,7 +131,7 @@ private void switchAllTaskDAG(boolean isPaused) { new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, isPaused)); LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : "start", response.toString()); if (!response.isSuccess()) { - throw new AirflowScheduleException( + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, String.format("Failed to %s task DAGs.", isPaused ? "stop" : "start")); } if (!isPaused) { @@ -137,7 +143,7 @@ private void switchAllTaskDAG(boolean isPaused) { } } catch (Exception e) { LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e); - throw new AirflowScheduleException( + throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED, String.format("Failed to %s task DAGs: %s", isPaused ? "stop" : "start", e.getMessage())); } } @@ -150,8 +156,9 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) { } catch (Exception e) { LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.", scheduleInfo.getInlongGroupId(), e); - throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " + - "register: %s", scheduleInfo.getInlongGroupId(), e.getMessage())); + throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to register: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); } } @@ -200,14 +207,16 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) { } catch (Exception e) { LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.", scheduleInfo.getInlongGroupId(), e); - throw new AirflowScheduleException(String.format("The Airflow scheduling task with Group ID %s failed to " + - "update: %s", scheduleInfo.getInlongGroupId(), e.getMessage())); + throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED, + String.format("The Airflow scheduling task with Group ID %s failed to update: %s", + scheduleInfo.getInlongGroupId(), e.getMessage())); } } public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) throws Exception { if (isFirst && scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { - throw new AirflowScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered"); + throw new AirflowScheduleException(DAG_DUPLICATE, + String.format("Group %s is already registered", scheduleInfo.getInlongGroupId())); } DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder() .inlongGroupId(scheduleInfo.getInlongGroupId()) @@ -242,7 +251,7 @@ public void stop() { switchAllTaskDAG(true); } catch (Exception e) { LOGGER.error("Airflow Schedule Engine shutdown failed: ", e); - throw new AirflowScheduleException( + throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED, String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage())); } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java index 1b0fab55a04..4ff1a3284df 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java @@ -17,10 +17,9 @@ package org.apache.inlong.manager.schedule.airflow.api; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; - import okhttp3.Request; import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; import java.util.Map; /** diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java index 697c9bc1fcd..18a1ed5206a 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java @@ -17,7 +17,6 @@ package org.apache.inlong.manager.schedule.airflow.api; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -26,10 +25,13 @@ import okhttp3.MediaType; import okhttp3.Request; import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; import java.util.List; import java.util.Map; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + /** * The basic implementation of Airflow API interface. * @@ -74,7 +76,7 @@ public RequestBody getRequestBody() { objectMapper.writeValueAsString(requestBodyParams)); } catch (Exception e) { log.error("Airflow request body construction failed: {}", e.getMessage(), e); - throw new AirflowScheduleException( + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, String.format("Airflow request body construction failed: %s", e.getMessage())); } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java index 3c6d7d330fe..1e5b7d737cf 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java @@ -20,14 +20,17 @@ import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; import org.apache.inlong.manager.schedule.exception.AirflowScheduleException; +import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import okhttp3.RequestBody; +import org.springframework.http.HttpMethod; import java.util.Map; +import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED; + /** * Build call for AirflowConnectionGetter
* @@ -51,7 +54,7 @@ * *
403 Client does not have sufficient permission.
*/ - +@Slf4j public class AirflowConnectionCreator extends BaseAirflowApi { AirflowConnection connection = null; @@ -71,7 +74,9 @@ public RequestBody getRequestBody() { return RequestBody.create(MediaType.parse("application/json; charset=utf-8"), objectMapper.writeValueAsString(connection)); } catch (Exception e) { - throw new AirflowScheduleException(e.getMessage(), e); + log.error("Airflow request body construction failed: {}", e.getMessage(), e); + throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED, + String.format("Airflow request body construction failed: %s", e.getMessage())); } } return super.getRequestBody(); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java index 63ed661cb05..7dc278ae0b3 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java @@ -20,7 +20,8 @@ import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection; import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; + +import org.springframework.http.HttpMethod; /** * Build call for AirflowConnectionGetter
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java index fdac190ad92..039a18d9d84 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java @@ -20,7 +20,8 @@ import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection; import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; + +import org.springframework.http.HttpMethod; import java.util.Map; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java index e7a3cdb5b07..23a348d766a 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java @@ -19,7 +19,8 @@ import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; + +import org.springframework.http.HttpMethod; /** * Build call for DAGDeleter< br> diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java index 58ad1fbb473..be8313f1b15 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java @@ -20,7 +20,8 @@ import org.apache.inlong.manager.pojo.schedule.airflow.DAG; import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; + +import org.springframework.http.HttpMethod; /** * Build call for DAGUpdater< br> diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java index 934974af1f6..b9fe7b22600 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java @@ -20,7 +20,8 @@ import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun; import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant; import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi; -import org.apache.inlong.manager.schedule.airflow.enums.HttpMethod; + +import org.springframework.http.HttpMethod; import java.util.Map; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/enums/HttpMethod.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/enums/HttpMethod.java deleted file mode 100644 index cb756ff86fa..00000000000 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/enums/HttpMethod.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.schedule.airflow.enums; - -/** - * Enumeration representing HTTP methods commonly used in RESTful services. - */ -public enum HttpMethod { - GET, - POST, - PUT, - DELETE, - PATCH -} \ No newline at end of file diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java index d725be65460..6b6830fb304 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java @@ -18,16 +18,45 @@ package org.apache.inlong.manager.schedule.exception; /** - * Exceptions occur in the schedule procedure. - * */ + * Represents exceptions specific to the Airflow scheduling process. + * Each exception is associated with a specific error code for better identification. + */ public class AirflowScheduleException extends RuntimeException { + /** + * Enum to define all error codes associated with Airflow scheduling exceptions. + */ + public enum AirflowErrorCode { + INIT_CONNECTION_FAILED, + TASK_DAG_SWITCH_FAILED, + SCHEDULE_TASK_REGISTER_FAILED, + SCHEDULE_TASK_UPDATE_FAILED, + SCHEDULE_ENGINE_SHUTDOWN_FAILED, + BUILD_REQUEST_BODY_FAILED, + DAG_DUPLICATE + } + + private AirflowErrorCode errorCode; + public AirflowScheduleException(String message) { super(message); } + public AirflowScheduleException(AirflowErrorCode errorCode, String message) { + super(message); + this.errorCode = errorCode; + } - public AirflowScheduleException(String message, Throwable cause) { + public AirflowScheduleException(AirflowErrorCode errorCode, String message, Throwable cause) { super(message, cause); + this.errorCode = errorCode; + } + + public AirflowErrorCode getErrorCode() { + return errorCode; } + @Override + public String toString() { + return String.format("ErrorCode: %s, Message: %s", errorCode, getMessage()); + } }