Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9995][Manager] Support batch saving of group information and other operations #9996

Merged
merged 9 commits into from
Apr 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public InlongGroupContext context(String credentials) throws Exception {
@Override
public InlongGroupContext init() throws Exception {
InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
WorkflowResult initWorkflowResult = groupClient.initInlongGroup(groupInfo.genRequest());
WorkflowResult initWorkflowResult = groupClient.startProcess(groupInfo.genRequest());
List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
Preconditions.expectNotEmpty(taskViews, "init inlong group info failed");
TaskResponse taskView = taskViews.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
Expand All @@ -45,6 +46,7 @@
import retrofit2.Call;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
Expand Down Expand Up @@ -167,6 +169,16 @@ public String createGroup(InlongGroupRequest groupInfo) {
return response.getData();
}

/**
* Batch create inlong group
*/
public List<BatchResult> batchCreateGroup(List<InlongGroupRequest> groupRequestList) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList));
ClientUtils.assertRespSuccess(response);
return response.getData();
}

/**
* Update inlong group info
*
Expand All @@ -186,9 +198,18 @@ public boolean resetGroup(InlongGroupResetRequest resetRequest) {
return response.getData();
}

public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
public WorkflowResult startProcess(InlongGroupRequest groupInfo) {
Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
inlongGroupApi.startProcess(groupInfo.getInlongGroupId()));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}

public WorkflowResult batchStartProcess(List<InlongGroupRequest> groupRequestList) {
List<String> groupIdList = groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect(
Collectors.toList());
Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
inlongGroupApi.batchStartProcess(groupIdList));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand Down Expand Up @@ -58,6 +59,16 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) {
return response.getData();
}

/**
* Batch create inlong stream.
*/
public List<BatchResult> batchCreateStreamInfo(List<InlongStreamInfo> streamInfos) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos));
ClientUtils.assertRespSuccess(response);
return response.getData();
}

/**
* Query whether the inlong stream ID exists
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.manager.client.api.service.StreamSinkApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
Expand Down Expand Up @@ -54,6 +55,12 @@ public Integer createSink(SinkRequest sinkRequest) {
return response.getData();
}

public List<BatchResult> batchCreateSink(List<SinkRequest> sinkRequests) {
Response<List<BatchResult>> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests));
ClientUtils.assertRespSuccess(response);
return response.getData();
}

/**
* Delete stream sink info by ID.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
Expand Down Expand Up @@ -52,6 +53,16 @@ public Integer createSource(SourceRequest request) {
return response.getData();
}

/**
* Batch create inlong stream source.
*/
public List<BatchResult> batchCreateSource(List<SourceRequest> requestList) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList));
ClientUtils.assertRespSuccess(response);
return response.getData();
}

/**
* List stream sources by the given groupId and streamId.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
Expand Down Expand Up @@ -50,11 +51,17 @@ public interface InlongGroupApi {
@POST("group/save")
Call<Response<String>> createGroup(@Body InlongGroupRequest request);

@POST("group/batchSave")
Call<Response<List<BatchResult>>> batchCreateGroup(@Body List<InlongGroupRequest> requestList);

@POST("group/update")
Call<Response<String>> updateGroup(@Body InlongGroupRequest request);

@POST("group/startProcess/{id}")
Call<Response<WorkflowResult>> initInlongGroup(@Path("id") String id);
Call<Response<WorkflowResult>> startProcess(@Path("id") String id);

@POST("group/batchStartProcess/{id}")
Call<Response<WorkflowResult>> batchStartProcess(@Body List<String> groupIdList);

@POST("group/suspendProcessAsync/{id}")
Call<Response<String>> suspendProcessAsync(@Path("id") String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand All @@ -41,6 +42,9 @@ public interface InlongStreamApi {
@POST("stream/save")
Call<Response<Integer>> createStream(@Body InlongStreamInfo stream);

@POST("stream/batchSave")
Call<Response<List<BatchResult>>> batchCreateStream(@Body List<InlongStreamInfo> streamInfos);

@GET("stream/exist/{groupId}/{streamId}")
Call<Response<Boolean>> isStreamExists(@Path("groupId") String groupId, @Path("streamId") String streamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
Expand All @@ -41,6 +42,9 @@ public interface StreamSinkApi {
@POST("sink/save")
Call<Response<Integer>> save(@Body SinkRequest request);

@POST("sink/batchSave")
Call<Response<List<BatchResult>>> batchSave(@Body List<SinkRequest> requestList);

@POST("sink/update")
Call<Response<Boolean>> updateById(@Body SinkRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
Expand All @@ -31,11 +32,16 @@
import retrofit2.http.Path;
import retrofit2.http.Query;

import java.util.List;

public interface StreamSourceApi {

@POST("source/save")
Call<Response<Integer>> createSource(@Body SourceRequest request);

@POST("source/batchSave")
Call<Response<List<BatchResult>>> batchCreateSource(@Body List<SourceRequest> requestList);

@POST("source/update")
Call<Response<Boolean>> updateSource(@Body SourceRequest request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,18 @@ public enum OperationType {

GET,

LIST
LIST,

SUSPEND,

START;

public static OperationType forOperationType(String type) {
for (OperationType operationType : values()) {
if (operationType.name().equals(type)) {
dockerzhang marked this conversation as resolved.
Show resolved Hide resolved
return operationType;
}
}
throw new IllegalArgumentException(String.format("Unsupported operation type for %s", type));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.common;

import org.apache.inlong.manager.common.enums.OperationTarget;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchResult {

// Unique identification of operation target
private String uniqueKey;
private boolean success;
private OperationTarget operationTarget;
private String errMsg;

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ public class ProcessResponse {
private List<TaskResponse> currentTasks;

@ApiModelProperty(value = "Extra information shown in the list")
private Map<String, Object> showInList;
private List<Map<String, Object>> showInList;

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ public class TaskResponse {
private Object extParams;

@ApiModelProperty(value = "Extra information shown in the list")
private Map<String, Object> showInList;
private List<Map<String, Object>> showInList;

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -56,12 +58,14 @@ public String getInlongGroupId() {
}

@Override
public Map<String, Object> showInList() {
public List<Map<String, Object>> showInList() {
List<Map<String, Object>> showInList = new ArrayList<>();
Map<String, Object> show = Maps.newHashMap();
if (consumeInfo != null) {
show.put("inlongGroupId", consumeInfo.getInlongGroupId());
show.put("consumerGroup", consumeInfo.getConsumerGroup());
}
return show;
showInList.add(show);
return showInList;
}
}
Loading
Loading