diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index f773204eba2..79a713a71e0 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -38,6 +38,8 @@ seatunnel: telemetry: metric: enabled: false + log: + scheduled-deletion-enable: true http: enable-http: true port: 8080 diff --git a/docs/en/seatunnel-engine/logging.md b/docs/en/seatunnel-engine/logging.md index 094cb15febd..be0bc12f0a2 100644 --- a/docs/en/seatunnel-engine/logging.md +++ b/docs/en/seatunnel-engine/logging.md @@ -92,6 +92,24 @@ SeaTunnel provides an API for querying logs. For more details, please refer to the [REST-API](rest-api-v2.md). +## SeaTunnel Log Configuration + +### Scheduled deletion of old logs + +SeaTunnel supports scheduled deletion of old log files to prevent disk space exhaustion. You can add the following configuration in the `seatunnel.yml` file: + +```yaml +seatunnel: + engine: + history-job-expire-minutes: 1440 + telemetry: + logs: + scheduled-deletion-enable: true +``` + +- `history-job-expire-minutes`: Sets the retention time for historical job data and logs (in minutes). The system will automatically clear expired job information and log files after the specified period. +- `scheduled-deletion-enable`: Enable scheduled cleanup, with default value of `true`. The system will automatically delete relevant log files when job expiration time, as defined by `history-job-expire-minutes`, is reached. If this feature is disabled, logs will remain permanently on disk, requiring manual management, which may affect disk space usage. It is recommended to configure this setting based on specific needs. + ## Best practices for developers You can create an SLF4J logger by calling `org.slf4j.LoggerFactory#LoggerFactory.getLogger` with the Class of your class as an argument. diff --git a/docs/zh/seatunnel-engine/logging.md b/docs/zh/seatunnel-engine/logging.md index 7d4f4f1d62b..f97ea572e8c 100644 --- a/docs/zh/seatunnel-engine/logging.md +++ b/docs/zh/seatunnel-engine/logging.md @@ -92,6 +92,25 @@ SeaTunnel 提供了一个 API,用于查询日志。 有关详细信息,请参阅 [REST-API](rest-api-v2.md)。 +## SeaTunnel 日志配置 + +### 定时删除旧日志 + +SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足。您可以在 `seatunnel.yml` 文件中添加以下配置: + +```yaml +seatunnel: + engine: + history-job-expire-minutes: 1440 + telemetry: + logs: + scheduled-deletion-enable: true +``` + +- `history-job-expire-minutes`: 设置历史作业和日志的保留时间(单位:分钟)。系统将在指定的时间后自动清除过期的作业信息和日志文件。 +- `scheduled-deletion-enable`: 启用定时清理功能,默认为 `true`。系统将在作业达到 `history-job-expire-minutes` 设置的过期时间后自动删除相关日志文件。关闭该功能后,日志将永久保留在磁盘上,需要用户自行管理,否则可能影响磁盘占用。建议根据需求合理配置。 + + ## 开发人员最佳实践 您可以通过调用 `org.slf4j.LoggerFactory#LoggerFactory.getLogger` 并以您的类的类作为参数来创建 SLF4J 记录器。 diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java index 1aa52488f66..4afbfacb71d 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.platform.commons.util.StringUtils; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -35,12 +36,15 @@ import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; +import com.beust.jcommander.internal.Lists; +import com.hazelcast.jet.datamodel.Tuple2; import io.restassured.response.Response; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -52,7 +56,11 @@ public class JobLogIT extends SeaTunnelContainer { private static final String CUSTOM_JOB_NAME = "test-job-log-file"; + private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2"; + private static final String CUSTOM_JOB_NAME3 = "test-job-log-file3"; private static final long CUSTOM_JOB_ID = 862969647010611201L; + private static final long CUSTOM_JOB_ID2 = 862969647010611202L; + private static final long CUSTOM_JOB_ID3 = 862969647010611203L; private static final String confFile = "/fakesource_to_console.conf"; private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL); @@ -99,10 +107,29 @@ public void tearDown() throws Exception { @Test public void testJobLogFile() throws Exception { submitJobAndAssertResponse( - server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID); + server, JobMode.BATCH.name(), false, CUSTOM_JOB_NAME, CUSTOM_JOB_ID); + + submitJobAndAssertResponse( + server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME2, CUSTOM_JOB_ID2); + + submitJobAndAssertResponse( + server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME3, CUSTOM_JOB_ID3); assertConsoleLog(); assertFileLog(); + List> before = + Lists.newArrayList( + Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID + ".log"), + Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"), + Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log")); + assertFileLogClean(before); + Thread.sleep(90000); + List> after = + Lists.newArrayList( + Tuple2.tuple2(true, "job-" + CUSTOM_JOB_ID + ".log"), + Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"), + Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + ".log")); + assertFileLogClean(after); } private void assertConsoleLog() { @@ -168,6 +195,22 @@ private void assertFileLog() throws IOException, InterruptedException { }); } + private void assertFileLogClean(List> tuple2s) + throws IOException, InterruptedException { + for (Tuple2 tuple2 : tuple2s) { + Container.ExecResult execResult = + server.execInContainer( + "sh", "-c", "find /tmp/seatunnel/logs -name " + tuple2.f1() + "\n"); + String file = execResult.getStdout(); + execResult = + secondServer.execInContainer( + "sh", "-c", "find /tmp/seatunnel/logs -name " + tuple2.f1() + "\n"); + String file1 = execResult.getStdout(); + Assertions.assertEquals( + tuple2.f0(), StringUtils.isBlank(file) && StringUtils.isBlank(file1)); + } + } + private Response submitJob( GenericContainer container, String jobMode, diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml index 5878531b2e3..6b0387caa03 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml @@ -17,7 +17,7 @@ seatunnel: engine: - history-job-expire-minutes: 1440 + history-job-expire-minutes: 1 backup-count: 2 queue-type: blockingqueue print-execution-info-interval: 10 @@ -35,3 +35,8 @@ seatunnel: enable-http: true port: 8080 enable-dynamic-port: false + telemetry: + metric: + enabled: false + logs: + scheduled-deletion-enable: true diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 84fef4251ec..80b928fcdcd 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -34,4 +34,9 @@ seatunnel: namespace: /tmp/seatunnel/checkpoint_snapshot/ http: enable-http: false - port: 8080 \ No newline at end of file + port: 8080 + telemetry: + metric: + enabled: false + logs: + scheduled-deletion-enable: true \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index ffa984de1fb..c0f6b6ad6ce 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig; import org.apache.seatunnel.engine.common.config.server.TelemetryConfig; +import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig; import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig; import org.apache.seatunnel.engine.common.config.server.ThreadShareMode; @@ -330,17 +331,19 @@ private Map parseConnectorJarHAStoragePluginConfig( } private TelemetryConfig parseTelemetryConfig(Node telemetryNode) { - TelemetryConfig metricConfig = new TelemetryConfig(); + TelemetryConfig telemetryConfig = new TelemetryConfig(); for (Node node : childElements(telemetryNode)) { String name = cleanNodeName(node); if (ServerConfigOptions.TELEMETRY_METRIC.key().equals(name)) { - metricConfig.setMetric(parseTelemetryMetricConfig(node)); + telemetryConfig.setMetric(parseTelemetryMetricConfig(node)); + } else if (ServerConfigOptions.TELEMETRY_LOGS.key().equals(name)) { + telemetryConfig.setLogs(parseTelemetryLogsConfig(node)); } else { LOGGER.warning("Unrecognized element: " + name); } } - return metricConfig; + return telemetryConfig; } private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) { @@ -357,6 +360,20 @@ private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) { return metricConfig; } + private TelemetryLogsConfig parseTelemetryLogsConfig(Node logsNode) { + TelemetryLogsConfig logsConfig = new TelemetryLogsConfig(); + for (Node node : childElements(logsNode)) { + String name = cleanNodeName(node); + if (ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.key().equals(name)) { + logsConfig.setEnabled(getBooleanValue(getTextContent(node))); + } else { + LOGGER.warning("Unrecognized element: " + name); + } + } + + return logsConfig; + } + private HttpConfig parseHttpConfig(Node httpNode) { HttpConfig httpConfig = new HttpConfig(); for (Node node : childElements(httpNode)) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index d9e9662a990..7153cccd0f5 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -211,6 +211,20 @@ public class ServerConfigOptions { .withDescription( "Whether to use classloader cache mode. With cache mode, all jobs share the same classloader if the jars are the same"); + public static final Option TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE = + Options.key("scheduled-deletion-enable") + .booleanType() + .defaultValue(true) + .withDescription( + "Enable scheduled cleanup, with default value of true. The system will automatically delete relevant log files when job expiration time, as defined by `history-job-expire-minutes`, is reached. " + + "If this feature is disabled, logs will remain permanently on disk, requiring manual management, which may affect disk space usage. It is recommended to configure this setting based on specific needs."); + + public static final Option TELEMETRY_LOGS = + Options.key("logs") + .type(new TypeReference() {}) + .defaultValue(new TelemetryLogsConfig()) + .withDescription("The telemetry logs configuration."); + public static final Option TELEMETRY_METRIC_ENABLED = Options.key("enabled") .booleanType() diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java index c3e603eea4c..7652832d46d 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java @@ -25,4 +25,6 @@ public class TelemetryConfig implements Serializable { private TelemetryMetricConfig metric = ServerConfigOptions.TELEMETRY_METRIC.defaultValue(); + + private TelemetryLogsConfig logs = ServerConfigOptions.TELEMETRY_LOGS.defaultValue(); } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java new file mode 100644 index 00000000000..9eb8b329754 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.config.server; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class TelemetryLogsConfig implements Serializable { + + private boolean enabled = + ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.defaultValue(); +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java new file mode 100644 index 00000000000..7d92d7dfbc8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.utils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.builder.api.Component; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; +import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration; +import org.apache.logging.log4j.core.lookup.StrSubstitutor; + +import java.lang.reflect.Field; + +public class LogUtil { + + /** Get configuration log path by log4j */ + public static String getLogPath() throws NoSuchFieldException, IllegalAccessException { + String routingAppender = "routingAppender"; + String fileAppender = "fileAppender"; + PropertiesConfiguration config = getLogConfiguration(); + // Get routingAppender log file path + String routingLogFilePath = getRoutingLogFilePath(config); + + // Get fileAppender log file path + String fileLogPath = getFileLogPath(config); + String logRef = + config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream() + .map(Object::toString) + .filter(ref -> ref.contains(routingAppender) || ref.contains(fileAppender)) + .findFirst() + .orElse(StringUtils.EMPTY); + if (logRef.equals(routingAppender)) { + return routingLogFilePath.substring(0, routingLogFilePath.lastIndexOf("/")); + } else if (logRef.equals(fileAppender)) { + return fileLogPath.substring(0, routingLogFilePath.lastIndexOf("/")); + } else { + throw new IllegalArgumentException( + String.format("Log file path is empty, get logRef : %s", logRef)); + } + } + + private static PropertiesConfiguration getLogConfiguration() { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + return (PropertiesConfiguration) context.getConfiguration(); + } + + private static String getRoutingLogFilePath(PropertiesConfiguration config) + throws NoSuchFieldException, IllegalAccessException { + Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent"); + propertiesField.setAccessible(true); + Component propertiesComponent = (Component) propertiesField.get(config); + StrSubstitutor substitutor = config.getStrSubstitutor(); + return propertiesComponent.getComponents().stream() + .filter( + component -> + "routingAppender".equals(component.getAttributes().get("name"))) + .flatMap(component -> component.getComponents().stream()) + .flatMap(component -> component.getComponents().stream()) + .flatMap(component -> component.getComponents().stream()) + .map(component -> substitutor.replace(component.getAttributes().get("fileName"))) + .findFirst() + .orElse(null); + } + + private static String getFileLogPath(PropertiesConfiguration config) + throws NoSuchFieldException, IllegalAccessException { + Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent"); + propertiesField.setAccessible(true); + Component propertiesComponent = (Component) propertiesField.get(config); + StrSubstitutor substitutor = config.getStrSubstitutor(); + return propertiesComponent.getComponents().stream() + .filter(component -> "fileAppender".equals(component.getAttributes().get("name"))) + .map(component -> substitutor.replace(component.getAttributes().get("fileName"))) + .findFirst() + .orElse(null); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java new file mode 100644 index 00000000000..ef445b50cb4 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.seatunnel.engine.core.job; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ExecutionAddress implements Serializable { + String hostname; + int port; +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java index aea57beef33..f0ef24bad68 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.Set; @AllArgsConstructor @NoArgsConstructor @@ -38,6 +39,7 @@ public class JobDAGInfo implements Serializable { Map envOptions; Map> pipelineEdges; Map vertexInfoMap; + Set historyExecutionPlan; public JsonObject toJsonObject() { JsonObject pipelineEdgesJsonObject = new JsonObject(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 02fdc98b1ee..eae0627343a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -398,6 +398,7 @@ private void initCoordinatorService() { jobHistoryService = new JobHistoryService( + nodeEngine, runningJobStateIMap, logger, pendingJobMasterMap, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index fb53fb44597..5721a7db30c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService; import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService; import org.apache.seatunnel.engine.server.service.slot.SlotService; +import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService; import org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus; import org.apache.hadoop.fs.FileSystem; @@ -46,6 +47,7 @@ import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker; import lombok.Getter; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import java.util.Properties; import java.util.concurrent.Executors; @@ -55,6 +57,7 @@ import static com.hazelcast.spi.properties.ClusterProperty.INVOCATION_MAX_RETRY_COUNT; import static com.hazelcast.spi.properties.ClusterProperty.INVOCATION_RETRY_PAUSE; +@Slf4j public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker { @@ -71,6 +74,7 @@ public class SeaTunnelServer private CoordinatorService coordinatorService; private ScheduledExecutorService monitorService; private JettyService jettyService; + private TaskLogManagerService taskLogManagerService; @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor; @@ -136,6 +140,16 @@ public void init(NodeEngine engine, Properties hzProperties) { seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode()); + // task log manager service + if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null + && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null + && seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) { + taskLogManagerService = + new TaskLogManagerService( + seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs()); + taskLogManagerService.initClean(); + } + // Start Jetty server if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) { jettyService = new JettyService(nodeEngine, seaTunnelConfig); @@ -337,6 +351,10 @@ public ConnectorPackageService getConnectorPackageService() { return getCoordinatorService().getConnectorPackageService(); } + public TaskLogManagerService getTaskLogManagerService() { + return taskLogManagerService; + } + public ThreadPoolStatus getThreadPoolStatusMetrics() { return coordinatorService.getThreadPoolStatusMetrics(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java index bbe6d77e00f..62df65c9173 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex; import org.apache.seatunnel.engine.core.job.Edge; +import org.apache.seatunnel.engine.core.job.ExecutionAddress; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.VertexInfo; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -52,7 +54,8 @@ public static JobDAGInfo getJobDAGInfo( LogicalDag logicalDag, JobImmutableInformation jobImmutableInformation, EngineConfig engineConfig, - boolean isPhysicalDAGIInfo) { + boolean isPhysicalDAGIInfo, + Set historyExecutionAddress) { List pipelines = new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, engineConfig) .generate() @@ -89,7 +92,8 @@ public static JobDAGInfo getJobDAGInfo( jobImmutableInformation.getJobId(), logicalDag.getJobConfig().getEnvOptions(), pipelineWithEdges, - vertexInfoMap); + vertexInfoMap, + historyExecutionAddress); } else { // Generate LogicalPlan DAG List edges = @@ -136,7 +140,8 @@ public static JobDAGInfo getJobDAGInfo( jobImmutableInformation.getJobId(), logicalDag.getJobConfig().getEnvOptions(), pipelineWithEdges, - vertexInfoMap); + vertexInfoMap, + historyExecutionAddress); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java index a73708a5485..7b0d83ef035 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.core.job.ExecutionAddress; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.JobStatus; @@ -33,15 +34,22 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.PendingSourceState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation; +import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; +import com.hazelcast.cluster.Address; +import com.hazelcast.core.EntryEvent; import com.hazelcast.logging.ILogger; import com.hazelcast.map.IMap; +import com.hazelcast.map.listener.EntryExpiredListener; +import com.hazelcast.spi.impl.NodeEngine; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; import scala.Tuple2; import java.io.Serializable; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -52,6 +60,9 @@ import java.util.stream.Stream; public class JobHistoryService { + + private final NodeEngine nodeEngine; + /** * IMap key is one of jobId {@link * org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and {@link @@ -91,6 +102,7 @@ public class JobHistoryService { private final int finishedJobExpireTime; public JobHistoryService( + NodeEngine nodeEngine, IMap runningJobStateIMap, ILogger logger, Map> pendingJobMasterMap, @@ -99,6 +111,7 @@ public JobHistoryService( IMap finishedJobMetricsImap, IMap finishedJobVertexInfoImap, int finishedJobExpireTime) { + this.nodeEngine = nodeEngine; this.runningJobStateIMap = runningJobStateIMap; this.logger = logger; this.pendingJobMasterMap = pendingJobMasterMap; @@ -106,6 +119,7 @@ public JobHistoryService( this.finishedJobStateImap = finishedJobStateImap; this.finishedJobMetricsImap = finishedJobMetricsImap; this.finishedJobDAGInfoImap = finishedJobVertexInfoImap; + this.finishedJobDAGInfoImap.addEntryListener(new JobInfoExpiredListener(), true); this.objectMapper = new ObjectMapper(); this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); this.finishedJobExpireTime = finishedJobExpireTime; @@ -287,4 +301,30 @@ public static final class PipelineStateData implements Serializable { private PipelineStatus pipelineStatus; private Map executionStateMap; } + + private class JobInfoExpiredListener implements EntryExpiredListener { + @Override + public void entryExpired(EntryEvent event) { + Long jobId = event.getKey(); + JobDAGInfo jobDagInfo = event.getOldValue(); + try { + Set historyExecutionPlan = jobDagInfo.getHistoryExecutionPlan(); + + historyExecutionPlan.forEach( + address -> { + logger.info("clean job log, jobId: " + jobId + ", address: " + address); + try { + NodeEngineUtil.sendOperationToMemberNode( + nodeEngine, + new CleanLogOperation(jobId), + new Address(address.getHostname(), address.getPort())); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + logger.warning("clean job log err", e); + } + } + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 04812e078fb..49b676c97ee 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -44,6 +44,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; +import org.apache.seatunnel.engine.core.job.ExecutionAddress; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.JobInfo; @@ -89,10 +90,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -157,6 +160,8 @@ public class JobMaster { private final IMap runningJobInfoIMap; + @Getter private final Set historyExecutionAddress = new HashSet<>(); + private final IMap> metricsImap; /** If the job or pipeline cancel by user, needRestore will be false */ @@ -394,6 +399,17 @@ public boolean preApplyResources(SubPlan subPlan) { == preApplyResourceFutures.size(); if (enoughResource) { + for (Map.Entry> entry : + preApplyResourceFutures.entrySet()) { + try { + Address worker = entry.getValue().get().getWorker(); + historyExecutionAddress.add( + new ExecutionAddress(worker.getHost(), worker.getPort())); + + } catch (Exception e) { + LOGGER.warning("history execution plan add worker failed", e); + } + } if (isSubPlan) { // SubPlan applies for resources separately and needs to be merged into the entire // job's resources @@ -569,7 +585,11 @@ public JobDAGInfo getJobDAGInfo() { if (jobDAGInfo == null) { jobDAGInfo = DAGUtils.getJobDAGInfo( - logicalDag, jobImmutableInformation, engineConfig, isPhysicalDAGIInfo); + logicalDag, + jobImmutableInformation, + engineConfig, + isPhysicalDAGIInfo, + historyExecutionAddress); } return jobDAGInfo; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java index 7d582bf5892..a1e84f9ec76 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java @@ -18,14 +18,7 @@ package org.apache.seatunnel.engine.server.rest.service; import org.apache.seatunnel.common.utils.ExceptionUtils; - -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.builder.api.Component; -import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; -import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration; -import org.apache.logging.log4j.core.lookup.StrSubstitutor; +import org.apache.seatunnel.engine.common.utils.LogUtil; import com.hazelcast.internal.util.StringUtil; import com.hazelcast.spi.impl.NodeEngineImpl; @@ -34,7 +27,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.URL; @@ -47,73 +39,13 @@ public BaseLogService(NodeEngineImpl nodeEngine) { /** Get configuration log path */ public String getLogPath() { try { - String routingAppender = "routingAppender"; - String fileAppender = "fileAppender"; - PropertiesConfiguration config = getLogConfiguration(); - // Get routingAppender log file path - String routingLogFilePath = getRoutingLogFilePath(config); - - // Get fileAppender log file path - String fileLogPath = getFileLogPath(config); - String logRef = - config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream() - .map(Object::toString) - .filter( - ref -> - ref.contains(routingAppender) - || ref.contains(fileAppender)) - .findFirst() - .orElse(StringUtils.EMPTY); - if (logRef.equals(routingAppender)) { - return routingLogFilePath.substring(0, routingLogFilePath.lastIndexOf("/")); - } else if (logRef.equals(fileAppender)) { - return fileLogPath.substring(0, routingLogFilePath.lastIndexOf("/")); - } else { - log.warn(String.format("Log file path is empty, get logRef : %s", logRef)); - return null; - } + return LogUtil.getLogPath(); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("Get log path error,{}", ExceptionUtils.getMessage(e)); return null; } } - private String getFileLogPath(PropertiesConfiguration config) - throws NoSuchFieldException, IllegalAccessException { - Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent"); - propertiesField.setAccessible(true); - Component propertiesComponent = (Component) propertiesField.get(config); - StrSubstitutor substitutor = config.getStrSubstitutor(); - return propertiesComponent.getComponents().stream() - .filter(component -> "fileAppender".equals(component.getAttributes().get("name"))) - .map(component -> substitutor.replace(component.getAttributes().get("fileName"))) - .findFirst() - .orElse(null); - } - - private String getRoutingLogFilePath(PropertiesConfiguration config) - throws NoSuchFieldException, IllegalAccessException { - Field propertiesField = BuiltConfiguration.class.getDeclaredField("appendersComponent"); - propertiesField.setAccessible(true); - Component propertiesComponent = (Component) propertiesField.get(config); - StrSubstitutor substitutor = config.getStrSubstitutor(); - return propertiesComponent.getComponents().stream() - .filter( - component -> - "routingAppender".equals(component.getAttributes().get("name"))) - .flatMap(component -> component.getComponents().stream()) - .flatMap(component -> component.getComponents().stream()) - .flatMap(component -> component.getComponents().stream()) - .map(component -> substitutor.replace(component.getAttributes().get("fileName"))) - .findFirst() - .orElse(null); - } - - private PropertiesConfiguration getLogConfiguration() { - LoggerContext context = (LoggerContext) LogManager.getContext(false); - return (PropertiesConfiguration) context.getConfiguration(); - } - protected String sendGet(String urlString) { try { HttpURLConnection connection = (HttpURLConnection) new URL(urlString).openConnection(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java index b304f7fdde1..f342aaba454 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -164,7 +165,8 @@ nodeEngine, new GetJobStatusOperation(jobId)) logicalDag, jobImmutableInformation, getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(), - true); + true, + new HashSet<>()); jobInfoJson .add(RestConstant.JOB_ID, String.valueOf(jobId)) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java index a9267675761..47525de0a52 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java @@ -44,6 +44,7 @@ import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation; import org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation; import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation; +import org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation; import com.hazelcast.internal.serialization.DataSerializerHook; import com.hazelcast.internal.serialization.impl.FactoryIdHelper; @@ -104,6 +105,8 @@ public class TaskDataSerializerHook implements DataSerializerHook { public static final int CLOSE_READER_OPERATION = 26; + public static final int CLEAN_LOG_OPERATION = 27; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY, @@ -176,6 +179,8 @@ public IdentifiedDataSerializable create(int typeId) { return new JobEventReportOperation(); case CLOSE_READER_OPERATION: return new CloseIdleReaderOperation(); + case CLEAN_LOG_OPERATION: + return new CleanLogOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java new file mode 100644 index 00000000000..f116ada66c8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.telemetry.log; + +import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig; +import org.apache.seatunnel.engine.common.utils.LogUtil; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@Slf4j +public class TaskLogManagerService { + private String path; + + public TaskLogManagerService(TelemetryLogsConfig log) {} + + public void initClean() { + try { + path = LogUtil.getLogPath(); + } catch (Exception e) { + log.warn( + "The corresponding log file path is not properly configured, please check the log configuration file.", + e); + } + } + + public void clean(long jobId) { + log.info("Cleaning logs for jobId: {} , path : {}", jobId, path); + if (path == null) { + return; + } + String[] logFiles = getLogFiles(jobId, path); + for (String logFile : logFiles) { + try { + Files.delete(Paths.get(path + "/" + logFile)); + } catch (IOException e) { + log.warn("Failed to delete log file: {}", logFile, e); + } + } + } + + private String[] getLogFiles(long jobId, String path) { + File logDir = new File(path); + if (!logDir.exists() || !logDir.isDirectory()) { + log.warn( + "Skipping deletion: Log directory '{}' either does not exist or is not a valid directory. Please verify the path and ensure the logs are being written correctly.", + path); + return new String[0]; + } + + return logDir.list((dir, name) -> name.contains(String.valueOf(jobId))); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java new file mode 100644 index 00000000000..2f5f215d80c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.telemetry.log.operation; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TracingOperation; +import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService; + +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; + +public class CleanLogOperation extends TracingOperation implements IdentifiedDataSerializable { + + private long jobId; + + public CleanLogOperation(long jobId) { + super(); + this.jobId = jobId; + } + + public CleanLogOperation() {} + + @Override + public void runInternal() throws Exception { + SeaTunnelServer service = getService(); + TaskLogManagerService taskLogManagerService = service.getTaskLogManagerService(); + if (taskLogManagerService != null) { + taskLogManagerService.clean(jobId); + } + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.CLEAN_LOG_OPERATION; + } + + @Override + public String getServiceName() { + return SeaTunnelServer.SERVICE_NAME; + } +}