Skip to content

Commit

Permalink
Merge branch 'apache:master' into INLONG-11349
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo authored Nov 14, 2024
2 parents bcc449d + f493862 commit 4fa8b9b
Show file tree
Hide file tree
Showing 177 changed files with 3,290 additions and 3,385 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ jobs:

- name: Build with Maven
run: |
mvn --batch-mode --update-snapshots -e -V clean install -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
mvn -B -U -ntp -e -V -T 1C clean install -pl '!inlong-distribution' -am -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
mvn install -pl inlong-distribution -am -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true
env:
CI: false

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ jobs:
sudo swapon /swapfile
- name: Build with Maven
run: mvn --batch-mode --update-snapshots -e -V clean install -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
run: mvn -B -U -ntp -e -V clean install -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -pl '!inlong-distribution' -am -T 1C -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

- name: Unit test with Maven
run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13,!:sort-end-to-end-tests-v1.18
run: mvn -B -U -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13,!:sort-end-to-end-tests-v1.18
env:
CI: false

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci_ut_flink13.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ jobs:
restore-keys: ${{ runner.os }}-inlong-flink13

- name: Build for Flink 1.13 with Maven
run: mvn --update-snapshots -e -V clean install -U -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
run: mvn -B -U -ntp -e -V -T 1C clean install -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13 -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

- name: Unit test for Flink 1.13 with Maven
run: mvn --update-snapshots -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13
run: mvn -U -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13
env:
CI: false

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci_ut_flink15.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ jobs:
restore-keys: ${{ runner.os }}-inlong-flink15

- name: Build for Flink 1.15 with Maven
run: mvn --update-snapshots -e -V clean install -U -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
run: mvn -B -U -ntp -e -V -T 1C clean install -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15 -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

- name: Unit test for Flink 1.15 with Maven
run: mvn --update-snapshots -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15
run: mvn -U -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15
env:
CI: false

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci_ut_flink18.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ jobs:
restore-keys: ${{ runner.os }}-inlong-flink18

- name: Build for Flink 1.18 with Maven
run: mvn --update-snapshots -e -V clean install -U -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
run: mvn -B -U -ntp -e -V -T 1C clean install -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18 -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

- name: Unit test for Flink 1.18 with Maven
run: mvn --update-snapshots -e -V verify -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18
run: mvn -U -e -V verify -pl :sort-end-to-end-tests-v1.18 -am -Pv1.18
env:
CI: false

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:

- name: Build with Maven
run: |
mvn --batch-mode --update-snapshots -e -V clean install -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
mvn -B -U -ntp -e -V -T 1C clean install -pl '!inlong-distribution' -am -DskipTests -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
env:
CI: false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public class CommonConstants {
public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;

public static final String PROXY_IS_FILE = "proxy.isFile";
public static final boolean DEFAULT_IS_FILE = false;

public static final String PROXY_CLIENT_IO_THREAD_NUM = "client.iothread.num";
public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
Expand Down Expand Up @@ -168,7 +167,7 @@ private void doSendStatusMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send status failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
Expand Down Expand Up @@ -135,7 +134,7 @@ private void doSendStaticMsg(DefaultMessageSender sender) {
INLONG_AGENT_SYSTEM,
INLONG_FILE_STATIC,
AgentUtils.getCurrentTime(),
"", 30, TimeUnit.SECONDS);
"");
if (ret != SendResult.OK) {
LOGGER.error("send static failed: ret {}", ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ private HeartbeatManager(AgentManager agentManager) {
httpManager = new HttpManager(conf);
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
createMessageSender();
}

public static HeartbeatManager getInstance(AgentManager agentManager) {
Expand Down Expand Up @@ -121,9 +120,6 @@ private Runnable heartbeatReportThread() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" {} report heartbeat to manager", heartbeatMsg);
}
if (sender == null) {
createMessageSender();
}
AgentStatusManager.sendStatusMsg(sender);
FileStaticManager.sendStaticMsg(sender);
} catch (Throwable e) {
Expand Down Expand Up @@ -205,6 +201,7 @@ private void createMessageSender() {
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ModuleManager extends AbstractDaemon {
public static final String LOCAL_CONFIG_FILE = "modules.json";
private static final Logger LOGGER = LoggerFactory.getLogger(ModuleManager.class);
public static final int MAX_MODULE_SIZE = 10;
public static final int CHECK_PROCESS_TIMES = 20;
private final InstallerConfiguration conf;
private final String confPath;
private final BlockingQueue<ConfigResult> configQueue;
Expand Down Expand Up @@ -345,7 +346,7 @@ private void checkModules() {
}
break;
case INSTALLED:
if (!isProcessAllStarted(module)) {
if (!isProcessAllStarted(module, CHECK_PROCESS_TIMES)) {
LOGGER.info("module {}({}) process not all started try to start", module.getId(),
module.getName());
if (!startModule(module)) {
Expand Down Expand Up @@ -495,7 +496,7 @@ private boolean startModule(ModuleConfig module) {
String ret = ExcuteLinux.exeCmd(module.getStartCommand());
LOGGER.info("start module {}({}) proc[{}] return {} ", module.getId(), module.getName(), i, ret);
}
if (isProcessAllStarted(module)) {
if (isProcessAllStarted(module, CHECK_PROCESS_TIMES)) {
LOGGER.info("start module {}({}) success", module.getId(), module.getName());
return true;
} else {
Expand All @@ -517,21 +518,27 @@ private void uninstallModule(ModuleConfig module) {
LOGGER.info("uninstall module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private boolean isProcessAllStarted(ModuleConfig module) {
String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
if (ret == null) {
LOGGER.error("get module {}({}) process num failed", module.getId(), module.getName());
return false;
}
String[] processArray = ret.split("\n");
int cnt = 0;
for (int i = 0; i < processArray.length; i++) {
if (processArray[i].length() > 0) {
cnt++;
private boolean isProcessAllStarted(ModuleConfig module, int times) {
for (int check = 0; check < times; check++) {
AgentUtils.silenceSleepInSeconds(1);
String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
if (ret == null) {
LOGGER.error("[{}] get module {}({}) process num failed", check, module.getId(), module.getName());
continue;
}
String[] processArray = ret.split("\n");
int cnt = 0;
for (int i = 0; i < processArray.length; i++) {
if (processArray[i].length() > 0) {
cnt++;
}
}
LOGGER.info("[{}] get module {}({}) process num {}", check, module.getId(), module.getName(), cnt);
if (cnt >= module.getProcessesNum()) {
return true;
}
}
LOGGER.info("get module {}({}) process num {}", module.getId(), module.getName(), cnt);
return cnt >= module.getProcessesNum();
return false;
}

private boolean downloadModule(ModuleConfig module) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void mockFunctions() {
PowerMockito.doAnswer(invocation -> {
ModuleConfig module = invocation.getArgument(0);
return true;
}).when(manager, "isProcessAllStarted", Mockito.any());
}).when(manager, "isProcessAllStarted", Mockito.any(), Mockito.anyInt());

PowerMockito.doReturn(null).when(manager, "getHttpManager", Mockito.any());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class SenderManager {
private final int aliveConnectionNum;
private final boolean isCompress;
private final int msgType;
private final boolean isFile;
private final long maxSenderTimeout;
private final int maxSenderRetry;
private final long retrySleepTime;
Expand Down Expand Up @@ -133,7 +132,6 @@ public SenderManager(InstanceProfile profile, String inlongGroupId, String sourc
CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
retrySleepTime = agentConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE);
ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
Expand Down Expand Up @@ -200,7 +198,6 @@ private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);

proxyClientConfig.setIoThreadNum(ioThreadNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ public static String httpGet(String component, String url, String secretId, Stri
return responseStr;
}
}
} catch (Exception e) {
LOGGER.error("Send get request has exception", e);
} catch (Throwable e) {
LOGGER.error("Http request url = {}, secretId = {}, secretKey = {}, component = {} has exception!", url,
secretId, secretKey, component, e);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ public void closeSocket() {

public boolean checkSocket() {
if (socket.isClosed() || !socket.isConnected()) {
InetSocketAddress inetSocketAddress = null;
try {
InetSocketAddress inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress();
inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress();
if (inetSocketAddress == null) {
LOGGER.error("Audit proxy address is null!");
return false;
}
reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
} catch (IOException exception) {
LOGGER.error("Connect to audit proxy {} has exception!", socket.getInetAddress(), exception);
LOGGER.error("Connect to audit proxy {} has exception!", inetSocketAddress, exception);
return false;
}
}
Expand Down
2 changes: 1 addition & 1 deletion inlong-audit/audit-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
Expand Down
2 changes: 1 addition & 1 deletion inlong-audit/audit-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface AttributeConstants {

String SEPARATOR = "&";
String KEY_VALUE_SEPARATOR = "=";
String LINE_FEED_SEP = "\n";

/**
* group id
Expand Down
5 changes: 5 additions & 0 deletions inlong-dashboard/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions inlong-dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"nprogress": "^0.2.0",
"path-to-regexp": "^6.2.0",
"react": "17.0.1",
"react-csv": "^2.2.2",
"react-dom": "17.0.1",
"react-i18next": "^11.10.0",
"react-redux": "^7.2.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ export class SourceDefaultInfo implements DataWithBackend, RenderRow, RenderList
@I18n('meta.Sources.Type')
sourceType: string;

@FieldDecorator({
type: 'select',
hidden: true,
})
@IngestionField()
@I18n('meta.Sources.File.ClusterName')
readonly clusterTag: string;

@FieldDecorator({
type: 'input',
rules: [
Expand Down
9 changes: 8 additions & 1 deletion inlong-dashboard/src/plugins/sources/defaults/File.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export default class PulsarSource
data: {
keyword,
type: 'AGENT',
clusterTag: values.clusterTag,
pageNum: 1,
pageSize: 10,
},
Expand Down Expand Up @@ -138,7 +139,13 @@ export default class PulsarSource
@FieldDecorator({
type: 'input',
tooltip: i18n.t('meta.Sources.File.FilePathHelp'),
rules: [{ required: true }],
rules: [
{ required: true },
{
pattern: /^\S*$/,
message: i18n.t('meta.Sources.File.FilePathPatternHelp'),
},
],
props: values => ({
disabled: Boolean(values.id),
}),
Expand Down
1 change: 1 addition & 0 deletions inlong-dashboard/src/plugins/sources/defaults/Kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default class KafkaSource
data: {
keyword,
type: 'AGENT',
clusterTag: values.clusterTag,
pageNum: 1,
pageSize: 10,
},
Expand Down
1 change: 1 addition & 0 deletions inlong-dashboard/src/plugins/sources/defaults/Mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default class MongodbSource
data: {
keyword,
type: 'AGENT',
clusterTag: values.clusterTag,
pageNum: 1,
pageSize: 10,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default class PostgreSQLSource
data: {
keyword,
type: 'AGENT',
clusterTag: values.clusterTag,
pageNum: 1,
pageSize: 10,
},
Expand Down
1 change: 1 addition & 0 deletions inlong-dashboard/src/plugins/sources/defaults/Pulsar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export default class PulsarSource
data: {
keyword,
type: 'AGENT',
clusterTag: values.clusterTag,
pageNum: 1,
pageSize: 10,
},
Expand Down
Loading

0 comments on commit 4fa8b9b

Please sign in to comment.