Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9976][Manager] Support multiple types of audit indicator queries #9979

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.enums;

/**
* Indicator type of inlong audit
*/
public enum IndicatorType {

RECEIVED_SUCCESS(0, "RECEIVED_SUCCESS", "Message received success"),
SEND_SUCCESS(1, "SEND_SUCCESS", "Message send success"),
RECEIVED_FAILED(2, "RECEIVED_FAILED", "Message received failed"),
SEND_FAILED(3, "SEND_FAILED", "Message send failed"),
RECEIVED_RETRY(4, "RECEIVED_RETRY", "Message received retry"),
SEND_RETRY(5, "SEND_RETRY", "Message send retry"),
RECEIVED_DISCARD(6, "RECEIVED_DISCARD", "Message received discard"),
SEND_DISCARD(7, "SEND_DISCARD", "Message send discard"),

UNKNOWN_TYPE(Integer.MAX_VALUE, "UNKNOWN_TYPE", "Unknown type");

private final int code;
private final String name;
private final String desc;

IndicatorType(int code, String name, String desc) {
this.code = code;
this.name = name;
this.desc = desc;
}

public static IndicatorType valueOf(int value) {
for (IndicatorType code : IndicatorType.values()) {
if (code.getCode() == value) {
return code;
}
}

return UNKNOWN_TYPE;
}

public int getCode() {
return code;
}

public String getName() {
return name;
}

public String getDesc() {
return desc;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class AuditBaseEntity {
private Integer id;
private String name;
private String type;
private Integer isSent;
private Integer indicatorType;
private String auditId;

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface AuditBaseEntityMapper {

AuditBaseEntity selectByType(@Param("type") String type);

AuditBaseEntity selectByTypeAndIsSent(@Param("type") String type, @Param("isSent") Integer isSent);
AuditBaseEntity selectByTypeAndIndicatorType(@Param("type") String type,
@Param("indicatorType") Integer indicatorType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
<id column="id" jdbcType="INTEGER" property="id"/>
<result column="name" jdbcType="VARCHAR" property="name"/>
<result column="type" jdbcType="VARCHAR" property="type"/>
<result column="is_sent" jdbcType="INTEGER" property="isSent"/>
<result column="indicator_type" jdbcType="INTEGER" property="indicatorType"/>
<result column="audit_id" jdbcType="VARCHAR" property="auditId"/>
</resultMap>
<sql id="Base_Column_List">
id, name, type, is_sent, audit_id
id, name, type, indicator_type, audit_id
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.AuditBaseEntity">
insert into audit_base (id, name, type, is_sent, audit_id)
insert into audit_base (id, name, type, indicator_type, audit_id)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR}, #{isSent,jdbcType=INTEGER},
#{type,jdbcType=VARCHAR}, #{indicatorType,jdbcType=INTEGER},
#{auditId,jdbcType=VARCHAR})
</insert>

Expand All @@ -61,12 +61,12 @@
</if>
</where>
</select>
<select id="selectByTypeAndIsSent" resultType="org.apache.inlong.manager.dao.entity.AuditBaseEntity">
<select id="selectByTypeAndIndicatorType" resultType="org.apache.inlong.manager.dao.entity.AuditBaseEntity">
select
<include refid="Base_Column_List"/>
from audit_base
where type = #{type, jdbcType=VARCHAR}
and is_sent = #{isSent, jdbcType=INTEGER}
and indicator_type = #{indicatorType, jdbcType=INTEGER}
</select>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class AuditBaseResponse {
@ApiModelProperty(value = "Audit type")
private String type;

@ApiModelProperty(value = "is sent")
private Integer isSent;
@ApiModelProperty(value = "Indicator type")
private Integer indicatorType;

@ApiModelProperty(value = "Audit id")
private String auditId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.core;

import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
Expand Down Expand Up @@ -49,13 +50,13 @@ public interface AuditService {
List<AuditBaseResponse> getAuditBases();

/**
* Get audit id by type and isSent.
* Get audit id by type and indicator type.
*
* @param type audit type.
* @param isSent Whether to receive or send
* @param indicatorType indicator type
* @return Audit id.
*/
String getAuditId(String type, boolean isSent);
String getAuditId(String type, IndicatorType indicatorType);

/**
* Refresh the base item of audit cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.core.impl;

import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
Expand Down Expand Up @@ -133,9 +134,7 @@ public class AuditServiceImpl implements AuditService {
private static final String TERMS = "terms";

// key: type of audit base item, value: entity of audit base item
private final Map<String, AuditBaseEntity> auditSentItemMap = new ConcurrentHashMap<>();

private final Map<String, AuditBaseEntity> auditReceivedItemMap = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, AuditBaseEntity>> auditIndicatorMap = new ConcurrentHashMap<>();
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved

private final Map<String, AuditBaseEntity> auditItemMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -186,11 +185,8 @@ public Boolean refreshBaseItemCache() {
for (AuditBaseEntity auditBaseEntity : auditBaseEntities) {
auditItemMap.put(auditBaseEntity.getAuditId(), auditBaseEntity);
String type = auditBaseEntity.getType();
if (auditBaseEntity.getIsSent() == 1) {
auditSentItemMap.put(type, auditBaseEntity);
} else {
auditReceivedItemMap.put(type, auditBaseEntity);
}
Map<Integer, AuditBaseEntity> itemMap = auditIndicatorMap.computeIfAbsent(type, v -> new HashMap<>());
itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity);
}
} catch (Throwable t) {
LOGGER.error("failed to reload audit base item info", t);
Expand Down Expand Up @@ -240,22 +236,19 @@ public AuditSourceResponse getAuditSource() {
}

@Override
public String getAuditId(String type, boolean isSent) {
public String getAuditId(String type, IndicatorType indicatorType) {
if (StringUtils.isBlank(type)) {
return null;
}
AuditBaseEntity auditBaseEntity = isSent ? auditSentItemMap.get(type) : auditReceivedItemMap.get(type);
Map<Integer, AuditBaseEntity> itemMap = auditIndicatorMap.computeIfAbsent(type, v -> new HashMap<>());
AuditBaseEntity auditBaseEntity = itemMap.get(indicatorType.getCode());
if (auditBaseEntity != null) {
return auditBaseEntity.getAuditId();
}
auditBaseEntity = auditBaseMapper.selectByTypeAndIsSent(type, isSent ? 1 : 0);
auditBaseEntity = auditBaseMapper.selectByTypeAndIndicatorType(type, indicatorType.getCode());
Preconditions.expectNotNull(auditBaseEntity, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED,
String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
if (isSent) {
auditSentItemMap.put(type, auditBaseEntity);
} else {
auditReceivedItemMap.put(type, auditBaseEntity);
}
itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity);
return auditBaseEntity.getAuditId();
}

Expand Down Expand Up @@ -293,15 +286,15 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
sourceNodeType = sourceEntityList.get(0).getSourceType();
}

auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
auditIdMap.put(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS), sinkNodeType);

if (CollectionUtils.isEmpty(request.getAuditIds())) {
// properly overwrite audit ids by role and stream config
if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
auditIdMap.put(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
} else {
auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
auditIdMap.put(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType));
}
}
Expand Down Expand Up @@ -456,14 +449,14 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN

// if no sink is configured, return data-proxy output instead of sort
if (sinkNodeType == null) {
auditSet.add(getAuditId(ClusterType.DATAPROXY, true));
auditSet.add(getAuditId(ClusterType.DATAPROXY, IndicatorType.SEND_SUCCESS));
} else {
auditSet.add(getAuditId(sinkNodeType, true));
auditSet.add(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS));
InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId);
if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
auditSet.add(getAuditId(sourceNodeType, false));
auditSet.add(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS));
} else {
auditSet.add(getAuditId(sinkNodeType, false));
auditSet.add(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS));
}
}

Expand All @@ -472,9 +465,9 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN
if (CollectionUtils.isEmpty(sourceList)
|| sourceList.stream().allMatch(s -> SourceType.AUTO_PUSH.equals(s.getSourceType()))) {
// need data_proxy received type when agent has received type
boolean dpReceivedNeeded = auditSet.contains(getAuditId(ClusterType.AGENT, false));
boolean dpReceivedNeeded = auditSet.contains(getAuditId(ClusterType.AGENT, IndicatorType.RECEIVED_SUCCESS));
if (dpReceivedNeeded) {
auditSet.add(getAuditId(ClusterType.DATAPROXY, false));
auditSet.add(getAuditId(ClusterType.DATAPROXY, IndicatorType.RECEIVED_SUCCESS));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.service.resource.sort;

import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
Expand Down Expand Up @@ -145,7 +146,7 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon
List<StreamSource> sources = sourceMap.get(streamId);
for (StreamSink sinkInfo : sinkInfos) {
CommonBeanUtils.copyProperties(inlongStreamInfo, sinkInfo, true);
addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), true);
addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), IndicatorType.SEND_SUCCESS);
}

for (StreamSource source : sources) {
Expand All @@ -167,7 +168,8 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon
}

for (int i = 0; i < sources.size(); i++) {
addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(), false);
addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(),
IndicatorType.RECEIVED_SUCCESS);
}
} else {
if (CollectionUtils.isNotEmpty(transformResponses)) {
Expand All @@ -183,7 +185,7 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon
}

for (StreamSource source : sources) {
addAuditId(source.getProperties(), source.getSourceType(), false);
addAuditId(source.getProperties(), source.getSourceType(), IndicatorType.RECEIVED_SUCCESS);
}
}

Expand Down Expand Up @@ -294,9 +296,9 @@ private void addToStreamExt(InlongStreamInfo streamInfo, String value) {
streamInfo.getExtList().add(extInfo);
}

private void addAuditId(Map<String, Object> properties, String type, boolean isSent) {
private void addAuditId(Map<String, Object> properties, String type, IndicatorType indicatorType) {
try {
String auditId = auditService.getAuditId(type, isSent);
String auditId = auditService.getAuditId(type, indicatorType);
properties.putIfAbsent("metrics.audit.key", auditId);
properties.putIfAbsent("metrics.audit.proxy.hosts", auditHost);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,17 +809,17 @@ CREATE TABLE IF NOT EXISTS `audit_base`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`name` varchar(256) NOT NULL COMMENT 'Audit base name',
`type` varchar(20) NOT NULL COMMENT 'Audit base item type, such as: AGENT, DATAPROXY, etc',
`is_sent` int(4) NOT NULL DEFAULT '0' COMMENT '0: received, 1: sent',
`indicator_type` int(11) DEFAULT NULL COMMENT 'Indicator type for audit',
`audit_id` varchar(11) NOT NULL COMMENT 'Audit ID mapping of audit name',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_audit_base_type` (`type`, `is_sent`),
UNIQUE KEY `unique_audit_base_type` (`type`, `indicator_type`),
UNIQUE KEY `unique_audit_base_name` (`name`)
);

-- ----------------------------
-- Insert audit_base item
-- ----------------------------
INSERT INTO `audit_base`(`name`, `type`, `is_sent`, `audit_id`)
INSERT INTO `audit_base`(`name`, `type`, `indicator_type`, `audit_id`)
VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sdk_sent', 'SDK', 1, '2'),
('audit_agent_collect', 'AGENT', 0, '3'),
Expand Down
6 changes: 3 additions & 3 deletions inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -856,18 +856,18 @@ CREATE TABLE IF NOT EXISTS `audit_base`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`name` varchar(256) NOT NULL COMMENT 'Audit base name',
`type` varchar(20) NOT NULL COMMENT 'Audit base item type, such as: AGENT, DATAPROXY, etc',
`is_sent` int(4) NOT NULL DEFAULT '0' COMMENT '0: received, 1: sent',
`indicator_type` int(11) DEFAULT NULL COMMENT 'Indicator type for audit',
`audit_id` varchar(11) NOT NULL COMMENT 'Audit ID mapping of audit name',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_audit_base_type` (`type`, `is_sent`),
UNIQUE KEY `unique_audit_base_type` (`type`, `indicator_type`),
UNIQUE KEY `unique_audit_base_name` (`name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='Audit base item table';

-- ----------------------------
-- Insert audit_base item
-- ----------------------------
INSERT INTO `audit_base`(`name`, `type`, `is_sent`, `audit_id`)
INSERT INTO `audit_base`(`name`, `type`, `indicator_type`, `audit_id`)
VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sdk_sent', 'SDK', 1, '2'),
('audit_agent_read', 'AGENT', 0, '3'),
Expand Down
3 changes: 3 additions & 0 deletions inlong-manager/manager-web/sql/changes-1.12.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ CREATE TABLE IF NOT EXISTS `package_config` (
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT = 'Package config table'

DROP INDEX `unique_audit_base_type` ON `audit_base`;
ALTER TABLE `audit_base` CHANGE is_sent indicator_type int(11) DEFAULT NULL COMMENT 'Indicator type for audit';
ALTER TABLE `audit_base` ADD UNIQUE KEY unique_audit_base_type (`indicator_type`,`type` )
Loading