Skip to content

Commit

Permalink
[INLONG-11473][Sort] Tube Connector source supports dirty data achieving
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 11, 2024
1 parent 87d0a92 commit 6b2622e
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
package org.apache.inlong.sdk.dirtydata;

import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.text.StringEscapeUtils;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.StringJoiner;

@Slf4j
@Builder
public class DirtyMessageWrapper {

Expand All @@ -32,16 +37,17 @@ public class DirtyMessageWrapper {

private String inlongGroupId;
private String inlongStreamId;
private String dataTime;
private long dataTime;
private String dataflowId;
private String serverType;
private String dirtyType;
private String dirtyMessage;
private String ext;
private String data;
private byte[] dataBytes;

public String format() {
String now = LocalDateTime.now().format(dateTimeFormatter);
String reportTime = LocalDateTime.now().format(dateTimeFormatter);
StringJoiner joiner = new StringJoiner(delimiter);
String formatData = null;
if (data != null) {
Expand All @@ -50,14 +56,19 @@ public String format() {
formatData = Base64.getEncoder().encodeToString(dataBytes);
}

return joiner.add(inlongGroupId)
.add(inlongStreamId)
.add(now)
.add(dataTime)
String dataTimeStr = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime),
ZoneId.systemDefault()).format(dateTimeFormatter);
return joiner
.add(dataflowId)
.add(inlongGroupId)
.add(inlongStreamId)
.add(reportTime)
.add(dataTimeStr)
.add(serverType)
.add(dirtyType)
.add(ext)
.add(formatData).toString();
.add(StringEscapeUtils.escapeXSI(dirtyMessage))
.add(StringEscapeUtils.escapeXSI(ext))
.add(StringEscapeUtils.escapeXSI(formatData))
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.inlong.sdk.dirtydata;

import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.MessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
Expand All @@ -28,19 +27,22 @@
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;

@Slf4j
@Builder
public class InlongSdkDirtySink {
public class InlongSdkDirtySender {

private String inlongGroupId;
private String inlongStreamId;
private String inlongManagerAddr;
private int inlongManagerPort;
private String authId;
private String authKey;
private boolean ignoreErrors;

private SendMessageCallback callback;
private MessageSender sender;
private DefaultMessageSender sender;

public void init() throws Exception {
Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null");
Expand All @@ -51,8 +53,11 @@ public void init() throws Exception {

this.callback = new LogCallBack();
ProxyClientConfig proxyClientConfig =
new ProxyClientConfig(inlongManagerAddr, inlongGroupId, authId, authKey);
new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey);
proxyClientConfig.setReadProxyIPFromLocal(false);
this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
this.sender.setMsgtype(7);
log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId);
}

Expand All @@ -61,6 +66,12 @@ public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
sender.asyncSendMessage(inlongGroupId, inlongStreamId, messageWrapper.format().getBytes(), callback);
}

public void close() {
if (sender != null) {
sender.close();
}
}

class LogCallBack implements SendMessageCallback {

@Override
Expand Down
5 changes: 5 additions & 0 deletions inlong-sort/sort-flink/base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>dirty-data-sdk</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.inlong</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public final class Constants {
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
ConfigOptions.key("dirty.side-output.field-delimiter")
.stringType()
.defaultValue(",")
.defaultValue("|")
.withDescription("The field-delimiter of dirty side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
ConfigOptions.key("dirty.side-output.line-delimiter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,22 @@ public class DirtyData<T> {
* The row type of data, it is only used for 'RowData'
*/
private @Nullable final LogicalType rowType;
/**
* Dirty message data time
*/
private final long dataTime;
/**
* Dirty message ext params
*/
private @Nullable final String extParams;
/**
* The real dirty data
*/
private final T data;

public DirtyData(T data, String identifier, String labels,
String logTag, DirtyType dirtyType, String dirtyMessage,
@Nullable LogicalType rowType) {
@Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
this.dirtyMessage = dirtyMessage;
Expand All @@ -87,7 +95,8 @@ public DirtyData(T data, String identifier, String labels,
this.labels = PatternReplaceUtils.replace(labels, paramMap);
this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
this.identifier = PatternReplaceUtils.replace(identifier, paramMap);

this.dataTime = dataTime == 0 ? System.currentTimeMillis() : dataTime;
this.extParams = extParams;
}

public static <T> Builder<T> builder() {
Expand Down Expand Up @@ -122,6 +131,18 @@ public String getIdentifier() {
return identifier;
}

public long getDataTime() {
return dataTime;
}

public String getExtParams() {
return extParams;
}

public String getDirtyMessage() {
return dirtyMessage;
}

@Nullable
public LogicalType getRowType() {
return rowType;
Expand All @@ -135,8 +156,20 @@ public static class Builder<T> {
private DirtyType dirtyType = DirtyType.UNDEFINED;
private String dirtyMessage;
private LogicalType rowType;
private long dataTime;
private String extParams;
private T data;

public Builder<T> setDirtyDataTime(long dataTime) {
this.dataTime = dataTime;
return this;
}

public Builder<T> setExtParams(String extParams) {
this.extParams = extParams;
return this;
}

public Builder<T> setDirtyType(DirtyType dirtyType) {
this.dirtyType = dirtyType;
return this;
Expand Down Expand Up @@ -173,7 +206,8 @@ public Builder<T> setRowType(LogicalType rowType) {
}

public DirtyData<T> build() {
return new DirtyData<>(data, identifier, labels, logTag, dirtyType, dirtyMessage, rowType);
return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
dirtyMessage, rowType, dataTime, extParams);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@Data
@Builder
@Getter
public class InlongSdkOptions implements Serializable {
public class InlongSdkDirtyOptions implements Serializable {

private static final String DEFAULT_FORMAT = "csv";

Expand All @@ -36,9 +36,10 @@ public class InlongSdkOptions implements Serializable {
private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";

private String inlongGroupId;
private String inlongStreamId;
private String sendToGroupId;
private String sendToStreamId;
private String inlongManagerAddr;
private int inlongManagerPort;
private String inlongManagerAuthKey;
private String inlongManagerAuthId;
private String format = DEFAULT_FORMAT;
Expand Down
Loading

0 comments on commit 6b2622e

Please sign in to comment.