Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender #11724

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.inlong.sdk.dirtydata;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -48,7 +50,8 @@ public class InlongSdkDirtySender {
private boolean closed = false;

private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
private DefaultMessageSender sender;
private TcpMsgSender sender;
private MsgSenderSingleFactory messageSenderFactory;
private Executor executor;

public void init() throws Exception {
Expand All @@ -57,14 +60,13 @@ public void init() throws Exception {
Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr cannot be null");
Preconditions.checkNotNull(authId, "authId cannot be null");
Preconditions.checkNotNull(authKey, "authKey cannot be null");

// build sender configure
TcpMsgSenderConfig proxyClientConfig =
new TcpMsgSenderConfig(true,
new TcpMsgSenderConfig(false,
inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey);
proxyClientConfig.setOnlyUseLocalProxyConfig(false);
proxyClientConfig.setTotalAsyncCallbackSize(maxCallbackSize);
this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);

// build sender factory
this.messageSenderFactory = new MsgSenderSingleFactory();
this.sender = this.messageSenderFactory.genTcpSenderByClusterId(proxyClientConfig);
this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
this.executor = Executors.newSingleThreadExecutor();
executor.execute(this::doSendDirtyMessage);
Expand All @@ -80,6 +82,7 @@ public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) {
}

private void doSendDirtyMessage() {
ProcessResult procResult = new ProcessResult();
while (!closed) {
try {
DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
Expand All @@ -93,30 +96,30 @@ private void doSendDirtyMessage() {
messageWrapper);
continue;
}

sender.asyncSendMessage(inlongGroupId, inlongStreamId,
messageWrapper.format().getBytes(), new LogCallBack(messageWrapper));

if (!sender.asyncSendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId,
System.currentTimeMillis(), null, messageWrapper.format().getBytes()),
new LogCallBack(messageWrapper), procResult)) {
dirtyDataQueue.offer(messageWrapper);
}
} catch (Throwable t) {
log.error("failed to send inlong dirty message", t);
if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to inlong sdk failed", t);
}
}

}
}

public void close() {
closed = true;
dirtyDataQueue.clear();
if (sender != null) {
sender.close();
if (messageSenderFactory != null) {
messageSenderFactory.shutdownAll();
}
}

@Getter
class LogCallBack implements SendMessageCallback {
class LogCallBack implements MsgSendCallback {

private final DirtyMessageWrapper wrapper;

Expand All @@ -125,8 +128,8 @@ public LogCallBack(DirtyMessageWrapper wrapper) {
}

@Override
public void onMessageAck(SendResult result) {
if (SendResult.OK != result) {
public void onMessageAck(ProcessResult result) {
if (!result.isSuccess()) {
dirtyDataQueue.offer(wrapper);
}
}
Expand Down
Loading