diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index fcc907396ea..d053ea37a5c 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import org.apache.inlong.manager.pojo.sort.SortStatusInfo; import org.apache.inlong.manager.pojo.sort.SortStatusRequest; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; @@ -320,8 +321,8 @@ public Boolean finishTagSwitch(String groupId) { return response.getData(); } - public Boolean submitOfflineJob(String groupId) { - Response responseBody = ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId)); + public Boolean submitOfflineJob(OfflineJobSubmitRequest request) { + Response responseBody = ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(request)); ClientUtils.assertRespSuccess(responseBody); return responseBody.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java index 930a3a8144b..1761193644e 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java @@ -26,6 +26,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import retrofit2.Call; @@ -99,6 +100,6 @@ public interface InlongGroupApi { @GET("group/switch/finish/{groupId}") Call> finishTagSwitch(@Path("groupId") String groupId); - @POST("group/submitOfflineJob/{groupId}") - Call> submitOfflineJob(@Path("groupId") String groupId); + @POST("group/submitOfflineJob") + Call> submitOfflineJob(@Body OfflineJobSubmitRequest request); } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index 4abcc0f3594..d9957e81f54 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -93,6 +93,10 @@ public class InlongConstants { public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream"; public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch"; + public static final String BOUNDARY_TYPE = "BOUNDARY_TYPE"; + public static final String LOWER_BOUNDARY = "LOWER_BOUNDARY"; + public static final String UPPER_BOUNDARY = "UPPER_BOUNDARY"; + public static final Integer DISABLE_ZK = 0; public static final Integer ENABLE_ZK = 1; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 2a82325ad99..9364fda8e78 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -130,6 +130,10 @@ public enum ErrorCodeEnum { SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"), SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition is not allowed"), + BOUNDED_SOURCE_TYPE_NOT_SUPPORTED(1801, "Bounded source type %s not supported"), + BOUNDARY_TYPE_NOT_SUPPORTED(1802, "Boundary type %s not supported"), + BOUNDARIES_NOT_FOUND(1803, "Boundaries not found"), + WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"), WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"), WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"), diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index 9188c6b73f2..f3809ce8aeb 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.plugin.flink; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; @@ -258,8 +259,16 @@ private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) { list.add(flinkInfo.getLocalConfPath()); list.add("-checkpoint.interval"); list.add("60000"); - list.add("-runtime.execution.mode"); - list.add(flinkInfo.getRuntimeExecutionMode()); + if (InlongConstants.RUNTIME_EXECUTION_MODE_BATCH.equalsIgnoreCase(flinkInfo.getRuntimeExecutionMode())) { + list.add("-runtime.execution.mode"); + list.add(flinkInfo.getRuntimeExecutionMode()); + list.add("-source.boundary.type"); + list.add(flinkInfo.getBoundaryType()); + list.add("-source.lower.boundary"); + list.add(flinkInfo.getLowerBoundary()); + list.add("-source.upper.boundary"); + list.add(flinkInfo.getUpperBoundary()); + } return list.toArray(new String[0]); } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java index 4c3c75f855c..e8b924f1385 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java @@ -54,4 +54,10 @@ public class FlinkInfo { private String exceptionMsg; private String runtimeExecutionMode; + + private String boundaryType; + + private String lowerBoundary; + + private String upperBoundary; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java index 1c93e7636aa..9a6eca45ce7 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.plugin.offline; +import org.apache.inlong.common.bounded.Boundaries; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; @@ -30,7 +31,8 @@ public class FlinkOfflineJobOperator implements OfflineJobOperator { @Override - public void submitOfflineJob(String groupId, List streamInfoList) throws Exception { - submitFlinkJobs(groupId, streamInfoList, true); + public void submitOfflineJob(String groupId, List streamInfoList, Boundaries boundaries) + throws Exception { + submitFlinkJobs(groupId, streamInfoList, true, boundaries); } } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index fd0c2c8bf2b..1110ef45d81 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.plugin.util; +import org.apache.inlong.common.bounded.Boundaries; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.util.JsonUtils; @@ -222,11 +223,11 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception { public static ListenerResult submitFlinkJobs(String groupId, List streamInfoList) throws Exception { - return submitFlinkJobs(groupId, streamInfoList, false); + return submitFlinkJobs(groupId, streamInfoList, false, null); } public static ListenerResult submitFlinkJobs(String groupId, List streamInfoList, - boolean isBatchJob) throws Exception { + boolean isBatchJob, Boundaries boundaries) throws Exception { int sinkCount = streamInfoList.stream() .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) .reduce(0, Integer::sum); @@ -240,7 +241,8 @@ public static ListenerResult submitFlinkJobs(String groupId, List listenerResults = new ArrayList<>(); for (InlongStreamInfo streamInfo : streamInfoList) { listenerResults.add( - FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo), isBatchJob)); + FlinkUtils.submitFlinkJob( + streamInfo, FlinkUtils.genFlinkJobName(streamInfo), isBatchJob, boundaries)); } // only one stream in group for now @@ -254,11 +256,11 @@ public static ListenerResult submitFlinkJobs(String groupId, List sinkList = streamInfo.getSinkList(); List sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) { @@ -298,6 +300,9 @@ public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); if (isBatchJob) { flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH); + flinkInfo.setBoundaryType(boundaries.getBoundaryType().getType()); + flinkInfo.setLowerBoundary(boundaries.getLowerBound()); + flinkInfo.setUpperBoundary(boundaries.getUpperBound()); } else { flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java new file mode 100644 index 00000000000..6e9dcc91e87 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotNull; + +@Data +public class OfflineJobSubmitRequest { + + @ApiModelProperty("Inlong Group ID") + @NotNull + private String groupId; + + @ApiModelProperty("Source boundary type, TIME and OFFSET are supported") + @NotNull + private String boundaryType; + + @ApiModelProperty("The lower bound for bounded source") + @NotNull + private String lowerBoundary; + + @ApiModelProperty("The upper bound for bounded source") + @NotNull + private String upperBoundary; +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java index 2b667664749..6e369c4c411 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java @@ -17,12 +17,14 @@ package org.apache.inlong.manager.schedule.quartz; +import org.apache.inlong.common.bounded.BoundaryType; import org.apache.inlong.manager.client.api.ClientConfiguration; import org.apache.inlong.manager.client.api.impl.InlongClientImpl; import org.apache.inlong.manager.client.api.inner.client.ClientFactory; import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient; import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.auth.DefaultAuthentication; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import lombok.AllArgsConstructor; import lombok.Data; @@ -35,10 +37,11 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import static org.apache.inlong.manager.schedule.util.ScheduleUtils.END_TIME; import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST; import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT; -import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID; -import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY; +import static org.apache.inlong.manager.schedule.util.ScheduleUtils.PASSWORD; +import static org.apache.inlong.manager.schedule.util.ScheduleUtils.USERNAME; @Data @NoArgsConstructor @@ -49,31 +52,49 @@ public class QuartzOfflineSyncJob implements Job { private static final Logger LOGGER = LoggerFactory.getLogger(QuartzOfflineSyncJob.class); private volatile InlongGroupClient groupClient; + private long endTime; @Override public void execute(JobExecutionContext context) throws JobExecutionException { LOGGER.info("QuartzOfflineSyncJob run once"); JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); initGroupClientIfNeeded(jobDataMap); + String inlongGroupId = context.getJobDetail().getKey().getName(); - LOGGER.info("Starting submit offline job for group {}", inlongGroupId); - if (groupClient.submitOfflineJob(inlongGroupId)) { - LOGGER.info("Successfully submitting offline job for group {}", inlongGroupId); - } else { - LOGGER.warn("Failed to submit offline job for group {}", inlongGroupId); + long lowerBoundary = context.getScheduledFireTime().getTime(); + long upperBoundary = context.getNextFireTime() == null ? endTime : context.getNextFireTime().getTime(); + + OfflineJobSubmitRequest request = new OfflineJobSubmitRequest(); + request.setGroupId(inlongGroupId); + request.setBoundaryType(BoundaryType.TIME.getType()); + request.setLowerBoundary(String.valueOf(lowerBoundary)); + request.setUpperBoundary(String.valueOf(upperBoundary)); + LOGGER.info("Starting submit offline job for group: {}, with lower boundary: {} and upper boundary: {}", + inlongGroupId, lowerBoundary, upperBoundary); + + try { + if (groupClient.submitOfflineJob(request)) { + LOGGER.info("Successfully submitting offline job for group {}", inlongGroupId); + } else { + LOGGER.warn("Failed to submit offline job for group {}", inlongGroupId); + } + } catch (Exception e) { + LOGGER.error("Exception to submit offline job for group {}, error msg: {}", inlongGroupId, e.getMessage()); } + } private void initGroupClientIfNeeded(JobDataMap jobDataMap) { if (groupClient == null) { String host = (String) jobDataMap.get(MANAGER_HOST); int port = (int) jobDataMap.get(MANAGER_PORT); - String secreteId = (String) jobDataMap.get(SECRETE_ID); - String secreteKey = (String) jobDataMap.get(SECRETE_KEY); - LOGGER.info("Initializing Inlong group client, with host: {}, port: {}, userName : {}", - host, port, secreteId); + String username = (String) jobDataMap.get(USERNAME); + String password = (String) jobDataMap.get(PASSWORD); + endTime = (long) jobDataMap.get(END_TIME); + LOGGER.info("Initializing Inlong group client, with host: {}, port: {}, userName : {}, endTime: {}", + host, port, username, endTime); ClientConfiguration configuration = new ClientConfiguration(); - configuration.setAuthentication(new DefaultAuthentication(secreteId, secreteKey)); + configuration.setAuthentication(new DefaultAuthentication(username, password)); String serviceUrl = host + ":" + port; InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, configuration); ClientFactory clientFactory = ClientUtils.getClientFactory(inlongClient.getConfiguration()); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java index 50c5d9505fa..4a974f299e8 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java @@ -48,17 +48,20 @@ public class ScheduleUtils { public static final String MANAGER_HOST = "HOST"; public static final String MANAGER_PORT = "PORT"; - public static final String SECRETE_ID = "SECRETE_ID"; - public static final String SECRETE_KEY = "SECRETE_KEY"; + public static final String USERNAME = "USERNAME"; + public static final String PASSWORD = "PASSWORD"; + public static final String END_TIME = "END_TIME"; public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class clz, - String host, Integer port, String secreteId, String secreteKey) { + String host, Integer port, String username, String password) { + return JobBuilder.newJob(clz) .withIdentity(scheduleInfo.getInlongGroupId()) .usingJobData(MANAGER_HOST, host) .usingJobData(MANAGER_PORT, port) - .usingJobData(SECRETE_ID, secreteId) - .usingJobData(SECRETE_KEY, secreteKey) + .usingJobData(USERNAME, username) + .usingJobData(PASSWORD, password) + .usingJobData(END_TIME, scheduleInfo.getEndTime().getTime()) .build(); } diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java index 1f0329a318b..af54fbcc555 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java @@ -22,6 +22,7 @@ import java.sql.Timestamp; +import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_ROUND; import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND; public class BaseScheduleTest { @@ -42,6 +43,10 @@ public ScheduleInfo genDefaultScheduleInfo() { return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS); } + public ScheduleInfo genOneroundScheduleInfo() { + return genNormalScheduleInfo(GROUP_ID, ONE_ROUND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS); + } + public ScheduleInfo genNormalScheduleInfo(String groupId, String scheduleUnit, int scheduleInterval, long timeSpanInMs) { ScheduleInfo scheduleInfo = new ScheduleInfo(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index c6643c3259b..f8a943c7b43 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import org.apache.inlong.manager.pojo.user.UserInfo; import javax.validation.Valid; @@ -219,10 +220,10 @@ void updateAfterApprove( List getGroupByBackUpClusterTag(String clusterTag); /** - * Submitting offline job for the given group. - * @param groupId the inlong group to submit offline job + * Submitting offline job. + * @param request request to submit offline sync job * * */ - Boolean submitOfflineJob(String groupId); + Boolean submitOfflineJob(OfflineJobSubmitRequest request); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index d2f3c0c948a..7ea4d001ee5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.service.group; +import org.apache.inlong.common.bounded.Boundaries; +import org.apache.inlong.common.bounded.BoundaryType; import org.apache.inlong.manager.common.auth.Authentication.AuthType; import org.apache.inlong.manager.common.auth.SecretTokenAuthentication; import org.apache.inlong.manager.common.consts.InlongConstants; @@ -59,6 +61,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -78,6 +81,7 @@ import org.apache.inlong.manager.service.sink.StreamSinkService; import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.StreamSourceOperator; +import org.apache.inlong.manager.service.source.bounded.BoundedSourceType; import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.service.tenant.InlongTenantService; import org.apache.inlong.manager.service.user.InlongRoleService; @@ -932,8 +936,9 @@ public List getGroupByBackUpClusterTag(String clusterTag) { } @Override - public Boolean submitOfflineJob(String groupId) { + public Boolean submitOfflineJob(OfflineJobSubmitRequest request) { // 1. get stream info list + String groupId = request.getGroupId(); InlongGroupInfo groupInfo = get(groupId); if (groupInfo == null) { String msg = String.format("InLong group not found for group=%s", groupId); @@ -946,7 +951,40 @@ public Boolean submitOfflineJob(String groupId) { LOGGER.warn("No stream info found for group {}, skip submit offline job", groupId); return false; } - return scheduleOperator.submitOfflineJob(groupId, streamInfoList); + + // check if source type is bounded source + streamInfoList.forEach(this::checkBoundedSource); + + // get the source boundaries + checkSourceBoundaryType(request.getBoundaryType()); + BoundaryType boundaryType = BoundaryType.getInstance(request.getBoundaryType()); + if (boundaryType == null) { + throw new BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(), request.getBoundaryType())); + } + Boundaries boundaries = new Boundaries(request.getLowerBoundary(), request.getUpperBoundary(), boundaryType); + + LOGGER.info("Check bounded source success, start to submitting offline job for group {}", groupId); + + return scheduleOperator.submitOfflineJob(groupId, streamInfoList, boundaries); + } + + private void checkBoundedSource(InlongStreamInfo streamInfo) { + streamInfo.getSourceList().forEach(stream -> { + if (!BoundedSourceType.isBoundedSource(stream.getSourceType())) { + throw new BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(), + stream.getSourceType())); + } + }); + } + + private void checkSourceBoundaryType(String sourceBoundaryType) { + if (!BoundaryType.isSupportBoundaryType(sourceBoundaryType)) { + throw new BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(), + sourceBoundaryType)); + } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java index 653523294e1..5b1e3c2df68 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.schedule; +import org.apache.inlong.common.bounded.Boundaries; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; @@ -100,7 +101,8 @@ public interface ScheduleOperator { * Start offline sync job when the schedule instance callback. * @param groupId groupId to start offline job * @param streamInfoList stream list to start offline job + * @param boundaries source boundaries for bounded source * @Return whether succeed * */ - Boolean submitOfflineJob(String groupId, List streamInfoList); + Boolean submitOfflineJob(String groupId, List streamInfoList, Boundaries boundaries); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index 268109e1a2f..61f847f2444 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.schedule; +import org.apache.inlong.common.bounded.Boundaries; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -178,12 +179,12 @@ public Boolean handleGroupApprove(String groupId) { } @Override - public Boolean submitOfflineJob(String groupId, List streamInfoList) { + public Boolean submitOfflineJob(String groupId, List streamInfoList, Boundaries boundaries) { if (offlineJobOperator == null) { offlineJobOperator = OfflineJobOperatorFactory.getOfflineJobOperator(); } try { - offlineJobOperator.submitOfflineJob(groupId, streamInfoList); + offlineJobOperator.submitOfflineJob(groupId, streamInfoList, boundaries); LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId, streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList())); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java new file mode 100644 index 00000000000..df61fd2d51c --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.source.bounded; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import lombok.Getter; + +@Getter +public enum BoundedSourceType { + + PULSAR("pulsar"); + + private final String sourceType; + + BoundedSourceType(String name) { + this.sourceType = name; + } + + public static BoundedSourceType getInstance(String name) { + for (BoundedSourceType source : values()) { + if (source.getSourceType().equalsIgnoreCase(name)) { + return source; + } + } + throw new BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(), name)); + } + + public static boolean isBoundedSource(String name) { + for (BoundedSourceType source : values()) { + if (source.getSourceType().equalsIgnoreCase(name)) { + return true; + } + } + return false; + } +} diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 3a9389c2973..caa4aa6c837 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -33,6 +33,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest; import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import org.apache.inlong.manager.service.group.InlongGroupProcessService; @@ -43,6 +44,8 @@ import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; @@ -65,6 +68,7 @@ @Api(tags = "Inlong-Group-API") public class InlongGroupController { + private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupController.class); @Autowired private InlongGroupService groupService; @Autowired @@ -252,10 +256,10 @@ public Response finishTagSwitch(@PathVariable String groupId) { return Response.success(groupService.finishTagSwitch(groupId)); } - @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST) + @RequestMapping(value = "/group/submitOfflineJob", method = RequestMethod.POST) @ApiOperation(value = "Submitting inlong offline job process") - @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) - public Response submitOfflineJob(@PathVariable String groupId) { - return Response.success(groupService.submitOfflineJob(groupId)); + public Response submitOfflineJob(@RequestBody OfflineJobSubmitRequest request) { + LOGGER.info("Received offline job submit request {}", request); + return Response.success(groupService.submitOfflineJob(request)); } } \ No newline at end of file diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java index f6d395dc7e1..d6bd3bdf382 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java @@ -17,11 +17,13 @@ package org.apache.inlong.manager.workflow.processor; +import org.apache.inlong.common.bounded.Boundaries; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import java.util.List; public interface OfflineJobOperator { - void submitOfflineJob(String groupId, List streamInfoList) throws Exception; + void submitOfflineJob(String groupId, List streamInfoList, Boundaries boundaries) + throws Exception; }