From 7daa4e70362e2eaf65a6dfb5789413721fee4047 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 15:43:55 +0800 Subject: [PATCH 1/9] [INLONG-9995][Manager] Support batch saving of group information and other operations --- .../client/api/impl/InlongGroupImpl.java | 2 +- .../api/inner/client/InlongGroupClient.java | 24 ++++++++- .../api/inner/client/InlongStreamClient.java | 9 ++++ .../api/inner/client/StreamSinkClient.java | 6 +++ .../api/inner/client/StreamSourceClient.java | 9 ++++ .../client/api/service/InlongGroupApi.java | 8 ++- .../client/api/service/InlongStreamApi.java | 3 ++ .../client/api/service/StreamSinkApi.java | 3 ++ .../client/api/service/StreamSourceApi.java | 5 ++ .../manager/common/enums/OperationType.java | 15 +++++- .../form/process/ApplyGroupProcessForm.java | 52 +++++++++++++++++-- .../form/task/InlongGroupApproveForm.java | 38 +++++++++++++- .../group/InlongGroupProcessService.java | 35 +++++++++++++ .../service/group/InlongGroupService.java | 10 ++++ .../service/group/InlongGroupServiceImpl.java | 11 ++++ .../apply/AfterApprovedTaskListener.java | 20 ++++--- .../apply/ApproveApplyProcessListener.java | 29 ++++++----- .../apply/CancelApplyProcessListener.java | 33 +++++++----- .../apply/RejectApplyProcessListener.java | 31 ++++++----- .../service/sink/StreamSinkService.java | 9 ++++ .../service/sink/StreamSinkServiceImpl.java | 10 ++++ .../service/source/StreamSourceService.java | 9 ++++ .../source/StreamSourceServiceImpl.java | 10 ++++ .../service/stream/InlongStreamService.java | 9 ++++ .../stream/InlongStreamServiceImpl.java | 10 ++++ .../web/controller/InlongGroupController.java | 20 +++++++ .../controller/InlongStreamController.java | 17 ++++++ .../web/controller/StreamSinkController.java | 7 +++ .../controller/StreamSourceController.java | 10 ++++ .../openapi/OpenInLongGroupController.java | 9 ++++ .../openapi/OpenInLongStreamController.java | 8 +++ .../openapi/OpenStreamSinkController.java | 7 +++ .../openapi/OpenStreamSourceController.java | 10 ++++ 33 files changed, 429 insertions(+), 59 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java index 9316760874d..7a991803dff 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java @@ -117,7 +117,7 @@ public InlongGroupContext context(String credentials) throws Exception { @Override public InlongGroupContext init() throws Exception { InlongGroupInfo groupInfo = this.groupContext.getGroupInfo(); - WorkflowResult initWorkflowResult = groupClient.initInlongGroup(groupInfo.genRequest()); + WorkflowResult initWorkflowResult = groupClient.startProcess(groupInfo.genRequest()); List taskViews = initWorkflowResult.getNewTasks(); Preconditions.expectNotEmpty(taskViews, "init inlong group info failed"); TaskResponse taskView = taskViews.get(0); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index c7462fd0b21..cd7ded9075d 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -45,6 +45,7 @@ import retrofit2.Call; import java.util.List; +import java.util.stream.Collectors; import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD; import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD; @@ -167,6 +168,16 @@ public String createGroup(InlongGroupRequest groupInfo) { return response.getData(); } + /** + * Batch create an inlong group + */ + public List batchCreateGroup(List groupRequestList) { + Response> response = + ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Update inlong group info * @@ -186,9 +197,18 @@ public boolean resetGroup(InlongGroupResetRequest resetRequest) { return response.getData(); } - public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) { + public WorkflowResult startProcess(InlongGroupRequest groupInfo) { + Response responseBody = ClientUtils.executeHttpCall( + inlongGroupApi.startProcess(groupInfo.getInlongGroupId())); + ClientUtils.assertRespSuccess(responseBody); + return responseBody.getData(); + } + + public WorkflowResult BatchStartProcess(List groupRequestList) { + List groupIdList = groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect( + Collectors.toList()); Response responseBody = ClientUtils.executeHttpCall( - inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId())); + inlongGroupApi.batchStartProcess(groupIdList)); ClientUtils.assertRespSuccess(responseBody); return responseBody.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java index a71d7ac2b3a..4db85a71d80 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java @@ -58,6 +58,15 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) { return response.getData(); } + /** + * Batch create an inlong stream. + */ + public List batchCreateStreamInfo(List streamInfos) { + Response> response = ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Query whether the inlong stream ID exists * diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java index bb493f59668..99d686dacb3 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java @@ -54,6 +54,12 @@ public Integer createSink(SinkRequest sinkRequest) { return response.getData(); } + public List batchCreateSink(List sinkRequests) { + Response> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Delete stream sink info by ID. */ diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java index 88a1ed7cc32..44bee6ced9f 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java @@ -52,6 +52,15 @@ public Integer createSource(SourceRequest request) { return response.getData(); } + /** + * Batch create inlong stream source. + */ + public List batchCreateSource(List requestList) { + Response> response = ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * List stream sources by the given groupId and streamId. */ diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java index 9f9df342b2c..86820e10e43 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java @@ -50,11 +50,17 @@ public interface InlongGroupApi { @POST("group/save") Call> createGroup(@Body InlongGroupRequest request); + @POST("group/batchSave") + Call>> batchCreateGroup(@Body List requestList); + @POST("group/update") Call> updateGroup(@Body InlongGroupRequest request); @POST("group/startProcess/{id}") - Call> initInlongGroup(@Path("id") String id); + Call> startProcess(@Path("id") String id); + + @POST("group/batchStartProcess/{id}") + Call> batchStartProcess(@Body List groupIdList); @POST("group/suspendProcessAsync/{id}") Call> suspendProcessAsync(@Path("id") String id); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java index 273c50fee97..2438c01d2a6 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java @@ -41,6 +41,9 @@ public interface InlongStreamApi { @POST("stream/save") Call> createStream(@Body InlongStreamInfo stream); + @POST("stream/batchSave") + Call>> batchCreateStream(@Body List streamInfos); + @GET("stream/exist/{groupId}/{streamId}") Call> isStreamExists(@Path("groupId") String groupId, @Path("streamId") String streamId); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java index 2443db5aab1..37452e905b3 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java @@ -41,6 +41,9 @@ public interface StreamSinkApi { @POST("sink/save") Call> save(@Body SinkRequest request); + @POST("sink/batchSave") + Call>> batchSave(@Body List requestList); + @POST("sink/update") Call> updateById(@Body SinkRequest request); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java index d383e30d86f..68f3f740b3d 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java @@ -31,11 +31,16 @@ import retrofit2.http.Path; import retrofit2.http.Query; +import java.util.List; + public interface StreamSourceApi { @POST("source/save") Call> createSource(@Body SourceRequest request); + @POST("source/batchSave") + Call>> batchCreateSource(@Body List requestList); + @POST("source/update") Call> updateSource(@Body SourceRequest request); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java index 0215a694e9b..7ccbe0ef830 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java @@ -35,5 +35,18 @@ public enum OperationType { GET, - LIST + LIST, + + SUSPEND, + + START; + + public static OperationType forOperationType(String type) { + for (OperationType operationType : values()) { + if (operationType.name().equals(type)) { + return operationType; + } + } + throw new IllegalArgumentException(String.format("Unsupported operation type for %s", type)); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java index c3b7005dd9e..0a562be353b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java @@ -22,13 +22,20 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Apply inlong group process form @@ -39,15 +46,19 @@ public class ApplyGroupProcessForm extends BaseProcessForm { public static final String FORM_NAME = "ApplyGroupProcessForm"; - @ApiModelProperty(value = "Inlong group info", required = true) + @ApiModelProperty(value = "Inlong group info") private InlongGroupInfo groupInfo; @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamInfoList; + @ApiModelProperty(value = "Inlong group info list") + private List groupFullInfoList; + @Override public void validate() throws FormValidateException { - Preconditions.expectNotNull(groupInfo, "inlong group info is empty"); + Preconditions.expectTrue(groupInfo != null || CollectionUtils.isNotEmpty(groupFullInfoList), + "inlong group info is empty"); } @Override @@ -57,14 +68,45 @@ public String getFormName() { @Override public String getInlongGroupId() { - return groupInfo.getInlongGroupId(); + if (groupInfo != null) { + return groupInfo.getInlongGroupId(); + } + List groupIdList = groupFullInfoList.stream().map(v -> { + InlongGroupInfo groupInfo = v.getGroupInfo(); + return groupInfo.getInlongGroupId(); + }).collect(Collectors.toList()); + return Joiner.on(",").join(groupIdList); } @Override public Map showInList() { Map show = Maps.newHashMap(); - show.put("inlongGroupId", groupInfo.getInlongGroupId()); - show.put("inlongGroupMode", groupInfo.getInlongGroupMode()); + if (groupInfo != null) { + show.put("inlongGroupId", groupInfo.getInlongGroupId()); + show.put("inlongGroupMode", groupInfo.getInlongGroupMode()); + } else { + List groupIdList = new ArrayList<>(); + List groupModeList = new ArrayList<>(); + groupFullInfoList.forEach(v -> { + InlongGroupInfo groupInfo = v.getGroupInfo(); + groupIdList.add(groupInfo.getInlongGroupId()); + groupModeList.add(groupInfo.getInlongGroupMode()); + }); + show.put("inlongGroupId", Joiner.on(",").join(groupIdList)); + show.put("inlongGroupMode", Joiner.on(",").join(groupModeList)); + } return show; } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class GroupFullInfo { + + private InlongGroupInfo groupInfo; + + private List streamInfoList; + + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java index bcbcc571c01..a4e9ddd8fb4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java @@ -22,10 +22,16 @@ import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; /** @@ -37,15 +43,19 @@ public class InlongGroupApproveForm extends BaseTaskForm { public static final String FORM_NAME = "InlongGroupApproveForm"; - @ApiModelProperty(value = "Inlong group approve info", required = true) + @ApiModelProperty(value = "Inlong group approve info") private InlongGroupApproveRequest groupApproveInfo; @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamApproveInfoList; + @ApiModelProperty(value = "Inlong group info list request") + private List groupApproveFullInfoList; + @Override public void validate() throws FormValidateException { - Preconditions.expectNotNull(groupApproveInfo, "inlong group approve info is empty"); + Preconditions.expectTrue(groupApproveInfo != null || CollectionUtils.isNotEmpty(groupApproveFullInfoList), + "inlong group approve info is empty"); } @Override @@ -53,4 +63,28 @@ public String getFormName() { return FORM_NAME; } + @JsonIgnore + public List getApproveFullRequest() { + List result = new ArrayList<>(); + if (groupApproveInfo != null) { + result.add(new GroupApproveFullRequest(groupApproveInfo, streamApproveInfoList)); + } + if (CollectionUtils.isNotEmpty(groupApproveFullInfoList)) { + result.addAll(groupApproveFullInfoList); + } + return result; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class GroupApproveFullRequest { + + private InlongGroupApproveRequest groupApproveInfo; + + private List streamApproveInfoList; + + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index dbb78584b7d..8052315d1a1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.workflow.TaskResponse; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm.GroupFullInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.service.workflow.WorkflowService; @@ -52,6 +53,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -116,6 +118,24 @@ public WorkflowResult startProcess(String groupId, String operator) { return result; } + public WorkflowResult batchStartProcess(List groupIdList, String operator) { + for (String groupId : groupIdList) { + LOGGER.info("begin to start approve process for groupId={} by operator={}", groupId, operator); + + groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), operator); + } + ApplyGroupProcessForm form = genApplyGroupProcessForm(groupIdList); + WorkflowResult result = workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form); + List tasks = result.getNewTasks(); + if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) { + throw new BusinessException(ErrorCodeEnum.WORKFLOW_START_RECORD_FAILED, + String.format("failed to start inlong group for groupId=%s", groupIdList)); + } + + LOGGER.info("success to start approve process for groupId={} by operator={}", groupIdList, operator); + return result; + } + /** * Suspend InlongGroup in an asynchronous way. * @@ -368,6 +388,21 @@ private ApplyGroupProcessForm genApplyGroupProcessForm(String groupId) { return form; } + private ApplyGroupProcessForm genApplyGroupProcessForm(List groupIdList) { + ApplyGroupProcessForm form = new ApplyGroupProcessForm(); + List groupFullInfoList = new ArrayList<>(); + for (String groupId : groupIdList) { + InlongGroupInfo groupInfo = groupService.get(groupId); + List infoList = streamService.listBriefWithSink(groupInfo.getInlongGroupId()); + GroupFullInfo groupFullInfo = new GroupFullInfo(); + groupFullInfo.setGroupInfo(groupInfo); + groupFullInfo.setStreamInfoList(infoList); + groupFullInfoList.add(groupFullInfo); + } + form.setGroupFullInfoList(groupFullInfoList); + return form; + } + /** * Generate the form of [Group Resource Workflow] */ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index d2f8b090487..483f55d8296 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -60,6 +60,16 @@ String save(@Valid @NotNull(message = "inlong group request cannot be null") Inl String save(@Valid @NotNull(message = "inlong group request cannot be null") InlongGroupRequest groupInfo, UserInfo opInfo); + /** + * Batch save inlong group info. + * + * @param groupRequestList group request list need to save + * @param operator name of operator + * @return inlong group id list after saving + */ + List batchSave( + @Valid @NotNull(message = "inlong group request list cannot be null") List groupRequestList, + String operator); /** * Query whether the specified group id exists * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 2d87b74995a..9304747ea39 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -216,6 +216,17 @@ public String save(InlongGroupRequest request, UserInfo opInfo) { return groupId; } + @Override + @Transactional(rollbackFor = Throwable.class) + public List batchSave(List groupRequestList, String operator) { + List groupIdList = new ArrayList<>(); + for (InlongGroupRequest groupRequest : groupRequestList) { + String groupId = this.save(groupRequest, operator); + groupIdList.add(groupId); + } + return groupIdList; + } + @Override public Boolean exist(String groupId) { Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java index 819f455e599..63e671f33ca 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm; +import org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm.GroupApproveFullRequest; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.workflow.WorkflowContext; @@ -32,6 +33,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.List; + /** * The listener for modifying InlongGroup info after the application InlongGroup process is approved. */ @@ -57,15 +60,18 @@ public TaskEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { InlongGroupApproveForm form = (InlongGroupApproveForm) context.getActionContext().getForm(); - InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo(); - String groupId = approveInfo.getInlongGroupId(); - log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId); + List approveRequestList = form.getApproveFullRequest(); + for (GroupApproveFullRequest request : approveRequestList) { + InlongGroupApproveRequest approveInfo = request.getGroupApproveInfo(); + String groupId = approveInfo.getInlongGroupId(); + log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId); - // save the inlong group and other info after approval - groupService.updateAfterApprove(approveInfo, context.getOperator()); - streamService.updateAfterApprove(form.getStreamApproveInfoList(), context.getOperator()); + // save the inlong group and other info after approval + groupService.updateAfterApprove(approveInfo, context.getOperator()); + streamService.updateAfterApprove(request.getStreamApproveInfoList(), context.getOperator()); - log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId); + log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java index 4d285488f60..f74153f6a82 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; import java.util.List; /** @@ -63,21 +65,22 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId); - InlongGroupInfo groupInfo = groupService.get(groupId); - GroupResourceProcessForm processForm = new GroupResourceProcessForm(); - processForm.setGroupInfo(groupInfo); - String username = context.getOperator(); - List streamList = streamService.list(groupId); - processForm.setStreamInfos(streamList); + InlongGroupInfo groupInfo = groupService.get(groupId); + GroupResourceProcessForm processForm = new GroupResourceProcessForm(); + processForm.setGroupInfo(groupInfo); + List streamList = streamService.list(groupId); + processForm.setStreamInfos(streamList); - // may run for long time, make it async processing - UserInfo userInfo = LoginUserUtils.getLoginUser(); - EXECUTOR_SERVICE.execute( - () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm)); - log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId); + // may run for long time, make it async processing + UserInfo userInfo = LoginUserUtils.getLoginUser(); + EXECUTOR_SERVICE.execute( + () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm)); + log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java index c6b2871ba86..5f03ebbf0fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -31,6 +32,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.List; import java.util.Objects; /** @@ -54,22 +57,24 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId); - // only the [ToBeApproval] status allowed the canceling operation - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - if (entity == null) { - throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId); - } - if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { - throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel", - GroupStatus.forCode(entity.getStatus()))); - } - String operator = context.getOperator(); - groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator); + // only the [ToBeApproval] status allowed the canceling operation + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + if (entity == null) { + throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId); + } + if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { + throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel", + GroupStatus.forCode(entity.getStatus()))); + } + String operator = context.getOperator(); + groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator); - log.info("success to execute CancelApplyProcessListener for groupId={}", groupId); + log.info("success to execute CancelApplyProcessListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java index 4b387e6da91..8ad8bd766ae 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -32,6 +33,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.List; import java.util.Objects; /** @@ -54,21 +57,23 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId); - // only the [TO_BE_APPROVAL] status allowed the rejecting operation - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - if (entity == null) { - throw new WorkflowListenerException("inlong group not found with groupId=" + groupId); - } - if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { - throw new WorkflowListenerException("current status was not allowed to reject inlong group"); - } + // only the [TO_BE_APPROVAL] status allowed the rejecting operation + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + if (entity == null) { + throw new WorkflowListenerException("inlong group not found with groupId=" + groupId); + } + if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { + throw new WorkflowListenerException("current status was not allowed to reject inlong group"); + } - // after reject, update InlongGroup status to [APPROVE_REJECTED] - String username = context.getOperator(); - groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username); + // after reject, update InlongGroup status to [APPROVE_REJECTED] + String username = context.getOperator(); + groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index 06f97bb762a..bf14d09b454 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -59,6 +59,15 @@ public interface StreamSinkService { */ Integer save(SinkRequest request, UserInfo opInfo); + /** + * Batch save the sink info. + * + * @param requestList sink request list need to save + * @param operator name of operator + * @return sink id list after saving + */ + List batchSave(List requestList, String operator); + /** * Get stream sink info based on id. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 31092bdb0b7..b916fc32d17 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -234,6 +234,16 @@ public Integer save(SinkRequest request, UserInfo opInfo) { return id; } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (SinkRequest request : requestList) { + int id = this.save(request, operator); + resultList.add(id); + } + return resultList; + } + @Override public StreamSink get(Integer id) { if (id == null) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index 0dd4decd28f..b4cd8b34023 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -52,6 +52,15 @@ public interface StreamSourceService { */ Integer save(SourceRequest request, UserInfo opInfo); + /** + * Batch save the source information + * + * @param requestList Source request list. + * @param operator Operator's name. + * @return source id list after saving. + */ + List batchSave(List requestList, String operator); + /** * Query source information based on id * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 2d3855b05e0..b57f108e6bb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -162,6 +162,16 @@ public Integer save(SourceRequest request, UserInfo opInfo) { return sourceOperator.saveOpt(request, groupEntity.getStatus(), opInfo.getName()); } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (SourceRequest request : requestList) { + int id = this.save(request, operator); + resultList.add(id); + } + return resultList; + } + @Override public StreamSource get(Integer id) { if (id == null) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index 328e9ae73cc..34bfe613281 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -50,6 +50,15 @@ public interface InlongStreamService { */ Integer save(InlongStreamRequest request, String operator); + /** + * Batch save inlong stream information. + * + * @param requestList Inlong stream information list. + * @param operator The name of operator. + * @return Id list after successful save. + */ + List batchSave(List requestList, String operator); + /** * Save inlong stream information. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index b9d8ef8d364..a443512c9bd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -184,6 +184,16 @@ public Integer save(InlongStreamRequest request, String operator) { return streamEntity.getId(); } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (InlongStreamRequest request : requestList) { + int id = this.save(request, operator); + resultList.add(id); + } + return resultList; + } + @Override public Integer save(InlongStreamRequest request, UserInfo opInfo) { InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId()); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 0ad34ae506d..55cb699a503 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -77,6 +77,15 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon return Response.success(groupService.save(groupRequest, operator)); } + @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) + @ApiOperation(value = "Batch Save inlong group") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List groupRequestList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupService.batchSave(groupRequestList, operator)); + } + @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET) @ApiOperation(value = "Is the inlong group id exists") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) @@ -155,14 +164,24 @@ public Response deleteAsync(@PathVariable String groupId) { @RequestMapping(value = "/group/startProcess/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Start inlong approval process") + @OperationLog(operation = OperationType.START, operationTarget = OperationTarget.GROUP) @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response startProcess(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); return Response.success(groupProcessOperation.startProcess(groupId, operator)); } + @RequestMapping(value = "/group/batchStartProcess", method = RequestMethod.POST) + @ApiOperation(value = "Batch start inlong approval process") + @OperationLog(operation = OperationType.START, operationTarget = OperationTarget.GROUP) + public Response batchStartProcess(@RequestBody List groupIdList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupProcessOperation.batchStartProcess(groupIdList, operator)); + } + @RequestMapping(value = "/group/suspendProcess/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Suspend inlong group process") + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.GROUP) @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response suspendProcess(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); @@ -178,6 +197,7 @@ public Response restartProcess(@PathVariable String groupId) { } @RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = RequestMethod.POST) + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Suspend inlong group process") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response suspendProcessAsync(@PathVariable String groupId) { diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index da747818cee..ca01b9b3d56 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -86,6 +86,14 @@ public Response save(@RequestBody InlongStreamRequest request) { return Response.success(result); } + @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) + @ApiOperation(value = "Batch save inlong stream") + public Response> batchSave(@RequestBody List requestList) { + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + @RequestMapping(value = "/stream/exist/{groupId}/{streamId}", method = RequestMethod.GET) @ApiOperation(value = "Is the inlong stream exists") @ApiImplicitParams({ @@ -154,6 +162,7 @@ public Response startProcess(@PathVariable String groupId, @PathVariabl } @RequestMapping(value = "/stream/suspendProcess/{groupId}/{streamId}", method = RequestMethod.POST) + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Suspend inlong stream process") @ApiImplicitParams({ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), @@ -256,4 +265,12 @@ public Response> listMessages(@RequestParam String groupId, return Response.success(streamService.listMessages(groupId, streamId, messageCount, username)); } + @RequestMapping(value = "/stream/listByTenant", method = RequestMethod.POST) + @ApiOperation(value = "List inlong stream briefs by paginating, only wedata use") + public Response> listByTenant(@RequestBody InlongStreamPageRequest request) { + request.setCurrentUser(LoginUserUtils.getLoginUser().getName()); + request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)); + return Response.success(streamService.listBriefByTenant(request)); + } + } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index e8d0ef93ab3..2e530fbc64b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -66,6 +66,13 @@ public Response save(@Validated @RequestBody SinkRequest request) { return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) + @ApiOperation(value = "Batch save stream sink") + public Response> batchSave(@Validated @RequestBody List requestList) { + return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/get/{id}", method = RequestMethod.GET) @ApiOperation(value = "Get stream sink") @OperationLog(operation = OperationType.GET, operationTarget = OperationTarget.SINK) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index 8e7645f993c..99fe165329b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -44,6 +44,8 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + /** * Stream source control layer */ @@ -62,6 +64,14 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) + @ApiOperation(value = "Batch save stream source") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List requestList) { + return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/source/get/{id}", method = RequestMethod.GET) @ApiOperation(value = "Get stream source") @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java index 8dca1ad748b..dc08ea62819 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java @@ -88,6 +88,15 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon return Response.success(groupService.save(groupRequest, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) + @ApiOperation(value = "Batch Save inlong group") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List groupRequestList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupService.batchSave(groupRequestList, operator)); + } + @RequestMapping(value = "/group/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Update inlong group") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java index 893e0e589b2..81bef8121ff 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java @@ -105,6 +105,14 @@ public Response save(@RequestBody InlongStreamRequest request) { return Response.success(streamService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) + @ApiOperation(value = "Batch save inlong stream") + public Response> batchSave(@RequestBody List requestList) { + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + @RequestMapping(value = "/stream/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Update inlong stream") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java index d69f0895f43..b44076cd835 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java @@ -83,6 +83,13 @@ public Response save(@Validated @RequestBody SinkRequest request) { return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) + @ApiOperation(value = "Batch save stream sink") + public Response> batchSave(@Validated @RequestBody List requestList) { + return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Update stream sink") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java index 6bd7ab19ddb..c8097ddeaba 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java @@ -43,6 +43,8 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + /** * Open InLong Stream Source controller */ @@ -80,6 +82,14 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) + @ApiOperation(value = "Batch save stream source") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List requestList) { + return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/source/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SOURCE) @ApiOperation(value = "Update stream source") From c0b301f5ffbff18076ac86096fd70fc3389bc576 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 15:45:31 +0800 Subject: [PATCH 2/9] [INLONG-9995][Manager] Fix error --- .../manager/client/api/inner/client/InlongGroupClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index cd7ded9075d..c78e8bcaf86 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -204,7 +204,7 @@ public WorkflowResult startProcess(InlongGroupRequest groupInfo) { return responseBody.getData(); } - public WorkflowResult BatchStartProcess(List groupRequestList) { + public WorkflowResult batchStartProcess(List groupRequestList) { List groupIdList = groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect( Collectors.toList()); Response responseBody = ClientUtils.executeHttpCall( From 513bfad1c76675caab2e2ed0101e21fb27c92cde Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 15:49:10 +0800 Subject: [PATCH 3/9] [INLONG-9995][Manager] Fix error --- .../manager/client/api/inner/client/InlongGroupClient.java | 2 +- .../manager/client/api/inner/client/InlongStreamClient.java | 2 +- .../pojo/workflow/form/process/ApplyGroupProcessForm.java | 2 +- .../manager/pojo/workflow/form/task/InlongGroupApproveForm.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index c78e8bcaf86..c1c31039723 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -169,7 +169,7 @@ public String createGroup(InlongGroupRequest groupInfo) { } /** - * Batch create an inlong group + * Batch create inlong group */ public List batchCreateGroup(List groupRequestList) { Response> response = diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java index 4db85a71d80..a1a10c5abf8 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java @@ -59,7 +59,7 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) { } /** - * Batch create an inlong stream. + * Batch create inlong stream. */ public List batchCreateStreamInfo(List streamInfos) { Response> response = ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos)); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java index 0a562be353b..27d3e30b709 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java @@ -52,7 +52,7 @@ public class ApplyGroupProcessForm extends BaseProcessForm { @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamInfoList; - @ApiModelProperty(value = "Inlong group info list") + @ApiModelProperty(value = "Inlong group full info list") private List groupFullInfoList; @Override diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java index a4e9ddd8fb4..e7fbbc37ff8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java @@ -49,7 +49,7 @@ public class InlongGroupApproveForm extends BaseTaskForm { @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamApproveInfoList; - @ApiModelProperty(value = "Inlong group info list request") + @ApiModelProperty(value = "Inlong group approve full info list") private List groupApproveFullInfoList; @Override From 1d9021e3e36aad3ce6d879b6385c61b66cc6f800 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 16:28:21 +0800 Subject: [PATCH 4/9] [INLONG-9995][Manager] Fix error --- .../pojo/workflow/ProcessResponse.java | 2 +- .../manager/pojo/workflow/TaskResponse.java | 2 +- .../form/process/ApplyConsumeProcessForm.java | 8 +++-- .../form/process/ApplyGroupProcessForm.java | 29 ++++++++++--------- .../process/GroupResourceProcessForm.java | 7 +++-- .../workflow/form/process/ProcessForm.java | 3 +- .../service/workflow/WorkflowServiceImpl.java | 4 +-- 7 files changed, 32 insertions(+), 23 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java index 215fbb3051f..059e8edb82d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java @@ -77,6 +77,6 @@ public class ProcessResponse { private List currentTasks; @ApiModelProperty(value = "Extra information shown in the list") - private Map showInList; + private List> showInList; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java index 519d6a7c168..06cae85145d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java @@ -89,6 +89,6 @@ public class TaskResponse { private Object extParams; @ApiModelProperty(value = "Extra information shown in the list") - private Map showInList; + private List> showInList; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java index 4b6792604c4..10db1197fcd 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java @@ -26,6 +26,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -56,12 +58,14 @@ public String getInlongGroupId() { } @Override - public Map showInList() { + public List> showInList() { + List> showInList = new ArrayList<>(); Map show = Maps.newHashMap(); if (consumeInfo != null) { show.put("inlongGroupId", consumeInfo.getInlongGroupId()); show.put("consumerGroup", consumeInfo.getConsumerGroup()); } - return show; + showInList.add(show); + return showInList; } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java index 27d3e30b709..272052f514d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java @@ -79,23 +79,24 @@ public String getInlongGroupId() { } @Override - public Map showInList() { - Map show = Maps.newHashMap(); + public List> showInList() { + List> showList = new ArrayList<>(); if (groupInfo != null) { - show.put("inlongGroupId", groupInfo.getInlongGroupId()); - show.put("inlongGroupMode", groupInfo.getInlongGroupMode()); - } else { - List groupIdList = new ArrayList<>(); - List groupModeList = new ArrayList<>(); - groupFullInfoList.forEach(v -> { - InlongGroupInfo groupInfo = v.getGroupInfo(); - groupIdList.add(groupInfo.getInlongGroupId()); - groupModeList.add(groupInfo.getInlongGroupMode()); + addShowInfo(groupInfo, showList); + } + if (CollectionUtils.isNotEmpty(groupFullInfoList)) { + groupFullInfoList.forEach(groupFullInfo -> { + addShowInfo(groupFullInfo.getGroupInfo(), showList); }); - show.put("inlongGroupId", Joiner.on(",").join(groupIdList)); - show.put("inlongGroupMode", Joiner.on(",").join(groupModeList)); } - return show; + return showList; + } + + private void addShowInfo(InlongGroupInfo groupInfo, List> showList) { + Map show = Maps.newHashMap(); + show.put("inlongGroupId", groupInfo.getInlongGroupId()); + show.put("inlongGroupMode", groupInfo.getInlongGroupMode()); + showList.add(show); } @Data diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java index d955a2c5a66..1dd1294653b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java @@ -26,6 +26,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,11 +62,13 @@ public String getInlongGroupId() { } @Override - public Map showInList() { + public List> showInList() { + List> showInList = new ArrayList<>(); Map show = new HashMap<>(); show.put("inlongGroupId", groupInfo.getInlongGroupId()); show.put("groupOperateType", this.groupOperateType); - return show; + showInList.add(show); + return showInList; } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java index 95f4d1451c4..bcf1278e971 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.List; import java.util.Map; /** @@ -47,7 +48,7 @@ default String getTitle() { /** * Field data displayed in the process list. */ - default Map showInList() { + default List> showInList() { return null; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java index 0863b8e6661..739b68fcab5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java @@ -244,7 +244,7 @@ private Consumer addCurrentTask(TaskRequest query) { }; } - private Map getShowInList(WorkflowProcessEntity processEntity) { + private List> getShowInList(WorkflowProcessEntity processEntity) { WorkflowProcess process = processDefService.getByName(processEntity.getName()); if (process == null || process.getFormClass() == null) { return null; @@ -270,7 +270,7 @@ private void addShowInListForEachTask(List taskList) { query.setIdList(list); List processEntities = queryService.listProcessEntity(query); - Map> processShowInListMap = Maps.newHashMap(); + Map>> processShowInListMap = Maps.newHashMap(); processEntities.forEach(entity -> processShowInListMap.put(entity.getId(), getShowInList(entity))); taskList.forEach(task -> task.setShowInList(processShowInListMap.get(task.getProcessId()))); } From 00df16e39e462f3cd9439f291dbddb1e70b52f4a Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 16:31:00 +0800 Subject: [PATCH 5/9] [INLONG-9995][Manager] Fix error --- .../manager/web/controller/InlongStreamController.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index ca01b9b3d56..d78533a48d5 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -265,12 +265,4 @@ public Response> listMessages(@RequestParam String groupId, return Response.success(streamService.listMessages(groupId, streamId, messageCount, username)); } - @RequestMapping(value = "/stream/listByTenant", method = RequestMethod.POST) - @ApiOperation(value = "List inlong stream briefs by paginating, only wedata use") - public Response> listByTenant(@RequestBody InlongStreamPageRequest request) { - request.setCurrentUser(LoginUserUtils.getLoginUser().getName()); - request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.TENANT_ADMIN)); - return Response.success(streamService.listBriefByTenant(request)); - } - } From 2ebabf7c0e741542ac0fb36436f14c90bf802daf Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 21:14:49 +0800 Subject: [PATCH 6/9] [INLONG-9995][Manager] Fix error --- .../api/inner/client/InlongGroupClient.java | 5 ++- .../api/inner/client/InlongStreamClient.java | 6 ++- .../api/inner/client/StreamSinkClient.java | 5 ++- .../api/inner/client/StreamSourceClient.java | 6 ++- .../client/api/service/InlongGroupApi.java | 3 +- .../client/api/service/InlongStreamApi.java | 3 +- .../client/api/service/StreamSinkApi.java | 3 +- .../client/api/service/StreamSourceApi.java | 3 +- .../manager/pojo/common/BatchResult.java | 39 +++++++++++++++++++ .../service/group/InlongGroupService.java | 3 +- .../service/group/InlongGroupServiceImpl.java | 23 ++++++++--- .../service/sink/StreamSinkService.java | 3 +- .../service/sink/StreamSinkServiceImpl.java | 23 +++++++++-- .../service/source/StreamSourceService.java | 3 +- .../source/StreamSourceServiceImpl.java | 23 +++++++++-- .../service/stream/InlongStreamService.java | 3 +- .../stream/InlongStreamServiceImpl.java | 22 +++++++++-- .../web/controller/InlongGroupController.java | 3 +- .../controller/InlongStreamController.java | 3 +- .../web/controller/StreamSinkController.java | 3 +- .../controller/StreamSourceController.java | 3 +- .../openapi/OpenInLongGroupController.java | 3 +- .../openapi/OpenInLongStreamController.java | 5 ++- .../openapi/OpenStreamSinkController.java | 3 +- .../openapi/OpenStreamSourceController.java | 3 +- 25 files changed, 160 insertions(+), 42 deletions(-) create mode 100644 inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index c1c31039723..54b61f91c2a 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -171,8 +172,8 @@ public String createGroup(InlongGroupRequest groupInfo) { /** * Batch create inlong group */ - public List batchCreateGroup(List groupRequestList) { - Response> response = + public List batchCreateGroup(List groupRequestList) { + Response> response = ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList)); ClientUtils.assertRespSuccess(response); return response.getData(); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java index a1a10c5abf8..89ca7eef687 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -61,8 +62,9 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) { /** * Batch create inlong stream. */ - public List batchCreateStreamInfo(List streamInfos) { - Response> response = ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos)); + public List batchCreateStreamInfo(List streamInfos) { + Response> response = + ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos)); ClientUtils.assertRespSuccess(response); return response.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java index 99d686dacb3..c350900f067 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.client.api.service.StreamSinkApi; import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -54,8 +55,8 @@ public Integer createSink(SinkRequest sinkRequest) { return response.getData(); } - public List batchCreateSink(List sinkRequests) { - Response> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests)); + public List batchCreateSink(List sinkRequests) { + Response> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests)); ClientUtils.assertRespSuccess(response); return response.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java index 44bee6ced9f..baa6a601ba8 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -55,8 +56,9 @@ public Integer createSource(SourceRequest request) { /** * Batch create inlong stream source. */ - public List batchCreateSource(List requestList) { - Response> response = ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList)); + public List batchCreateSource(List requestList) { + Response> response = + ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList)); ClientUtils.assertRespSuccess(response); return response.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java index 86820e10e43..819d9802d9f 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -51,7 +52,7 @@ public interface InlongGroupApi { Call> createGroup(@Body InlongGroupRequest request); @POST("group/batchSave") - Call>> batchCreateGroup(@Body List requestList); + Call>> batchCreateGroup(@Body List requestList); @POST("group/update") Call> updateGroup(@Body InlongGroupRequest request); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java index 2438c01d2a6..6b47d8aeb23 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -42,7 +43,7 @@ public interface InlongStreamApi { Call> createStream(@Body InlongStreamInfo stream); @POST("stream/batchSave") - Call>> batchCreateStream(@Body List streamInfos); + Call>> batchCreateStream(@Body List streamInfos); @GET("stream/exist/{groupId}/{streamId}") Call> isStreamExists(@Path("groupId") String groupId, @Path("streamId") String streamId); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java index 37452e905b3..75088a5eee7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -42,7 +43,7 @@ public interface StreamSinkApi { Call> save(@Body SinkRequest request); @POST("sink/batchSave") - Call>> batchSave(@Body List requestList); + Call>> batchSave(@Body List requestList); @POST("sink/update") Call> updateById(@Body SinkRequest request); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java index 68f3f740b3d..31fb815aff7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -39,7 +40,7 @@ public interface StreamSourceApi { Call> createSource(@Body SourceRequest request); @POST("source/batchSave") - Call>> batchCreateSource(@Body List requestList); + Call>> batchCreateSource(@Body List requestList); @POST("source/update") Call> updateSource(@Body SourceRequest request); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java new file mode 100644 index 00000000000..140dfb09bc4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.common; + +import org.apache.inlong.manager.common.enums.OperationTarget; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BatchResult { + + // Unique identification of operation target + private String uniqueKey; + private boolean success; + private OperationTarget operationTarget; + private String errMsg; + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index 483f55d8296..959f46edf14 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.group; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -67,7 +68,7 @@ String save(@Valid @NotNull(message = "inlong group request cannot be null") Inl * @param operator name of operator * @return inlong group id list after saving */ - List batchSave( + List batchSave( @Valid @NotNull(message = "inlong group request list cannot be null") List groupRequestList, String operator); /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 9304747ea39..9c4f85baac7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -42,6 +43,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -218,13 +220,24 @@ public String save(InlongGroupRequest request, UserInfo opInfo) { @Override @Transactional(rollbackFor = Throwable.class) - public List batchSave(List groupRequestList, String operator) { - List groupIdList = new ArrayList<>(); + public List batchSave(List groupRequestList, String operator) { + List resultList = new ArrayList<>(); for (InlongGroupRequest groupRequest : groupRequestList) { - String groupId = this.save(groupRequest, operator); - groupIdList.add(groupId); + BatchResult result = BatchResult.builder() + .uniqueKey(groupRequest.getInlongGroupId()) + .operationTarget(OperationTarget.GROUP) + .build(); + try { + this.save(groupRequest, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save inlong group for groupId={}", groupRequest.getInlongGroupId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); } - return groupIdList; + return resultList; } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index bf14d09b454..79da89d5128 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.sink; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -66,7 +67,7 @@ public interface StreamSinkService { * @param operator name of operator * @return sink id list after saving */ - List batchSave(List requestList, String operator); + List batchSave(List requestList, String operator); /** * Get stream sink info based on id. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index b916fc32d17..e97282a65ba 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; @@ -34,6 +35,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -235,11 +237,24 @@ public Integer save(SinkRequest request, UserInfo opInfo) { } @Override - public List batchSave(List requestList, String operator) { - List resultList = new ArrayList<>(); + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); for (SinkRequest request : requestList) { - int id = this.save(request, operator); - resultList.add(id); + BatchResult result = BatchResult.builder() + .uniqueKey(request.getSinkName() + "-" + request.getInlongGroupId() + "-" + + request.getInlongStreamId()) + .operationTarget(OperationTarget.SINK) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save save source info for sinkName={}, groupId={}, streamId={}", + request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); } return resultList; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index b4cd8b34023..879ca5e0124 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.source; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; @@ -59,7 +60,7 @@ public interface StreamSourceService { * @param operator Operator's name. * @return source id list after saving. */ - List batchSave(List requestList, String operator); + List batchSave(List requestList, String operator); /** * Query source information based on id diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index b57f108e6bb..37795e8b2fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -33,6 +34,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -163,11 +165,24 @@ public Integer save(SourceRequest request, UserInfo opInfo) { } @Override - public List batchSave(List requestList, String operator) { - List resultList = new ArrayList<>(); + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); for (SourceRequest request : requestList) { - int id = this.save(request, operator); - resultList.add(id); + BatchResult result = BatchResult.builder() + .uniqueKey(request.getSourceName() + "-" + request.getInlongGroupId() + "-" + + request.getInlongStreamId()) + .operationTarget(OperationTarget.SOURCE) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save save source info for sourceName={}, groupId={}, streamId={}", + request.getSourceName(), request.getInlongGroupId(), request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); } return resultList; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index 34bfe613281..e54ff475b51 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.stream; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.sink.AddFieldRequest; @@ -57,7 +58,7 @@ public interface InlongStreamService { * @param operator The name of operator. * @return Id list after successful save. */ - List batchSave(List requestList, String operator); + List batchSave(List requestList, String operator); /** * Save inlong stream information. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index a443512c9bd..21df28228fd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.tool.excel.ExcelTool; @@ -35,6 +36,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -185,11 +187,23 @@ public Integer save(InlongStreamRequest request, String operator) { } @Override - public List batchSave(List requestList, String operator) { - List resultList = new ArrayList<>(); + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); for (InlongStreamRequest request : requestList) { - int id = this.save(request, operator); - resultList.add(id); + BatchResult result = BatchResult.builder() + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId()) + .operationTarget(OperationTarget.STREAM) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save inlong stream for groupId={}, streamId={}", request.getInlongGroupId(), + request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); } return resultList; } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 55cb699a503..da69535631b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -80,7 +81,7 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Batch Save inlong group") - public Response> batchSave( + public Response> batchSave( @Validated(SaveValidation.class) @RequestBody List groupRequestList) { String operator = LoginUserUtils.getLoginUser().getName(); return Response.success(groupService.batchSave(groupRequestList, operator)); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index d78533a48d5..46a195bce0d 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.tool.excel.ExcelTool; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -89,7 +90,7 @@ public Response save(@RequestBody InlongStreamRequest request) { @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Batch save inlong stream") - public Response> batchSave(@RequestBody List requestList) { + public Response> batchSave(@RequestBody List requestList) { List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); return Response.success(result); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index 2e530fbc64b..7869758cdb3 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.UpdateByIdValidation; import org.apache.inlong.manager.common.validation.UpdateByKeyValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -69,7 +70,7 @@ public Response save(@Validated @RequestBody SinkRequest request) { @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Batch save stream sink") - public Response> batchSave(@Validated @RequestBody List requestList) { + public Response> batchSave(@Validated @RequestBody List requestList) { return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index 99fe165329b..e410af5c1ed 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; @@ -67,7 +68,7 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) @ApiOperation(value = "Batch save stream source") - public Response> batchSave( + public Response> batchSave( @Validated(SaveValidation.class) @RequestBody List requestList) { return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java index dc08ea62819..3ee366910db 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -91,7 +92,7 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Batch Save inlong group") - public Response> batchSave( + public Response> batchSave( @Validated(SaveValidation.class) @RequestBody List groupRequestList) { String operator = LoginUserUtils.getLoginUser().getName(); return Response.success(groupService.batchSave(groupRequestList, operator)); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java index 81bef8121ff..903e730280f 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.sink.AddFieldRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; @@ -108,8 +109,8 @@ public Response save(@RequestBody InlongStreamRequest request) { @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Batch save inlong stream") - public Response> batchSave(@RequestBody List requestList) { - List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + public Response> batchSave(@RequestBody List requestList) { + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); return Response.success(result); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java index b44076cd835..1d570033df2 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.UpdateByIdValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.sink.SinkPageRequest; import org.apache.inlong.manager.pojo.sink.SinkRequest; @@ -86,7 +87,7 @@ public Response save(@Validated @RequestBody SinkRequest request) { @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Batch save stream sink") - public Response> batchSave(@Validated @RequestBody List requestList) { + public Response> batchSave(@Validated @RequestBody List requestList) { return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java index c8097ddeaba..64e98a58bc4 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -85,7 +86,7 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) @ApiOperation(value = "Batch save stream source") - public Response> batchSave( + public Response> batchSave( @Validated(SaveValidation.class) @RequestBody List requestList) { return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); } From 90b5d0af31e0c1be581bb7b0735fa0604a3c2cf4 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Tue, 16 Apr 2024 22:15:09 +0800 Subject: [PATCH 7/9] [INLONG-9995][Manager] Fix error --- .../inlong/manager/web/controller/InlongStreamController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index 46a195bce0d..c8c6c51186b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -91,7 +91,7 @@ public Response save(@RequestBody InlongStreamRequest request) { @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Batch save inlong stream") public Response> batchSave(@RequestBody List requestList) { - List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); return Response.success(result); } From 9943313f81ac542b8e66b53c7660cec4dd572bd6 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 17 Apr 2024 10:45:02 +0800 Subject: [PATCH 8/9] [INLONG-9995][Manager] Fix error --- .../manager/service/datatype/CsvDataTypeOperator.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java index cb1cededdbc..0a1118706ce 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java @@ -46,11 +46,10 @@ public List parseFields(String str, InlongStreamInfo streamInfo) thro separator = (char) Integer.parseInt(streamInfo.getDataSeparator()); } String[] bodys = StringUtils.split(str, separator); - if (bodys.length != fields.size()) { - return fields; - } for (int i = 0; i < bodys.length; i++) { - fields.get(i).setFieldValue(bodys[i]); + if (i < fields.size()) { + fields.get(i).setFieldValue(bodys[i]); + } } } catch (Exception e) { log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(), From 52f8c214eef7ee3dd9643d79d53d1a3e56d516a9 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Thu, 18 Apr 2024 18:13:23 +0800 Subject: [PATCH 9/9] [INLONG-9995][Manager] Fix error --- .../org/apache/inlong/manager/common/enums/OperationType.java | 2 +- .../inlong/manager/service/sink/StreamSinkServiceImpl.java | 4 ++-- .../manager/service/source/StreamSourceServiceImpl.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java index 7ccbe0ef830..ab39ad311d5 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java @@ -43,7 +43,7 @@ public enum OperationType { public static OperationType forOperationType(String type) { for (OperationType operationType : values()) { - if (operationType.name().equals(type)) { + if (operationType.name().equalsIgnoreCase(type)) { return operationType; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index e97282a65ba..4d7fd556ace 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -241,8 +241,8 @@ public List batchSave(List requestList, String operato List resultList = new ArrayList<>(); for (SinkRequest request : requestList) { BatchResult result = BatchResult.builder() - .uniqueKey(request.getSinkName() + "-" + request.getInlongGroupId() + "-" - + request.getInlongStreamId()) + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId() + "-" + + request.getSinkName()) .operationTarget(OperationTarget.SINK) .build(); try { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 37795e8b2fa..c252a6a00cd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -169,8 +169,8 @@ public List batchSave(List requestList, String opera List resultList = new ArrayList<>(); for (SourceRequest request : requestList) { BatchResult result = BatchResult.builder() - .uniqueKey(request.getSourceName() + "-" + request.getInlongGroupId() + "-" - + request.getInlongStreamId()) + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId() + "-" + + request.getSourceName()) .operationTarget(OperationTarget.SOURCE) .build(); try {