Skip to content

Commit

Permalink
[INLONG-9995][Manager] Fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Apr 16, 2024
1 parent 00df16e commit 2ebabf7
Show file tree
Hide file tree
Showing 25 changed files with 160 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
Expand Down Expand Up @@ -171,8 +172,8 @@ public String createGroup(InlongGroupRequest groupInfo) {
/**
* Batch create inlong group
*/
public List<String> batchCreateGroup(List<InlongGroupRequest> groupRequestList) {
Response<List<String>> response =
public List<BatchResult> batchCreateGroup(List<InlongGroupRequest> groupRequestList) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList));
ClientUtils.assertRespSuccess(response);
return response.getData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand Down Expand Up @@ -61,8 +62,9 @@ public Integer createStreamInfo(InlongStreamInfo streamInfo) {
/**
* Batch create inlong stream.
*/
public List<Integer> batchCreateStreamInfo(List<InlongStreamInfo> streamInfos) {
Response<List<Integer>> response = ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos));
public List<BatchResult> batchCreateStreamInfo(List<InlongStreamInfo> streamInfos) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.manager.client.api.service.StreamSinkApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
Expand Down Expand Up @@ -54,8 +55,8 @@ public Integer createSink(SinkRequest sinkRequest) {
return response.getData();
}

public List<Integer> batchCreateSink(List<SinkRequest> sinkRequests) {
Response<List<Integer>> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests));
public List<BatchResult> batchCreateSink(List<SinkRequest> sinkRequests) {
Response<List<BatchResult>> response = ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
Expand Down Expand Up @@ -55,8 +56,9 @@ public Integer createSource(SourceRequest request) {
/**
* Batch create inlong stream source.
*/
public List<Integer> batchCreateSource(List<SourceRequest> requestList) {
Response<List<Integer>> response = ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList));
public List<BatchResult> batchCreateSource(List<SourceRequest> requestList) {
Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
Expand Down Expand Up @@ -51,7 +52,7 @@ public interface InlongGroupApi {
Call<Response<String>> createGroup(@Body InlongGroupRequest request);

@POST("group/batchSave")
Call<Response<List<String>>> batchCreateGroup(@Body List<InlongGroupRequest> requestList);
Call<Response<List<BatchResult>>> batchCreateGroup(@Body List<InlongGroupRequest> requestList);

@POST("group/update")
Call<Response<String>> updateGroup(@Body InlongGroupRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand All @@ -42,7 +43,7 @@ public interface InlongStreamApi {
Call<Response<Integer>> createStream(@Body InlongStreamInfo stream);

@POST("stream/batchSave")
Call<Response<List<Integer>>> batchCreateStream(@Body List<InlongStreamInfo> streamInfos);
Call<Response<List<BatchResult>>> batchCreateStream(@Body List<InlongStreamInfo> streamInfos);

@GET("stream/exist/{groupId}/{streamId}")
Call<Response<Boolean>> isStreamExists(@Path("groupId") String groupId, @Path("streamId") String streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
Expand All @@ -42,7 +43,7 @@ public interface StreamSinkApi {
Call<Response<Integer>> save(@Body SinkRequest request);

@POST("sink/batchSave")
Call<Response<List<Integer>>> batchSave(@Body List<SinkRequest> requestList);
Call<Response<List<BatchResult>>> batchSave(@Body List<SinkRequest> requestList);

@POST("sink/update")
Call<Response<Boolean>> updateById(@Body SinkRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.client.api.service;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
Expand All @@ -39,7 +40,7 @@ public interface StreamSourceApi {
Call<Response<Integer>> createSource(@Body SourceRequest request);

@POST("source/batchSave")
Call<Response<List<Integer>>> batchCreateSource(@Body List<SourceRequest> requestList);
Call<Response<List<BatchResult>>> batchCreateSource(@Body List<SourceRequest> requestList);

@POST("source/update")
Call<Response<Boolean>> updateSource(@Body SourceRequest request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.common;

import org.apache.inlong.manager.common.enums.OperationTarget;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchResult {

// Unique identification of operation target
private String uniqueKey;
private boolean success;
private OperationTarget operationTarget;
private String errMsg;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.group;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
Expand Down Expand Up @@ -67,7 +68,7 @@ String save(@Valid @NotNull(message = "inlong group request cannot be null") Inl
* @param operator name of operator
* @return inlong group id list after saving
*/
List<String> batchSave(
List<BatchResult> batchSave(
@Valid @NotNull(message = "inlong group request list cannot be null") List<InlongGroupRequest> groupRequestList,
String operator);
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
Expand All @@ -42,6 +43,7 @@
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
Expand Down Expand Up @@ -218,13 +220,24 @@ public String save(InlongGroupRequest request, UserInfo opInfo) {

@Override
@Transactional(rollbackFor = Throwable.class)
public List<String> batchSave(List<InlongGroupRequest> groupRequestList, String operator) {
List<String> groupIdList = new ArrayList<>();
public List<BatchResult> batchSave(List<InlongGroupRequest> groupRequestList, String operator) {
List<BatchResult> resultList = new ArrayList<>();
for (InlongGroupRequest groupRequest : groupRequestList) {
String groupId = this.save(groupRequest, operator);
groupIdList.add(groupId);
BatchResult result = BatchResult.builder()
.uniqueKey(groupRequest.getInlongGroupId())
.operationTarget(OperationTarget.GROUP)
.build();
try {
this.save(groupRequest, operator);
result.setSuccess(true);
} catch (Exception e) {
LOGGER.error("failed to save inlong group for groupId={}", groupRequest.getInlongGroupId(), e);
result.setSuccess(false);
result.setErrMsg(e.getMessage());
}
resultList.add(result);
}
return groupIdList;
return resultList;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sink;

import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand Down Expand Up @@ -66,7 +67,7 @@ public interface StreamSinkService {
* @param operator name of operator
* @return sink id list after saving
*/
List<Integer> batchSave(List<SinkRequest> requestList, String operator);
List<BatchResult> batchSave(List<SinkRequest> requestList, String operator);

/**
* Get stream sink info based on id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
Expand All @@ -34,6 +35,7 @@
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
Expand Down Expand Up @@ -235,11 +237,24 @@ public Integer save(SinkRequest request, UserInfo opInfo) {
}

@Override
public List<Integer> batchSave(List<SinkRequest> requestList, String operator) {
List<Integer> resultList = new ArrayList<>();
public List<BatchResult> batchSave(List<SinkRequest> requestList, String operator) {
List<BatchResult> resultList = new ArrayList<>();
for (SinkRequest request : requestList) {
int id = this.save(request, operator);
resultList.add(id);
BatchResult result = BatchResult.builder()
.uniqueKey(request.getSinkName() + "-" + request.getInlongGroupId() + "-"
+ request.getInlongStreamId())
.operationTarget(OperationTarget.SINK)
.build();
try {
this.save(request, operator);
result.setSuccess(true);
} catch (Exception e) {
LOGGER.error("failed to save save source info for sinkName={}, groupId={}, streamId={}",
request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId(), e);
result.setSuccess(false);
result.setErrMsg(e.getMessage());
}
resultList.add(result);
}
return resultList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.source;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
Expand Down Expand Up @@ -59,7 +60,7 @@ public interface StreamSourceService {
* @param operator Operator's name.
* @return source id list after saving.
*/
List<Integer> batchSave(List<SourceRequest> requestList, String operator);
List<BatchResult> batchSave(List<SourceRequest> requestList, String operator);

/**
* Query source information based on id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
Expand All @@ -33,6 +34,7 @@
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
Expand Down Expand Up @@ -163,11 +165,24 @@ public Integer save(SourceRequest request, UserInfo opInfo) {
}

@Override
public List<Integer> batchSave(List<SourceRequest> requestList, String operator) {
List<Integer> resultList = new ArrayList<>();
public List<BatchResult> batchSave(List<SourceRequest> requestList, String operator) {
List<BatchResult> resultList = new ArrayList<>();
for (SourceRequest request : requestList) {
int id = this.save(request, operator);
resultList.add(id);
BatchResult result = BatchResult.builder()
.uniqueKey(request.getSourceName() + "-" + request.getInlongGroupId() + "-"
+ request.getInlongStreamId())
.operationTarget(OperationTarget.SOURCE)
.build();
try {
this.save(request, operator);
result.setSuccess(true);
} catch (Exception e) {
LOGGER.error("failed to save save source info for sourceName={}, groupId={}, streamId={}",
request.getSourceName(), request.getInlongGroupId(), request.getInlongStreamId(), e);
result.setSuccess(false);
result.setErrMsg(e.getMessage());
}
resultList.add(result);
}
return resultList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.stream;

import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
Expand Down Expand Up @@ -57,7 +58,7 @@ public interface InlongStreamService {
* @param operator The name of operator.
* @return Id list after successful save.
*/
List<Integer> batchSave(List<InlongStreamRequest> requestList, String operator);
List<BatchResult> batchSave(List<InlongStreamRequest> requestList, String operator);

/**
* Save inlong stream information.
Expand Down
Loading

0 comments on commit 2ebabf7

Please sign in to comment.