diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java index 9316760874d..7a991803dff 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java @@ -117,7 +117,7 @@ public InlongGroupContext context(String credentials) throws Exception { @Override public InlongGroupContext init() throws Exception { InlongGroupInfo groupInfo = this.groupContext.getGroupInfo(); - WorkflowResult initWorkflowResult = groupClient.initInlongGroup(groupInfo.genRequest()); + WorkflowResult initWorkflowResult = groupClient.startProcess(groupInfo.genRequest()); List taskViews = initWorkflowResult.getNewTasks(); Preconditions.expectNotEmpty(taskViews, "init inlong group info failed"); TaskResponse taskView = taskViews.get(0); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index c7462fd0b21..54b61f91c2a 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -45,6 +46,7 @@ import retrofit2.Call; import java.util.List; +import java.util.stream.Collectors; import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD; import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD; @@ -167,6 +169,16 @@ public String createGroup(InlongGroupRequest groupInfo) { return response.getData(); } + /** + * Batch create inlong group + */ + public List batchCreateGroup(List groupRequestList) { + Response> response = + ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Update inlong group info * @@ -186,9 +198,18 @@ public boolean resetGroup(InlongGroupResetRequest resetRequest) { return response.getData(); } - public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) { + public WorkflowResult startProcess(InlongGroupRequest groupInfo) { + Response responseBody = ClientUtils.executeHttpCall( + inlongGroupApi.startProcess(groupInfo.getInlongGroupId())); + ClientUtils.assertRespSuccess(responseBody); + return responseBody.getData(); + } + + public WorkflowResult batchStartProcess(List groupRequestList) { + List groupIdList = groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect( + Collectors.toList()); Response responseBody = ClientUtils.executeHttpCall( - inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId())); + inlongGroupApi.batchStartProcess(groupIdList)); ClientUtils.assertRespSuccess(responseBody); return responseBody.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java index a71d7ac2b3a..89ca7eef687 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -58,6 +59,16 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) { return response.getData(); } + /** + * Batch create inlong stream. + */ + public List batchCreateStreamInfo(List streamInfos) { + Response> response = + ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Query whether the inlong stream ID exists * diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java index bb493f59668..c350900f067 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.client.api.service.StreamSinkApi; import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -54,6 +55,12 @@ public Integer createSink(SinkRequest sinkRequest) { return response.getData(); } + public List batchCreateSink(List sinkRequests) { + Response> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * Delete stream sink info by ID. */ diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java index 88a1ed7cc32..baa6a601ba8 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -52,6 +53,16 @@ public Integer createSource(SourceRequest request) { return response.getData(); } + /** + * Batch create inlong stream source. + */ + public List batchCreateSource(List requestList) { + Response> response = + ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + /** * List stream sources by the given groupId and streamId. */ diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java index 9f9df342b2c..819d9802d9f 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -50,11 +51,17 @@ public interface InlongGroupApi { @POST("group/save") Call> createGroup(@Body InlongGroupRequest request); + @POST("group/batchSave") + Call>> batchCreateGroup(@Body List requestList); + @POST("group/update") Call> updateGroup(@Body InlongGroupRequest request); @POST("group/startProcess/{id}") - Call> initInlongGroup(@Path("id") String id); + Call> startProcess(@Path("id") String id); + + @POST("group/batchStartProcess/{id}") + Call> batchStartProcess(@Body List groupIdList); @POST("group/suspendProcessAsync/{id}") Call> suspendProcessAsync(@Path("id") String id); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java index 273c50fee97..6b47d8aeb23 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -41,6 +42,9 @@ public interface InlongStreamApi { @POST("stream/save") Call> createStream(@Body InlongStreamInfo stream); + @POST("stream/batchSave") + Call>> batchCreateStream(@Body List streamInfos); + @GET("stream/exist/{groupId}/{streamId}") Call> isStreamExists(@Path("groupId") String groupId, @Path("streamId") String streamId); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java index 2443db5aab1..75088a5eee7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -41,6 +42,9 @@ public interface StreamSinkApi { @POST("sink/save") Call> save(@Body SinkRequest request); + @POST("sink/batchSave") + Call>> batchSave(@Body List requestList); + @POST("sink/update") Call> updateById(@Body SinkRequest request); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java index d383e30d86f..31fb815aff7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.client.api.service; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -31,11 +32,16 @@ import retrofit2.http.Path; import retrofit2.http.Query; +import java.util.List; + public interface StreamSourceApi { @POST("source/save") Call> createSource(@Body SourceRequest request); + @POST("source/batchSave") + Call>> batchCreateSource(@Body List requestList); + @POST("source/update") Call> updateSource(@Body SourceRequest request); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java index 0215a694e9b..ab39ad311d5 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java @@ -35,5 +35,18 @@ public enum OperationType { GET, - LIST + LIST, + + SUSPEND, + + START; + + public static OperationType forOperationType(String type) { + for (OperationType operationType : values()) { + if (operationType.name().equalsIgnoreCase(type)) { + return operationType; + } + } + throw new IllegalArgumentException(String.format("Unsupported operation type for %s", type)); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java new file mode 100644 index 00000000000..140dfb09bc4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.common; + +import org.apache.inlong.manager.common.enums.OperationTarget; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BatchResult { + + // Unique identification of operation target + private String uniqueKey; + private boolean success; + private OperationTarget operationTarget; + private String errMsg; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java index 215fbb3051f..059e8edb82d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java @@ -77,6 +77,6 @@ public class ProcessResponse { private List currentTasks; @ApiModelProperty(value = "Extra information shown in the list") - private Map showInList; + private List> showInList; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java index 519d6a7c168..06cae85145d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java @@ -89,6 +89,6 @@ public class TaskResponse { private Object extParams; @ApiModelProperty(value = "Extra information shown in the list") - private Map showInList; + private List> showInList; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java index 4b6792604c4..10db1197fcd 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java @@ -26,6 +26,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -56,12 +58,14 @@ public String getInlongGroupId() { } @Override - public Map showInList() { + public List> showInList() { + List> showInList = new ArrayList<>(); Map show = Maps.newHashMap(); if (consumeInfo != null) { show.put("inlongGroupId", consumeInfo.getInlongGroupId()); show.put("consumerGroup", consumeInfo.getConsumerGroup()); } - return show; + showInList.add(show); + return showInList; } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java index c3b7005dd9e..272052f514d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java @@ -22,13 +22,20 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Apply inlong group process form @@ -39,15 +46,19 @@ public class ApplyGroupProcessForm extends BaseProcessForm { public static final String FORM_NAME = "ApplyGroupProcessForm"; - @ApiModelProperty(value = "Inlong group info", required = true) + @ApiModelProperty(value = "Inlong group info") private InlongGroupInfo groupInfo; @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamInfoList; + @ApiModelProperty(value = "Inlong group full info list") + private List groupFullInfoList; + @Override public void validate() throws FormValidateException { - Preconditions.expectNotNull(groupInfo, "inlong group info is empty"); + Preconditions.expectTrue(groupInfo != null || CollectionUtils.isNotEmpty(groupFullInfoList), + "inlong group info is empty"); } @Override @@ -57,14 +68,46 @@ public String getFormName() { @Override public String getInlongGroupId() { - return groupInfo.getInlongGroupId(); + if (groupInfo != null) { + return groupInfo.getInlongGroupId(); + } + List groupIdList = groupFullInfoList.stream().map(v -> { + InlongGroupInfo groupInfo = v.getGroupInfo(); + return groupInfo.getInlongGroupId(); + }).collect(Collectors.toList()); + return Joiner.on(",").join(groupIdList); } @Override - public Map showInList() { + public List> showInList() { + List> showList = new ArrayList<>(); + if (groupInfo != null) { + addShowInfo(groupInfo, showList); + } + if (CollectionUtils.isNotEmpty(groupFullInfoList)) { + groupFullInfoList.forEach(groupFullInfo -> { + addShowInfo(groupFullInfo.getGroupInfo(), showList); + }); + } + return showList; + } + + private void addShowInfo(InlongGroupInfo groupInfo, List> showList) { Map show = Maps.newHashMap(); show.put("inlongGroupId", groupInfo.getInlongGroupId()); show.put("inlongGroupMode", groupInfo.getInlongGroupMode()); - return show; + showList.add(show); + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class GroupFullInfo { + + private InlongGroupInfo groupInfo; + + private List streamInfoList; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java index d955a2c5a66..1dd1294653b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java @@ -26,6 +26,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,11 +62,13 @@ public String getInlongGroupId() { } @Override - public Map showInList() { + public List> showInList() { + List> showInList = new ArrayList<>(); Map show = new HashMap<>(); show.put("inlongGroupId", groupInfo.getInlongGroupId()); show.put("groupOperateType", this.groupOperateType); - return show; + showInList.add(show); + return showInList; } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java index 95f4d1451c4..bcf1278e971 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.List; import java.util.Map; /** @@ -47,7 +48,7 @@ default String getTitle() { /** * Field data displayed in the process list. */ - default Map showInList() { + default List> showInList() { return null; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java index bcbcc571c01..e7fbbc37ff8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java @@ -22,10 +22,16 @@ import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; /** @@ -37,15 +43,19 @@ public class InlongGroupApproveForm extends BaseTaskForm { public static final String FORM_NAME = "InlongGroupApproveForm"; - @ApiModelProperty(value = "Inlong group approve info", required = true) + @ApiModelProperty(value = "Inlong group approve info") private InlongGroupApproveRequest groupApproveInfo; @ApiModelProperty(value = "All inlong stream info under the inlong group, including the sink info") private List streamApproveInfoList; + @ApiModelProperty(value = "Inlong group approve full info list") + private List groupApproveFullInfoList; + @Override public void validate() throws FormValidateException { - Preconditions.expectNotNull(groupApproveInfo, "inlong group approve info is empty"); + Preconditions.expectTrue(groupApproveInfo != null || CollectionUtils.isNotEmpty(groupApproveFullInfoList), + "inlong group approve info is empty"); } @Override @@ -53,4 +63,28 @@ public String getFormName() { return FORM_NAME; } + @JsonIgnore + public List getApproveFullRequest() { + List result = new ArrayList<>(); + if (groupApproveInfo != null) { + result.add(new GroupApproveFullRequest(groupApproveInfo, streamApproveInfoList)); + } + if (CollectionUtils.isNotEmpty(groupApproveFullInfoList)) { + result.addAll(groupApproveFullInfoList); + } + return result; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class GroupApproveFullRequest { + + private InlongGroupApproveRequest groupApproveInfo; + + private List streamApproveInfoList; + + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java index cb1cededdbc..0a1118706ce 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java @@ -46,11 +46,10 @@ public List parseFields(String str, InlongStreamInfo streamInfo) thro separator = (char) Integer.parseInt(streamInfo.getDataSeparator()); } String[] bodys = StringUtils.split(str, separator); - if (bodys.length != fields.size()) { - return fields; - } for (int i = 0; i < bodys.length; i++) { - fields.get(i).setFieldValue(bodys[i]); + if (i < fields.size()) { + fields.get(i).setFieldValue(bodys[i]); + } } } catch (Exception e) { log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index dbb78584b7d..8052315d1a1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.workflow.TaskResponse; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm; +import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm.GroupFullInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.service.workflow.WorkflowService; @@ -52,6 +53,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -116,6 +118,24 @@ public WorkflowResult startProcess(String groupId, String operator) { return result; } + public WorkflowResult batchStartProcess(List groupIdList, String operator) { + for (String groupId : groupIdList) { + LOGGER.info("begin to start approve process for groupId={} by operator={}", groupId, operator); + + groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), operator); + } + ApplyGroupProcessForm form = genApplyGroupProcessForm(groupIdList); + WorkflowResult result = workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form); + List tasks = result.getNewTasks(); + if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) { + throw new BusinessException(ErrorCodeEnum.WORKFLOW_START_RECORD_FAILED, + String.format("failed to start inlong group for groupId=%s", groupIdList)); + } + + LOGGER.info("success to start approve process for groupId={} by operator={}", groupIdList, operator); + return result; + } + /** * Suspend InlongGroup in an asynchronous way. * @@ -368,6 +388,21 @@ private ApplyGroupProcessForm genApplyGroupProcessForm(String groupId) { return form; } + private ApplyGroupProcessForm genApplyGroupProcessForm(List groupIdList) { + ApplyGroupProcessForm form = new ApplyGroupProcessForm(); + List groupFullInfoList = new ArrayList<>(); + for (String groupId : groupIdList) { + InlongGroupInfo groupInfo = groupService.get(groupId); + List infoList = streamService.listBriefWithSink(groupInfo.getInlongGroupId()); + GroupFullInfo groupFullInfo = new GroupFullInfo(); + groupFullInfo.setGroupInfo(groupInfo); + groupFullInfo.setStreamInfoList(infoList); + groupFullInfoList.add(groupFullInfo); + } + form.setGroupFullInfoList(groupFullInfoList); + return form; + } + /** * Generate the form of [Group Resource Workflow] */ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index d2f8b090487..959f46edf14 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.group; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -60,6 +61,16 @@ String save(@Valid @NotNull(message = "inlong group request cannot be null") Inl String save(@Valid @NotNull(message = "inlong group request cannot be null") InlongGroupRequest groupInfo, UserInfo opInfo); + /** + * Batch save inlong group info. + * + * @param groupRequestList group request list need to save + * @param operator name of operator + * @return inlong group id list after saving + */ + List batchSave( + @Valid @NotNull(message = "inlong group request list cannot be null") List groupRequestList, + String operator); /** * Query whether the specified group id exists * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 2d87b74995a..9c4f85baac7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -42,6 +43,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -216,6 +218,28 @@ public String save(InlongGroupRequest request, UserInfo opInfo) { return groupId; } + @Override + @Transactional(rollbackFor = Throwable.class) + public List batchSave(List groupRequestList, String operator) { + List resultList = new ArrayList<>(); + for (InlongGroupRequest groupRequest : groupRequestList) { + BatchResult result = BatchResult.builder() + .uniqueKey(groupRequest.getInlongGroupId()) + .operationTarget(OperationTarget.GROUP) + .build(); + try { + this.save(groupRequest, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save inlong group for groupId={}", groupRequest.getInlongGroupId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); + } + return resultList; + } + @Override public Boolean exist(String groupId) { Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java index 819f455e599..63e671f33ca 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest; import org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm; +import org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm.GroupApproveFullRequest; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.workflow.WorkflowContext; @@ -32,6 +33,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.List; + /** * The listener for modifying InlongGroup info after the application InlongGroup process is approved. */ @@ -57,15 +60,18 @@ public TaskEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { InlongGroupApproveForm form = (InlongGroupApproveForm) context.getActionContext().getForm(); - InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo(); - String groupId = approveInfo.getInlongGroupId(); - log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId); + List approveRequestList = form.getApproveFullRequest(); + for (GroupApproveFullRequest request : approveRequestList) { + InlongGroupApproveRequest approveInfo = request.getGroupApproveInfo(); + String groupId = approveInfo.getInlongGroupId(); + log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId); - // save the inlong group and other info after approval - groupService.updateAfterApprove(approveInfo, context.getOperator()); - streamService.updateAfterApprove(form.getStreamApproveInfoList(), context.getOperator()); + // save the inlong group and other info after approval + groupService.updateAfterApprove(approveInfo, context.getOperator()); + streamService.updateAfterApprove(request.getStreamApproveInfoList(), context.getOperator()); - log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId); + log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java index 4d285488f60..f74153f6a82 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; import java.util.List; /** @@ -63,21 +65,22 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId); - InlongGroupInfo groupInfo = groupService.get(groupId); - GroupResourceProcessForm processForm = new GroupResourceProcessForm(); - processForm.setGroupInfo(groupInfo); - String username = context.getOperator(); - List streamList = streamService.list(groupId); - processForm.setStreamInfos(streamList); + InlongGroupInfo groupInfo = groupService.get(groupId); + GroupResourceProcessForm processForm = new GroupResourceProcessForm(); + processForm.setGroupInfo(groupInfo); + List streamList = streamService.list(groupId); + processForm.setStreamInfos(streamList); - // may run for long time, make it async processing - UserInfo userInfo = LoginUserUtils.getLoginUser(); - EXECUTOR_SERVICE.execute( - () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm)); - log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId); + // may run for long time, make it async processing + UserInfo userInfo = LoginUserUtils.getLoginUser(); + EXECUTOR_SERVICE.execute( + () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm)); + log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java index c6b2871ba86..5f03ebbf0fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -31,6 +32,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.List; import java.util.Objects; /** @@ -54,22 +57,24 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId); - // only the [ToBeApproval] status allowed the canceling operation - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - if (entity == null) { - throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId); - } - if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { - throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel", - GroupStatus.forCode(entity.getStatus()))); - } - String operator = context.getOperator(); - groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator); + // only the [ToBeApproval] status allowed the canceling operation + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + if (entity == null) { + throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId); + } + if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { + throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel", + GroupStatus.forCode(entity.getStatus()))); + } + String operator = context.getOperator(); + groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator); - log.info("success to execute CancelApplyProcessListener for groupId={}", groupId); + log.info("success to execute CancelApplyProcessListener for groupId={}", groupId); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java index 4b387e6da91..8ad8bd766ae 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.group.apply; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -32,6 +33,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.List; import java.util.Objects; /** @@ -54,21 +57,23 @@ public ProcessEvent event() { @Override public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException { ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm(); - String groupId = form.getInlongGroupId(); - log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId); + List groupList = Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA)); + for (String groupId : groupList) { + log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId); - // only the [TO_BE_APPROVAL] status allowed the rejecting operation - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - if (entity == null) { - throw new WorkflowListenerException("inlong group not found with groupId=" + groupId); - } - if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { - throw new WorkflowListenerException("current status was not allowed to reject inlong group"); - } + // only the [TO_BE_APPROVAL] status allowed the rejecting operation + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + if (entity == null) { + throw new WorkflowListenerException("inlong group not found with groupId=" + groupId); + } + if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { + throw new WorkflowListenerException("current status was not allowed to reject inlong group"); + } - // after reject, update InlongGroup status to [APPROVE_REJECTED] - String username = context.getOperator(); - groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username); + // after reject, update InlongGroup status to [APPROVE_REJECTED] + String username = context.getOperator(); + groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username); + } return ListenerResult.success(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index 06f97bb762a..79da89d5128 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.sink; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -59,6 +60,15 @@ public interface StreamSinkService { */ Integer save(SinkRequest request, UserInfo opInfo); + /** + * Batch save the sink info. + * + * @param requestList sink request list need to save + * @param operator name of operator + * @return sink id list after saving + */ + List batchSave(List requestList, String operator); + /** * Get stream sink info based on id. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 31092bdb0b7..4d7fd556ace 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; @@ -34,6 +35,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -234,6 +236,29 @@ public Integer save(SinkRequest request, UserInfo opInfo) { return id; } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (SinkRequest request : requestList) { + BatchResult result = BatchResult.builder() + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId() + "-" + + request.getSinkName()) + .operationTarget(OperationTarget.SINK) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save save source info for sinkName={}, groupId={}, streamId={}", + request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); + } + return resultList; + } + @Override public StreamSink get(Integer id) { if (id == null) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index 0dd4decd28f..879ca5e0124 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.source; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; @@ -52,6 +53,15 @@ public interface StreamSourceService { */ Integer save(SourceRequest request, UserInfo opInfo); + /** + * Batch save the source information + * + * @param requestList Source request list. + * @param operator Operator's name. + * @return source id list after saving. + */ + List batchSave(List requestList, String operator); + /** * Query source information based on id * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 2d3855b05e0..c252a6a00cd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -33,6 +34,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -162,6 +164,29 @@ public Integer save(SourceRequest request, UserInfo opInfo) { return sourceOperator.saveOpt(request, groupEntity.getStatus(), opInfo.getName()); } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (SourceRequest request : requestList) { + BatchResult result = BatchResult.builder() + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId() + "-" + + request.getSourceName()) + .operationTarget(OperationTarget.SOURCE) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save save source info for sourceName={}, groupId={}, streamId={}", + request.getSourceName(), request.getInlongGroupId(), request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); + } + return resultList; + } + @Override public StreamSource get(Integer id) { if (id == null) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index 328e9ae73cc..e54ff475b51 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.stream; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.sink.AddFieldRequest; @@ -50,6 +51,15 @@ public interface InlongStreamService { */ Integer save(InlongStreamRequest request, String operator); + /** + * Batch save inlong stream information. + * + * @param requestList Inlong stream information list. + * @param operator The name of operator. + * @return Id list after successful save. + */ + List batchSave(List requestList, String operator); + /** * Save inlong stream information. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index b9d8ef8d364..21df28228fd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -20,6 +20,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.OperationTarget; import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.tool.excel.ExcelTool; @@ -35,6 +36,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; @@ -184,6 +186,28 @@ public Integer save(InlongStreamRequest request, String operator) { return streamEntity.getId(); } + @Override + public List batchSave(List requestList, String operator) { + List resultList = new ArrayList<>(); + for (InlongStreamRequest request : requestList) { + BatchResult result = BatchResult.builder() + .uniqueKey(request.getInlongGroupId() + "-" + request.getInlongStreamId()) + .operationTarget(OperationTarget.STREAM) + .build(); + try { + this.save(request, operator); + result.setSuccess(true); + } catch (Exception e) { + LOGGER.error("failed to save inlong stream for groupId={}, streamId={}", request.getInlongGroupId(), + request.getInlongStreamId(), e); + result.setSuccess(false); + result.setErrMsg(e.getMessage()); + } + resultList.add(result); + } + return resultList; + } + @Override public Integer save(InlongStreamRequest request, UserInfo opInfo) { InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java index 0863b8e6661..739b68fcab5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java @@ -244,7 +244,7 @@ private Consumer addCurrentTask(TaskRequest query) { }; } - private Map getShowInList(WorkflowProcessEntity processEntity) { + private List> getShowInList(WorkflowProcessEntity processEntity) { WorkflowProcess process = processDefService.getByName(processEntity.getName()); if (process == null || process.getFormClass() == null) { return null; @@ -270,7 +270,7 @@ private void addShowInListForEachTask(List taskList) { query.setIdList(list); List processEntities = queryService.listProcessEntity(query); - Map> processShowInListMap = Maps.newHashMap(); + Map>> processShowInListMap = Maps.newHashMap(); processEntities.forEach(entity -> processShowInListMap.put(entity.getId(), getShowInList(entity))); taskList.forEach(task -> task.setShowInList(processShowInListMap.get(task.getProcessId()))); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 0ad34ae506d..da69535631b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; @@ -77,6 +78,15 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon return Response.success(groupService.save(groupRequest, operator)); } + @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) + @ApiOperation(value = "Batch Save inlong group") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List groupRequestList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupService.batchSave(groupRequestList, operator)); + } + @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET) @ApiOperation(value = "Is the inlong group id exists") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) @@ -155,14 +165,24 @@ public Response deleteAsync(@PathVariable String groupId) { @RequestMapping(value = "/group/startProcess/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Start inlong approval process") + @OperationLog(operation = OperationType.START, operationTarget = OperationTarget.GROUP) @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response startProcess(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); return Response.success(groupProcessOperation.startProcess(groupId, operator)); } + @RequestMapping(value = "/group/batchStartProcess", method = RequestMethod.POST) + @ApiOperation(value = "Batch start inlong approval process") + @OperationLog(operation = OperationType.START, operationTarget = OperationTarget.GROUP) + public Response batchStartProcess(@RequestBody List groupIdList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupProcessOperation.batchStartProcess(groupIdList, operator)); + } + @RequestMapping(value = "/group/suspendProcess/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Suspend inlong group process") + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.GROUP) @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response suspendProcess(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); @@ -178,6 +198,7 @@ public Response restartProcess(@PathVariable String groupId) { } @RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = RequestMethod.POST) + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Suspend inlong group process") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) public Response suspendProcessAsync(@PathVariable String groupId) { diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index da747818cee..c8c6c51186b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.tool.excel.ExcelTool; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; @@ -86,6 +87,14 @@ public Response save(@RequestBody InlongStreamRequest request) { return Response.success(result); } + @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) + @ApiOperation(value = "Batch save inlong stream") + public Response> batchSave(@RequestBody List requestList) { + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + @RequestMapping(value = "/stream/exist/{groupId}/{streamId}", method = RequestMethod.GET) @ApiOperation(value = "Is the inlong stream exists") @ApiImplicitParams({ @@ -154,6 +163,7 @@ public Response startProcess(@PathVariable String groupId, @PathVariabl } @RequestMapping(value = "/stream/suspendProcess/{groupId}/{streamId}", method = RequestMethod.POST) + @OperationLog(operation = OperationType.SUSPEND, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Suspend inlong stream process") @ApiImplicitParams({ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index e8d0ef93ab3..7869758cdb3 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.UpdateByIdValidation; import org.apache.inlong.manager.common.validation.UpdateByKeyValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.common.UpdateResult; @@ -66,6 +67,13 @@ public Response save(@Validated @RequestBody SinkRequest request) { return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) + @ApiOperation(value = "Batch save stream sink") + public Response> batchSave(@Validated @RequestBody List requestList) { + return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/get/{id}", method = RequestMethod.GET) @ApiOperation(value = "Get stream sink") @OperationLog(operation = OperationType.GET, operationTarget = OperationTarget.SINK) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index 8e7645f993c..e410af5c1ed 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; @@ -44,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + /** * Stream source control layer */ @@ -62,6 +65,14 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) + @ApiOperation(value = "Batch save stream source") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List requestList) { + return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/source/get/{id}", method = RequestMethod.GET) @ApiOperation(value = "Get stream source") @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java index 8dca1ad748b..3ee366910db 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -88,6 +89,15 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Inlon return Response.success(groupService.save(groupRequest, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.GROUP) + @ApiOperation(value = "Batch Save inlong group") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List groupRequestList) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupService.batchSave(groupRequestList, operator)); + } + @RequestMapping(value = "/group/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.GROUP) @ApiOperation(value = "Update inlong group") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java index 893e0e589b2..903e730280f 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.sink.AddFieldRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; @@ -105,6 +106,14 @@ public Response save(@RequestBody InlongStreamRequest request) { return Response.success(streamService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.STREAM) + @ApiOperation(value = "Batch save inlong stream") + public Response> batchSave(@RequestBody List requestList) { + List result = streamService.batchSave(requestList, LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + @RequestMapping(value = "/stream/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.STREAM) @ApiOperation(value = "Update inlong stream") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java index d69f0895f43..1d570033df2 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.UpdateByIdValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.sink.SinkPageRequest; import org.apache.inlong.manager.pojo.sink.SinkRequest; @@ -83,6 +84,13 @@ public Response save(@Validated @RequestBody SinkRequest request) { return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SINK) + @ApiOperation(value = "Batch save stream sink") + public Response> batchSave(@Validated @RequestBody List requestList) { + return Response.success(sinkService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Update stream sink") diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java index 6bd7ab19ddb..64e98a58bc4 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.pojo.source.SourcePageRequest; @@ -43,6 +44,8 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + /** * Open InLong Stream Source controller */ @@ -80,6 +83,14 @@ public Response save(@Validated(SaveValidation.class) @RequestBody Sour return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser())); } + @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST) + @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SOURCE) + @ApiOperation(value = "Batch save stream source") + public Response> batchSave( + @Validated(SaveValidation.class) @RequestBody List requestList) { + return Response.success(sourceService.batchSave(requestList, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/source/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SOURCE) @ApiOperation(value = "Update stream source")