diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index 9ef0a1634ae..a0fada67c15 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -81,8 +81,8 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) { primaryKey, pulsarSource.getSubscription(), scanStartupSubStartOffset, - "", - ""); + pulsarSource.getClientAuthPluginClassName(), + pulsarSource.getClientAuthParams()); } @Override diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java index 8c14d8118b9..884100c988a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java @@ -88,6 +88,12 @@ public class PulsarSource extends StreamSource { @Builder.Default private String wrapType = MessageWrapType.INLONG_MSG_V0.getName(); + @ApiModelProperty(value = "Client auth plugin class name") + private String clientAuthPluginClassName; + + @ApiModelProperty(value = "Client auth params") + private String clientAuthParams; + @ApiModelProperty("Reset subscription time") private Long resetTime; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java index 6c0ba662082..2f9c9b1ab16 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java @@ -80,6 +80,12 @@ public class PulsarSourceDTO { @ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc") private String wrapType; + @ApiModelProperty(value = "Client auth plugin class name") + private String clientAuthPluginClassName; + + @ApiModelProperty(value = "Client auth params") + private String clientAuthParams; + @ApiModelProperty("Reset subscription time") private Long resetTime; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java index c9ebf258f0b..0c6946bc6e6 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java @@ -76,6 +76,12 @@ public class PulsarSourceRequest extends SourceRequest { + " Available options are earliest, latest, external-subscription, and specific-offsets.") private String scanStartupMode = "earliest"; + @ApiModelProperty(value = "Client auth plugin class name") + private String clientAuthPluginClassName; + + @ApiModelProperty(value = "Client auth params") + private String clientAuthParams; + @ApiModelProperty("Reset subscription time") private Long resetTime;