diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/IndicatorType.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/IndicatorType.java new file mode 100644 index 00000000000..b27c926e8f9 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/IndicatorType.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.common.enums; + +/** + * Indicator type of inlong audit + */ +public enum IndicatorType { + + RECEIVED_SUCCESS(0, "RECEIVED_SUCCESS", "Message received success"), + SEND_SUCCESS(1, "SEND_SUCCESS", "Message send success"), + RECEIVED_FAILED(2, "RECEIVED_FAILED", "Message received failed"), + SEND_FAILED(3, "SEND_FAILED", "Message send failed"), + RECEIVED_RETRY(4, "RECEIVED_RETRY", "Message received retry"), + SEND_RETRY(5, "SEND_RETRY", "Message send retry"), + RECEIVED_DISCARD(6, "RECEIVED_DISCARD", "Message received discard"), + SEND_DISCARD(7, "SEND_DISCARD", "Message send discard"), + + UNKNOWN_TYPE(Integer.MAX_VALUE, "UNKNOWN_TYPE", "Unknown type"); + + private final int code; + private final String name; + private final String desc; + + IndicatorType(int code, String name, String desc) { + this.code = code; + this.name = name; + this.desc = desc; + } + + public static IndicatorType valueOf(int value) { + for (IndicatorType code : IndicatorType.values()) { + if (code.getCode() == value) { + return code; + } + } + + return UNKNOWN_TYPE; + } + + public int getCode() { + return code; + } + + public String getName() { + return name; + } + + public String getDesc() { + return desc; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditBaseEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditBaseEntity.java index 5f77dd20d3b..e4ed92d236c 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditBaseEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditBaseEntity.java @@ -29,7 +29,7 @@ public class AuditBaseEntity { private Integer id; private String name; private String type; - private Integer isSent; + private Integer indicatorType; private String auditId; } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditBaseEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditBaseEntityMapper.java index ba634571a76..8e3c8fdb44e 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditBaseEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditBaseEntityMapper.java @@ -35,6 +35,7 @@ public interface AuditBaseEntityMapper { AuditBaseEntity selectByType(@Param("type") String type); - AuditBaseEntity selectByTypeAndIsSent(@Param("type") String type, @Param("isSent") Integer isSent); + AuditBaseEntity selectByTypeAndIndicatorType(@Param("type") String type, + @Param("indicatorType") Integer indicatorType); } diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditBaseEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditBaseEntityMapper.xml index cc947d26e0f..6ac240d2141 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditBaseEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditBaseEntityMapper.xml @@ -24,17 +24,17 @@ - + - id, name, type, is_sent, audit_id + id, name, type, indicator_type, audit_id - insert into audit_base (id, name, type, is_sent, audit_id) + insert into audit_base (id, name, type, indicator_type, audit_id) values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, - #{type,jdbcType=VARCHAR}, #{isSent,jdbcType=INTEGER}, + #{type,jdbcType=VARCHAR}, #{indicatorType,jdbcType=INTEGER}, #{auditId,jdbcType=VARCHAR}) @@ -61,12 +61,12 @@ - select from audit_base where type = #{type, jdbcType=VARCHAR} - and is_sent = #{isSent, jdbcType=INTEGER} + and indicator_type = #{indicatorType, jdbcType=INTEGER} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java index 74da6aa46a8..a404b702e49 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditBaseResponse.java @@ -35,8 +35,8 @@ public class AuditBaseResponse { @ApiModelProperty(value = "Audit type") private String type; - @ApiModelProperty(value = "is sent") - private Integer isSent; + @ApiModelProperty(value = "Indicator type") + private Integer indicatorType; @ApiModelProperty(value = "Audit id") private String auditId; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java index 08991917b6e..d60d4bebb34 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.core; +import org.apache.inlong.common.enums.IndicatorType; import org.apache.inlong.manager.pojo.audit.AuditBaseResponse; import org.apache.inlong.manager.pojo.audit.AuditRequest; import org.apache.inlong.manager.pojo.audit.AuditSourceRequest; @@ -49,13 +50,13 @@ public interface AuditService { List getAuditBases(); /** - * Get audit id by type and isSent. + * Get audit id by type and indicator type. * * @param type audit type. - * @param isSent Whether to receive or send + * @param indicatorType indicator type * @return Audit id. */ - String getAuditId(String type, boolean isSent); + String getAuditId(String type, IndicatorType indicatorType); /** * Refresh the base item of audit cache. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 2143cce6522..079812536e0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.core.impl; +import org.apache.inlong.common.enums.IndicatorType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.AuditQuerySource; @@ -132,10 +133,8 @@ public class AuditServiceImpl implements AuditService { private static final String DELAY = "delay"; private static final String TERMS = "terms"; - // key: type of audit base item, value: entity of audit base item - private final Map auditSentItemMap = new ConcurrentHashMap<>(); - - private final Map auditReceivedItemMap = new ConcurrentHashMap<>(); + // key 1: type of audit, like pulsar, hive, key 2: indicator type, value : entity of audit base item + private final Map> auditIndicatorMap = new ConcurrentHashMap<>(); private final Map auditItemMap = new ConcurrentHashMap<>(); @@ -186,11 +185,8 @@ public Boolean refreshBaseItemCache() { for (AuditBaseEntity auditBaseEntity : auditBaseEntities) { auditItemMap.put(auditBaseEntity.getAuditId(), auditBaseEntity); String type = auditBaseEntity.getType(); - if (auditBaseEntity.getIsSent() == 1) { - auditSentItemMap.put(type, auditBaseEntity); - } else { - auditReceivedItemMap.put(type, auditBaseEntity); - } + Map itemMap = auditIndicatorMap.computeIfAbsent(type, v -> new HashMap<>()); + itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity); } } catch (Throwable t) { LOGGER.error("failed to reload audit base item info", t); @@ -240,22 +236,19 @@ public AuditSourceResponse getAuditSource() { } @Override - public String getAuditId(String type, boolean isSent) { + public String getAuditId(String type, IndicatorType indicatorType) { if (StringUtils.isBlank(type)) { return null; } - AuditBaseEntity auditBaseEntity = isSent ? auditSentItemMap.get(type) : auditReceivedItemMap.get(type); + Map itemMap = auditIndicatorMap.computeIfAbsent(type, v -> new HashMap<>()); + AuditBaseEntity auditBaseEntity = itemMap.get(indicatorType.getCode()); if (auditBaseEntity != null) { return auditBaseEntity.getAuditId(); } - auditBaseEntity = auditBaseMapper.selectByTypeAndIsSent(type, isSent ? 1 : 0); + auditBaseEntity = auditBaseMapper.selectByTypeAndIndicatorType(type, indicatorType.getCode()); Preconditions.expectNotNull(auditBaseEntity, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED, String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type)); - if (isSent) { - auditSentItemMap.put(type, auditBaseEntity); - } else { - auditReceivedItemMap.put(type, auditBaseEntity); - } + itemMap.put(auditBaseEntity.getIndicatorType(), auditBaseEntity); return auditBaseEntity.getAuditId(); } @@ -293,15 +286,15 @@ public List listByCondition(AuditRequest request) throws Exception { sourceNodeType = sourceEntityList.get(0).getSourceType(); } - auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType); + auditIdMap.put(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS), sinkNodeType); if (CollectionUtils.isEmpty(request.getAuditIds())) { // properly overwrite audit ids by role and stream config if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { - auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); + auditIdMap.put(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType); request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); } else { - auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType); + auditIdMap.put(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS), sinkNodeType); request.setAuditIds(getAuditIds(groupId, streamId, null, sinkNodeType)); } } @@ -456,14 +449,14 @@ private List getAuditIds(String groupId, String streamId, String sourceN // if no sink is configured, return data-proxy output instead of sort if (sinkNodeType == null) { - auditSet.add(getAuditId(ClusterType.DATAPROXY, true)); + auditSet.add(getAuditId(ClusterType.DATAPROXY, IndicatorType.SEND_SUCCESS)); } else { - auditSet.add(getAuditId(sinkNodeType, true)); + auditSet.add(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) { - auditSet.add(getAuditId(sourceNodeType, false)); + auditSet.add(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS)); } else { - auditSet.add(getAuditId(sinkNodeType, false)); + auditSet.add(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS)); } } @@ -472,9 +465,9 @@ private List getAuditIds(String groupId, String streamId, String sourceN if (CollectionUtils.isEmpty(sourceList) || sourceList.stream().allMatch(s -> SourceType.AUTO_PUSH.equals(s.getSourceType()))) { // need data_proxy received type when agent has received type - boolean dpReceivedNeeded = auditSet.contains(getAuditId(ClusterType.AGENT, false)); + boolean dpReceivedNeeded = auditSet.contains(getAuditId(ClusterType.AGENT, IndicatorType.RECEIVED_SUCCESS)); if (dpReceivedNeeded) { - auditSet.add(getAuditId(ClusterType.DATAPROXY, false)); + auditSet.add(getAuditId(ClusterType.DATAPROXY, IndicatorType.RECEIVED_SUCCESS)); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 94c8015ad16..b332c64f52b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.resource.sort; +import org.apache.inlong.common.enums.IndicatorType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -145,7 +146,7 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon List sources = sourceMap.get(streamId); for (StreamSink sinkInfo : sinkInfos) { CommonBeanUtils.copyProperties(inlongStreamInfo, sinkInfo, true); - addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), true); + addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), IndicatorType.SEND_SUCCESS); } for (StreamSource source : sources) { @@ -167,7 +168,8 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon } for (int i = 0; i < sources.size(); i++) { - addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(), false); + addAuditId(sources.get(i).getProperties(), sinkInfos.get(0).getSinkType(), + IndicatorType.RECEIVED_SUCCESS); } } else { if (CollectionUtils.isNotEmpty(transformResponses)) { @@ -183,7 +185,7 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo inlon } for (StreamSource source : sources) { - addAuditId(source.getProperties(), source.getSourceType(), false); + addAuditId(source.getProperties(), source.getSourceType(), IndicatorType.RECEIVED_SUCCESS); } } @@ -294,9 +296,9 @@ private void addToStreamExt(InlongStreamInfo streamInfo, String value) { streamInfo.getExtList().add(extInfo); } - private void addAuditId(Map properties, String type, boolean isSent) { + private void addAuditId(Map properties, String type, IndicatorType indicatorType) { try { - String auditId = auditService.getAuditId(type, isSent); + String auditId = auditService.getAuditId(type, indicatorType); properties.putIfAbsent("metrics.audit.key", auditId); properties.putIfAbsent("metrics.audit.proxy.hosts", auditHost); } catch (Exception e) { diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 8ee1d24145d..e273c976bb5 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -809,17 +809,17 @@ CREATE TABLE IF NOT EXISTS `audit_base` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `name` varchar(256) NOT NULL COMMENT 'Audit base name', `type` varchar(20) NOT NULL COMMENT 'Audit base item type, such as: AGENT, DATAPROXY, etc', - `is_sent` int(4) NOT NULL DEFAULT '0' COMMENT '0: received, 1: sent', + `indicator_type` int(4) DEFAULT NULL COMMENT 'Indicator type for audit', `audit_id` varchar(11) NOT NULL COMMENT 'Audit ID mapping of audit name', PRIMARY KEY (`id`), - UNIQUE KEY `unique_audit_base_type` (`type`, `is_sent`), + UNIQUE KEY `unique_audit_base_type` (`type`, `indicator_type`), UNIQUE KEY `unique_audit_base_name` (`name`) ); -- ---------------------------- -- Insert audit_base item -- ---------------------------- -INSERT INTO `audit_base`(`name`, `type`, `is_sent`, `audit_id`) +INSERT INTO `audit_base`(`name`, `type`, `indicator_type`, `audit_id`) VALUES ('audit_sdk_collect', 'SDK', 0, '1'), ('audit_sdk_sent', 'SDK', 1, '2'), ('audit_agent_collect', 'AGENT', 0, '3'), diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 38bc6ed8e52..235711a9d04 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -856,10 +856,10 @@ CREATE TABLE IF NOT EXISTS `audit_base` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `name` varchar(256) NOT NULL COMMENT 'Audit base name', `type` varchar(20) NOT NULL COMMENT 'Audit base item type, such as: AGENT, DATAPROXY, etc', - `is_sent` int(4) NOT NULL DEFAULT '0' COMMENT '0: received, 1: sent', + `indicator_type` int(4) DEFAULT NULL COMMENT 'Indicator type for audit', `audit_id` varchar(11) NOT NULL COMMENT 'Audit ID mapping of audit name', PRIMARY KEY (`id`), - UNIQUE KEY `unique_audit_base_type` (`type`, `is_sent`), + UNIQUE KEY `unique_audit_base_type` (`type`, `indicator_type`), UNIQUE KEY `unique_audit_base_name` (`name`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT ='Audit base item table'; @@ -867,7 +867,7 @@ CREATE TABLE IF NOT EXISTS `audit_base` -- ---------------------------- -- Insert audit_base item -- ---------------------------- -INSERT INTO `audit_base`(`name`, `type`, `is_sent`, `audit_id`) +INSERT INTO `audit_base`(`name`, `type`, `indicator_type`, `audit_id`) VALUES ('audit_sdk_collect', 'SDK', 0, '1'), ('audit_sdk_sent', 'SDK', 1, '2'), ('audit_agent_read', 'AGENT', 0, '3'), diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql b/inlong-manager/manager-web/sql/changes-1.12.0.sql index b0f2ac8bf60..3c6c2bbe436 100644 --- a/inlong-manager/manager-web/sql/changes-1.12.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.12.0.sql @@ -68,3 +68,6 @@ CREATE TABLE IF NOT EXISTS `package_config` ( ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = 'Package config table' +DROP INDEX `unique_audit_base_type` ON `audit_base`; +ALTER TABLE `audit_base` CHANGE is_sent indicator_type int(4) DEFAULT NULL COMMENT 'Indicator type for audit'; +ALTER TABLE `audit_base` ADD UNIQUE KEY unique_audit_base_type (`indicator_type`,`type` ) \ No newline at end of file