From c074de2d200bef6f7f23f8222a96a83c91a0cef8 Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Wed, 15 Mar 2023 17:21:34 +0800 Subject: [PATCH 1/7] init RocketMQ sink v1 https://github.com/bytedance/bitsail/issues/137 --- .../rocketmq/contant/RocketMQConstants.java | 24 ++ .../rocketmq/error/RocketMQErrorCode.java | 4 +- .../option/RocketMQWriterOptions.java | 98 +++++++ .../rocketmq/sink/HashQueueSelector.java | 48 ++++ .../rocketmq/sink/RocketMQProducer.java | 255 ++++++++++++++++++ .../connector/rocketmq/sink/RocketMQSink.java | 63 +++++ .../rocketmq/sink/RocketMQWriter.java | 165 ++++++++++++ .../sink/config/RocketMQSinkConfig.java | 93 +++++++ .../format/RocketMQSerializationFactory.java | 50 ++++ .../format/RocketMQSerializationSchema.java | 32 +++ .../sink/format/RocketMQSinkFormat.java | 21 ++ .../format/json/JsonSerializationSchema.java | 90 +++++++ 12 files changed, 942 insertions(+), 1 deletion(-) create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java new file mode 100644 index 000000000..ef438e721 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.contant; + +public class RocketMQConstants { + + public static final String CONNECTOR_NAME = "rocketmq"; + + public static final int MAX_PARALLELISM_OUTPUT_ROCKETMQ = 5; +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java index 86988d79e..c9a19a6f9 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java @@ -22,7 +22,9 @@ public enum RocketMQErrorCode implements ErrorCode { CONSUMER_CREATE_FAILED("RocketMQ-1", "RocketMQ Consumer create failed."), CONSUMER_FETCH_OFFSET_FAILED("RocketMQ-2", "RocketMQ Consumer fetch offset failed."), - CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."); + CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."), + REQUIRED_VALUE("RocketMQ-4", "You missed parameter which is required, please check your configuration."), + UNSUPPORTED_FORMAT("RocketMQ-5", "Unsupported output format."); public String code; public String description; diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java new file mode 100644 index 000000000..02fc5910d --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java @@ -0,0 +1,98 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.WriterOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; + +public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { + + @Essential + ConfigOption NAME_SERVER_ADDRESS = + key(WRITER_PREFIX + "name_server_address") + .noDefaultValue(String.class); + + ConfigOption PRODUCER_GROUP = + key(WRITER_PREFIX + "producer_group") + .noDefaultValue(String.class); + + @Essential + ConfigOption TOPIC = + key(WRITER_PREFIX + "topic") + .noDefaultValue(String.class); + + ConfigOption TAG = + key(WRITER_PREFIX + "tag") + .noDefaultValue(String.class); + + ConfigOption ENABLE_BATCH_FLUSH = + key(WRITER_PREFIX + "enable_batch_flush") + .defaultValue(true); + + ConfigOption BATCH_SIZE = + key(WRITER_PREFIX + "batch_size") + .defaultValue(100); + + /** + * when encounter errors while sending:
+ * true: log the error
+ * false: throw exceptions + */ + ConfigOption LOG_FAILURES_ONLY = + key(WRITER_PREFIX + "log_failures_only") + .defaultValue(false); + + ConfigOption ENABLE_SYNC_SEND = + key(WRITER_PREFIX + "enable_sync_send") + .defaultValue(false); + + ConfigOption ACCESS_KEY = + key(WRITER_PREFIX + "access_key") + .noDefaultValue(String.class); + + ConfigOption SECRET_KEY = + key(WRITER_PREFIX + "secret_key") + .noDefaultValue(String.class); + + ConfigOption SEND_FAILURE_RETRY_TIMES = + key(WRITER_PREFIX + "send_failure_retry_times") + .defaultValue(3); + + ConfigOption SEND_MESSAGE_TIMEOUT = + key(WRITER_PREFIX + "send_message_timeout_ms") + .defaultValue(3000); + + ConfigOption MAX_MESSAGE_SIZE = + key(WRITER_PREFIX + "max_message_size_bytes") + .defaultValue(4194304); + + ConfigOption KEY_FIELDS = + key(WRITER_PREFIX + "key") + .noDefaultValue(String.class); + + ConfigOption PARTITION_FIELDS = + key(WRITER_PREFIX + "partition_fields") + .noDefaultValue(String.class); + + ConfigOption FORMAT = + key(WRITER_PREFIX + "format") + .defaultValue("json"); +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java new file mode 100644 index 000000000..4bb48890c --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + +public class HashQueueSelector implements MessageQueueSelector { + + private int nullKeyCount; + + public HashQueueSelector() { + super(); + nullKeyCount = 0; + } + + @Override + public MessageQueue select(List mqList, Message message, Object partitionKeys) { + int queueId; + + if (partitionKeys != null) { + queueId = partitionKeys.hashCode() % mqList.size(); + } else { + queueId = nullKeyCount % mqList.size(); + nullKeyCount = (nullKeyCount + 1) % mqList.size(); + } + + return mqList.get(queueId); + } +} + diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java new file mode 100644 index 000000000..a2e6798d1 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java @@ -0,0 +1,255 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.common.util.Preconditions; +import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; + +import lombok.Setter; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class RocketMQProducer implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class); + + private static final long serialVersionUID = 1L; + + /* necessary properties for @{link DefaultMQProducer} */ + private final String nameServerAddress; + private final String producerGroup; + + /* flush settings */ + private final int batchSize; + private final boolean batchFlushEnable; + private final boolean logFailuresOnly; + private final boolean enableSyncSend; + + /* acl control */ + private final String accessKey; + private final String secretKey; + + /* props for producer */ + private final int failureRetryTimes; + private final int sendMsgTimeout; + private final int maxMessageSize; + + /* messages and producer */ + private final List messageList; + private transient DefaultMQProducer producer; + + private transient SendCallback callBack; + private transient volatile Exception flushException; + + @Setter + private boolean enableQueueSelector; + private transient HashQueueSelector queueSelector; + + public RocketMQProducer(RocketMQSinkConfig sinkConfig) { + this.nameServerAddress = sinkConfig.getNameServerAddress(); + this.producerGroup = sinkConfig.getProducerGroup(); + this.accessKey = sinkConfig.getAccessKey(); + this.secretKey = sinkConfig.getSecretKey(); + this.failureRetryTimes = sinkConfig.getFailureRetryTimes(); + this.sendMsgTimeout = sinkConfig.getSendMsgTimeout(); + this.maxMessageSize = sinkConfig.getMaxMessageSize(); + + this.batchSize = sinkConfig.getBatchSize(); + this.batchFlushEnable = sinkConfig.isEnableBatchFlush(); + this.logFailuresOnly = sinkConfig.isLogFailuresOnly(); + this.enableSyncSend = sinkConfig.isEnableSyncSend(); + + this.enableQueueSelector = false; + this.messageList = new ArrayList<>(); + } + + /** + * initialize message list, open rocketmq producer + */ + public void open() { + // step1: initialize callback and queue-selector + if (logFailuresOnly) { + callBack = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) {} + + @Override + public void onException(Throwable throwable) { + LOG.error("Error while sending record to rocketmq: " + throwable.getMessage(), throwable); + } + }; + } else { + callBack = new SendCallback() { + @Override + public void onSuccess(SendResult result) {} + + @Override + public void onException(Throwable e) { + if (flushException == null) { + flushException = new IOException(e.getMessage(), e); + } + } + }; + } + + if (enableQueueSelector) { + this.queueSelector = new HashQueueSelector(); + } + + // step2: open a producer + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + this.producer = new DefaultMQProducer(producerGroup, rpcHook); + } else { + this.producer = new DefaultMQProducer(producerGroup); + } + + producer.setNamesrvAddr(nameServerAddress); + producer.setRetryTimesWhenSendFailed(failureRetryTimes); + producer.setSendMsgTimeout(sendMsgTimeout); + producer.setMaxMessageSize(maxMessageSize); + + try { + producer.start(); + } catch (MQClientException e) { + LOG.error("Failed to start rocketmq producer.", e); + throw new RuntimeException(e); + } + } + + public void validateParams() { + if (enableQueueSelector && batchFlushEnable) { + LOG.warn("Queue selector can not be used int batch flush mode!"); + } + Preconditions.checkNotNull(nameServerAddress); + Preconditions.checkNotNull(producerGroup); + } + + /** + * send row to rocketmq + */ + public void send(Message message, String partitionKeys) throws Exception { + checkErroneous(); + if (batchFlushEnable) { + synchronized (messageList) { + messageList.add(message); + if (messageList.size() >= batchSize) { + flushMessages(enableSyncSend); + } + } + } else { + sendSingleMessage(message, partitionKeys, enableSyncSend); + } + } + + /** + * flush message list to rocketmq producer + */ + public void flushMessages(boolean syncSend) { + if (messageList.isEmpty()) { + return; + } + + try { + sendMessageBatch(messageList, syncSend); + } catch (Exception e) { + LOG.error("Failed to flush.", e); + throw new RuntimeException("Rocketmq flush exception", e); + } + messageList.clear(); + } + + /** + * close producer + */ + public void close() throws Exception { + LOG.info("Closing RocketMQProducer."); + + if (producer != null) { + try { + flushMessages(true); + } catch (Exception e) { + LOG.error("Failed to send last batch of message.", e); + throw new RuntimeException("Failed to send last batch of message.", e); + } + + producer.shutdown(); + } + + checkErroneous(); + } + + private void checkErroneous() throws IOException { + Exception e = flushException; + if (e != null) { + // prevent double throwing + flushException = null; + throw new IOException("Failed to send data to rocketmq: " + e.getMessage(), e); + } + } + + /** + * Send single message. + */ + private void sendSingleMessage(Message message, String partitionKeys, boolean enableSyncSend) throws Exception { + if (enableSyncSend) { + SendResult result; + if (enableQueueSelector) { + result = producer.send(message, queueSelector, partitionKeys); + } else { + result = producer.send(message); + } + if (result.getSendStatus() != SendStatus.SEND_OK) { + throw new RemotingException(result.toString()); + } + } else { + if (enableQueueSelector) { + producer.send(message, queueSelector, partitionKeys, callBack); + } else { + producer.send(message, callBack); + } + } + } + + /** + * Send multi message. + */ + private void sendMessageBatch(List messages, boolean syncSend) throws Exception { + if (syncSend) { + SendResult result = producer.send(messages); + if (result.getSendStatus() != SendStatus.SEND_OK) { + throw new RemotingException(result.toString()); + } + } else { + producer.send(messages, callBack); + } + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java new file mode 100644 index 000000000..b69c272da --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Sink; +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.connector.rocketmq.contant.RocketMQConstants; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +public class RocketMQSink implements Sink { + private BitSailConfiguration commonConf; + + private BitSailConfiguration writerConf; + + @Override + public String getWriterName() { + return RocketMQConstants.CONNECTOR_NAME; + } + + @Override + public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception { + this.commonConf = commonConfiguration; + this.writerConf = writerConfiguration; + } + + @Override + public Writer createWriter(Writer.Context context) throws IOException { + return new RocketMQWriter<>(commonConf, writerConf, context); + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new BitSailTypeInfoConverter(); + } + + @Override + public Optional> createCommitter() { + return Optional.empty(); + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java new file mode 100644 index 000000000..d674d8e31 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java @@ -0,0 +1,165 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.util.Preconditions; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; +import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationFactory; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationSchema; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class RocketMQWriter implements Writer { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQWriter.class); + + private final RocketMQProducer rocketmqProducer; + + private RocketMQSinkConfig sinkConfig; + + private final String topic; + private final String tag; + + private List partitionIndices; + + private final transient Writer.Context context; + + protected Map optionalConfig; + + + /* serialize row(s) based on user-defined format */ + private RocketMQSerializationSchema serializationSchema; + + public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writerConf, + Context context) { + this.context = context; + this.sinkConfig = new RocketMQSinkConfig(writerConf); + this.topic = sinkConfig.getTopic(); + this.tag = sinkConfig.getTag(); + + List columns = writerConf.getNecessaryOption(RocketMQWriterOptions.COLUMNS, + RocketMQErrorCode.REQUIRED_VALUE); + + // get partition fields + String partitionFields = writerConf.getNecessaryOption(RocketMQWriterOptions.PARTITION_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); + this.partitionIndices = getIndicesByFieldNames(columns, partitionFields); + + // get key index + String keyFields = writerConf.getNecessaryOption(RocketMQWriterOptions.KEY_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); + List keyIndices = getIndicesByFieldNames(columns, keyFields); + + // get output format type (support only json now) + String formatType = writerConf.getNecessaryOption(RocketMQWriterOptions.FORMAT, RocketMQErrorCode.REQUIRED_VALUE); + RocketMQSinkFormat sinkFormat = RocketMQSinkFormat.valueOf(formatType.toUpperCase()); + + LOG.info("RocketMQ producer settings: " + sinkConfig); + LOG.info("RocketMQ partition fields indices: " + partitionIndices); + LOG.info("RocketMQ key indices: " + keyIndices); + LOG.info("RocketMQ sink format type: " + sinkFormat); + this.rocketmqProducer = new RocketMQProducer(sinkConfig); + + //todo + RocketMQSerializationFactory factory = new RocketMQSerializationFactory(context.getRowTypeInfo(), partitionIndices, keyIndices); + this.serializationSchema = factory.getSerializationSchemaByFormat(writerConf, sinkFormat); + } + + @Override + public void write(Row row) throws IOException { + Message message = prepareMessage(row); + String partitionKeys = serializationSchema.getPartitionKey(row); + try { + rocketmqProducer.send(message, partitionKeys); + } catch (Exception e) { + throw new IOException("failed to send record to rocketmq: " + e.getMessage(), e); + } + } + + @Override + public void flush(boolean endOfInput) throws IOException { + synchronized (this) { + if (Objects.nonNull(rocketmqProducer)) { + rocketmqProducer.flushMessages(endOfInput); + } + if (endOfInput) { + LOG.info("all records are sent to commit buffer."); + } + } + } + + @Override + public List prepareCommit() throws IOException { + return null; + } + + /** + * transform BitSail Row to RocketMQ Message (value, topic, tag) + */ + private Message prepareMessage(Row row) { + byte[] k = serializationSchema.serializeKey(row); + byte[] value = serializationSchema.serializeValue(row); + String key = k != null ? new String(k, StandardCharsets.UTF_8) : ""; + + Preconditions.checkNotNull(topic, "the message topic is null"); + Preconditions.checkNotNull(value, "the message body is null"); + + Message msg = new Message(topic, value); + msg.setKeys(key); + msg.setTags(tag); + + return msg; + } + + /** + * get indices by field names + */ + private List getIndicesByFieldNames(List columns, String fieldNames) { + if (StringUtils.isEmpty(fieldNames)) { + return null; + } + + List fields = Arrays.asList(fieldNames.split(",\\s*")); + List indices = fields.stream().map(field -> { + for (int i = 0; i < columns.size(); ++i) { + String columnName = columns.get(i).getName().trim(); + if (columnName.equals(field)) { + return i; + } + } + throw new IllegalArgumentException("Field " + field + " not found in columns! All fields are: " + fieldNames); + }).collect(Collectors.toList()); + return indices.isEmpty() ? null : indices; + } + +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java new file mode 100644 index 000000000..63dde84a6 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; + +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +@Data +public class RocketMQSinkConfig implements Serializable { + private static final long serialVersionUID = 2L; + + private String nameServerAddress; + private String producerGroup; + private String topic; + private String tag; + + private boolean enableBatchFlush; + private int batchSize; + + private String accessKey; + private String secretKey; + + private boolean logFailuresOnly; + private boolean enableSyncSend; + + private int failureRetryTimes; + private int sendMsgTimeout; + private int maxMessageSize; + + public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { + this.nameServerAddress = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.NAME_SERVER_ADDRESS, + RocketMQErrorCode.REQUIRED_VALUE); + this.producerGroup = outputSliceConfig.getUnNecessaryOption(RocketMQWriterOptions.PRODUCER_GROUP, + UUID.randomUUID().toString()); + this.topic = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.TOPIC, + RocketMQErrorCode.REQUIRED_VALUE); + this.tag = outputSliceConfig.get(RocketMQWriterOptions.TAG); + + this.enableBatchFlush = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_BATCH_FLUSH); + this.batchSize = outputSliceConfig.get(RocketMQWriterOptions.BATCH_SIZE); + + this.accessKey = outputSliceConfig.get(RocketMQWriterOptions.ACCESS_KEY); + this.secretKey = outputSliceConfig.get(RocketMQWriterOptions.SECRET_KEY); + + this.logFailuresOnly = outputSliceConfig.get(RocketMQWriterOptions.LOG_FAILURES_ONLY); + this.enableSyncSend = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_SYNC_SEND); + + this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES); + this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT); + this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE); + } + + @Override + public String toString() { + Map configMap = new HashMap<>(); + configMap.put("nameServerAddress", this.nameServerAddress); + configMap.put("producerGroup", this.producerGroup); + configMap.put("topic", this.topic); + configMap.put("tag", this.tag); + configMap.put("batchSize", this.batchSize + ""); + configMap.put("enableBatchFlush", this.enableBatchFlush + ""); + configMap.put("logFailuresOnly", this.logFailuresOnly + ""); + configMap.put("enableSyncSend", this.enableSyncSend + ""); + configMap.put("failureRetryTimes", this.failureRetryTimes + ""); + configMap.put("sendMsgTimeout", this.sendMsgTimeout + ""); + configMap.put("maxMessageSize", this.maxMessageSize + ""); + return configMap.toString(); + } + +} + diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java new file mode 100644 index 000000000..d8f362790 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.format; + +import com.bytedance.bitsail.base.format.SerializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.sink.format.json.JsonSerializationSchema; + +import java.util.List; + +public class RocketMQSerializationFactory { + + RowTypeInfo rowTypeInfo; + List partitionIndices; + List keyIndices; + + public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List partitionIndices, List keyIndices) { + this.rowTypeInfo = rowTypeInfo; + this.partitionIndices = partitionIndices; + this.keyIndices = keyIndices; + } + + public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) { + if (format == RocketMQSinkFormat.JSON) { + SerializationSchema jsonSerialSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo); + return new JsonSerializationSchema(jsonSerialSchema, partitionIndices, keyIndices); + } + throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT, + "unsupported sink format: " + format); + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java new file mode 100644 index 000000000..8d03bef3c --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.format; + +import com.bytedance.bitsail.common.row.Row; + +import java.io.Serializable; + +public interface RocketMQSerializationSchema extends Serializable { + + byte[] serializeKey(Row row); + + byte[] serializeValue(Row row); + + default String getPartitionKey(Row row) { + return null; + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java new file mode 100644 index 000000000..37b6bdcf0 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java @@ -0,0 +1,21 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.format; + +public enum RocketMQSinkFormat { + JSON +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java new file mode 100644 index 000000000..8f6b2299a --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java @@ -0,0 +1,90 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.format.json; + +import com.bytedance.bitsail.base.format.SerializationSchema; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationSchema; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +public class JsonSerializationSchema implements RocketMQSerializationSchema { + private static final Logger LOG = LoggerFactory.getLogger(JsonSerializationSchema.class); + + private static final long serialVersionUID = 3L; + + private final SerializationSchema serializationSchema; + private final List partitionKeyIndices; + private final List keyIndices; + + public JsonSerializationSchema(SerializationSchema serializationSchema, List partitionKeyIndices, List keyIndices) { + this.serializationSchema = serializationSchema; + this.partitionKeyIndices = partitionKeyIndices; + this.keyIndices = keyIndices; + } + + @Override + public byte[] serializeKey(Row row) { + if (keyIndices != null) { + String key = this.keyIndices.stream() + .map(i -> { + Object keyField = row.getField(i); + if (keyField != null) { + return keyField.toString(); + } + LOG.warn("Found null key in row: [{}]", row); + return null; + }) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.joining()); + return key.getBytes(StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public byte[] serializeValue(Row row) { + return serializationSchema.serialize(row); + } + + @Override + public String getPartitionKey(Row row) { + if (partitionKeyIndices != null) { + return this.partitionKeyIndices.stream() + .map(i -> { + Object partitionField = row.getField(i); + if (partitionField != null) { + return partitionField.toString(); + } + LOG.warn("Found null key in row: [{}]", row); + return null; + }) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.joining()); + } else { + return null; + } + } +} + From e21278839601e61f477c35020ff9f6d50d849b9e Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Thu, 16 Mar 2023 11:58:36 +0800 Subject: [PATCH 2/7] modify RocketMQ writer --- .../rocketmq/sink/RocketMQWriter.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java index d674d8e31..cba68e87e 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java @@ -38,7 +38,6 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -47,40 +46,31 @@ public class RocketMQWriter implements Writer private final RocketMQProducer rocketmqProducer; - private RocketMQSinkConfig sinkConfig; - private final String topic; private final String tag; - private List partitionIndices; - - private final transient Writer.Context context; - - protected Map optionalConfig; - + private final List partitionIndices; /* serialize row(s) based on user-defined format */ - private RocketMQSerializationSchema serializationSchema; + private final RocketMQSerializationSchema serializationSchema; public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writerConf, - Context context) { - this.context = context; - this.sinkConfig = new RocketMQSinkConfig(writerConf); + Context context) throws IOException { + RocketMQSinkConfig sinkConfig = new RocketMQSinkConfig(writerConf); this.topic = sinkConfig.getTopic(); this.tag = sinkConfig.getTag(); + this.rocketmqProducer = new RocketMQProducer(sinkConfig); List columns = writerConf.getNecessaryOption(RocketMQWriterOptions.COLUMNS, RocketMQErrorCode.REQUIRED_VALUE); - // get partition fields String partitionFields = writerConf.getNecessaryOption(RocketMQWriterOptions.PARTITION_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); this.partitionIndices = getIndicesByFieldNames(columns, partitionFields); - // get key index String keyFields = writerConf.getNecessaryOption(RocketMQWriterOptions.KEY_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); List keyIndices = getIndicesByFieldNames(columns, keyFields); + this.open(); - // get output format type (support only json now) String formatType = writerConf.getNecessaryOption(RocketMQWriterOptions.FORMAT, RocketMQErrorCode.REQUIRED_VALUE); RocketMQSinkFormat sinkFormat = RocketMQSinkFormat.valueOf(formatType.toUpperCase()); @@ -88,13 +78,24 @@ public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writ LOG.info("RocketMQ partition fields indices: " + partitionIndices); LOG.info("RocketMQ key indices: " + keyIndices); LOG.info("RocketMQ sink format type: " + sinkFormat); - this.rocketmqProducer = new RocketMQProducer(sinkConfig); - //todo RocketMQSerializationFactory factory = new RocketMQSerializationFactory(context.getRowTypeInfo(), partitionIndices, keyIndices); this.serializationSchema = factory.getSerializationSchemaByFormat(writerConf, sinkFormat); } + public void open() throws IOException { + if (partitionIndices != null && !partitionIndices.isEmpty()) { + rocketmqProducer.setEnableQueueSelector(true); + } + rocketmqProducer.validateParams(); + + try { + this.rocketmqProducer.open(); + } catch (Exception e) { + throw new IOException("failed to open rocketmq producer: " + e.getMessage(), e); + } + } + @Override public void write(Row row) throws IOException { Message message = prepareMessage(row); @@ -110,7 +111,7 @@ public void write(Row row) throws IOException { public void flush(boolean endOfInput) throws IOException { synchronized (this) { if (Objects.nonNull(rocketmqProducer)) { - rocketmqProducer.flushMessages(endOfInput); + rocketmqProducer.flushMessages(!endOfInput); } if (endOfInput) { LOG.info("all records are sent to commit buffer."); From 617699fc9af775d34198790ce7e6295ee9f368a9 Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Mon, 20 Mar 2023 11:15:44 +0800 Subject: [PATCH 3/7] add test for RocketMQSink --- .../RocketMQConstants.java | 3 +- .../connector/rocketmq/sink/RocketMQSink.java | 2 +- .../rocketmq/sink/RocketMQWriter.java | 2 +- .../rocketmq/RocketMQSinkITCase.java | 83 +++++++++++++++++++ .../src/test/resources/fake_to_rocketmq.json | 70 ++++++++++++++++ 5 files changed, 156 insertions(+), 4 deletions(-) rename bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/{contant => constants}/RocketMQConstants.java (85%) create mode 100644 bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java create mode 100644 bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java similarity index 85% rename from bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java rename to bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java index ef438e721..7c759375a 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/contant/RocketMQConstants.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java @@ -14,11 +14,10 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.rocketmq.contant; +package com.bytedance.bitsail.connector.rocketmq.constants; public class RocketMQConstants { public static final String CONNECTOR_NAME = "rocketmq"; - public static final int MAX_PARALLELISM_OUTPUT_ROCKETMQ = 5; } diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java index b69c272da..e6fbcc5a7 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java @@ -24,7 +24,7 @@ import com.bytedance.bitsail.common.row.Row; import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; import com.bytedance.bitsail.common.type.TypeInfoConverter; -import com.bytedance.bitsail.connector.rocketmq.contant.RocketMQConstants; +import com.bytedance.bitsail.connector.rocketmq.constants.RocketMQConstants; import java.io.IOException; import java.io.Serializable; diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java index cba68e87e..6306a3bfe 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java @@ -71,7 +71,7 @@ public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writ List keyIndices = getIndicesByFieldNames(columns, keyFields); this.open(); - String formatType = writerConf.getNecessaryOption(RocketMQWriterOptions.FORMAT, RocketMQErrorCode.REQUIRED_VALUE); + String formatType = writerConf.get(RocketMQWriterOptions.FORMAT); RocketMQSinkFormat sinkFormat = RocketMQSinkFormat.valueOf(formatType.toUpperCase()); LOG.info("RocketMQ producer settings: " + sinkConfig); diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java new file mode 100644 index 000000000..ae669e616 --- /dev/null +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java @@ -0,0 +1,83 @@ +package com.bytedance.bitsail.test.integration.rocketmq; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.fake.option.FakeReaderOptions; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; +import com.bytedance.bitsail.test.integration.AbstractIntegrationTest; +import com.bytedance.bitsail.test.integration.utils.JobConfUtils; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Ignore("Ignore unbounded streaming task.") +public class RocketMQSinkITCase extends AbstractIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSinkITCase.class); + + private static final int TOTAL_SEND_COUNT = 300; + + private static final String NAME_SERVER_ADDRESS = "127.0.0.1:10911"; + private static final String TOPIC_NAME = "test_topic"; + private static final String PRODUCER_GROUP = "test_producer_group"; + private static final String TAG = "itcase_test"; + + @Test + public void testFakeToRocketMQ() throws Exception { + BitSailConfiguration configuration = JobConfUtils.fromClasspath("fake_to_rocketmq.json"); + updateConfiguration(configuration); + submitJob(configuration); + + Queue messages = consumeTopic(); + Assert.assertEquals(TOTAL_SEND_COUNT, messages.size()); + } + + private Queue consumeTopic() throws Exception { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); + consumer.setNamesrvAddr(NAME_SERVER_ADDRESS); + consumer.subscribe(TOPIC_NAME, TAG); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + Queue messageQ = new ConcurrentLinkedQueue<>(); + + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + messageQ.addAll(msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + try { + consumer.start(); + } catch (MQClientException e) { + LOG.info("send message failed. {}", e.toString()); + } + Thread.sleep(5000); + consumer.shutdown(); + + for (MessageExt msg : messageQ) { + LOG.debug(new String(msg.getBody())); + } + + LOG.info("Total get {} messages.", messageQ.size()); + return messageQ; + } + + protected void updateConfiguration(BitSailConfiguration jobConfiguration) { + jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT); + jobConfiguration.set(RocketMQWriterOptions.NAME_SERVER_ADDRESS, NAME_SERVER_ADDRESS); + jobConfiguration.set(RocketMQWriterOptions.TOPIC, TOPIC_NAME); + jobConfiguration.set(RocketMQWriterOptions.PRODUCER_GROUP, PRODUCER_GROUP); + jobConfiguration.set(RocketMQWriterOptions.TAG, TAG); + jobConfiguration.set(RocketMQWriterOptions.PARTITION_FIELDS, "id"); + jobConfiguration.set(RocketMQWriterOptions.ENABLE_BATCH_FLUSH, false); + jobConfiguration.set(RocketMQWriterOptions.FORMAT, "json"); + } +} diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json new file mode 100644 index 000000000..aa0b90675 --- /dev/null +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json @@ -0,0 +1,70 @@ +{ + "job": { + "common": { + "job_id": -2413, + "job_name": "bitsail_test_integration_fake_to_rockermq", + "instance_id": -20413, + "user_name": "user" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.fake.source.FakeSource", + "total_count": 300, + "rate": 100000, + "random_null_rate": 0, + "unique_fields": "id", + "columns": [ + { + "name": "id", + "type": "string" + }, + { + "name": "string_field", + "type": "string" + }, + { + "name": "int_field", + "type": "bigint" + }, + { + "name": "double_field", + "type": "double" + }, + { + "name": "date_field", + "type": "date.date" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.rocketmq.sink.RocketMQSink", + "name_server_address": "127.0.0.1:10911", + "producer_group": "test_producer_group", + "topic": "test_topic", + "tag": "itcase_test", + "key": "id", + "partition_fields": "id,date_field", + "columns": [ + { + "name": "id", + "type": "string" + }, + { + "name": "string_field", + "type": "string" + }, + { + "name": "int_field", + "type": "long" + }, + { + "name": "double_field", + "type": "double" + }, + { + "name": "date_field", + "type": "date" + } + ] + } + } +} From 7c3d27b4c8903121470c2752ae63d23409ef3603 Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Mon, 20 Mar 2023 11:47:38 +0800 Subject: [PATCH 4/7] add header --- .../integration/rocketmq/RocketMQSinkITCase.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java index ae669e616..e492dced0 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java @@ -1,3 +1,19 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.bytedance.bitsail.test.integration.rocketmq; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; From c07928a12f2525e4d87ddd24bf7ab8b0869858f3 Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Wed, 22 Mar 2023 17:43:08 +0800 Subject: [PATCH 5/7] add unit test for HashQueueSelector and optimize producer config and optimize RocketMQSerizationSchema --- .../constants/OptionalProducerConfig.java | 25 +++++++ .../JsonRocketMQSerializationSchema.java} | 20 +++--- .../format/RocketMQSerializationFactory.java | 10 +-- .../format/RocketMQSerializationSchema.java | 6 +- .../option/RocketMQWriterOptions.java | 9 +++ .../rocketmq/sink/RocketMQProducer.java | 17 +++++ .../rocketmq/sink/RocketMQWriter.java | 5 +- .../sink/config/RocketMQSinkConfig.java | 33 +++++---- .../rocketmq/sink/HashQueueSelectorTest.java | 69 +++++++++++++++++++ 9 files changed, 156 insertions(+), 38 deletions(-) create mode 100644 bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java rename bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/{sink/format/json/JsonSerializationSchema.java => format/JsonRocketMQSerializationSchema.java} (73%) rename bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/{sink => }/format/RocketMQSerializationFactory.java (74%) rename bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/{sink => }/format/RocketMQSerializationSchema.java (83%) create mode 100644 bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java new file mode 100644 index 000000000..f1585ac3d --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.constants; + +public class OptionalProducerConfig { + public static final String INSTANCE_NAME = "instanceName"; + public static final String VIP_CHANNEL = "vipChannelEnabled"; + public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums"; + public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch"; + public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval"; +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java similarity index 73% rename from bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java rename to bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java index 8f6b2299a..375158854 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/json/JsonSerializationSchema.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.rocketmq.sink.format.json; +package com.bytedance.bitsail.connector.rocketmq.format; -import com.bytedance.bitsail.base.format.SerializationSchema; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationSchema; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -28,19 +29,20 @@ import java.util.List; import java.util.stream.Collectors; -public class JsonSerializationSchema implements RocketMQSerializationSchema { - private static final Logger LOG = LoggerFactory.getLogger(JsonSerializationSchema.class); +public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema { + private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class); private static final long serialVersionUID = 3L; - private final SerializationSchema serializationSchema; private final List partitionKeyIndices; private final List keyIndices; + private final transient JsonRowSerializationSchema rowSerializationSchema; - public JsonSerializationSchema(SerializationSchema serializationSchema, List partitionKeyIndices, List keyIndices) { - this.serializationSchema = serializationSchema; + public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo, + List partitionKeyIndices, List keyIndices) { this.partitionKeyIndices = partitionKeyIndices; this.keyIndices = keyIndices; + this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo); } @Override @@ -65,7 +67,7 @@ public byte[] serializeKey(Row row) { @Override public byte[] serializeValue(Row row) { - return serializationSchema.serialize(row); + return rowSerializationSchema.serialize(row); } @Override diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java similarity index 74% rename from bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java rename to bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java index d8f362790..c454fd9a4 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationFactory.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java @@ -14,16 +14,13 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.rocketmq.sink.format; +package com.bytedance.bitsail.connector.rocketmq.format; -import com.bytedance.bitsail.base.format.SerializationSchema; import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.row.Row; import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; -import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema; import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; -import com.bytedance.bitsail.connector.rocketmq.sink.format.json.JsonSerializationSchema; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; import java.util.List; @@ -41,8 +38,7 @@ public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List parti public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) { if (format == RocketMQSinkFormat.JSON) { - SerializationSchema jsonSerialSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo); - return new JsonSerializationSchema(jsonSerialSchema, partitionIndices, keyIndices); + return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices); } throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT, "unsupported sink format: " + format); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java similarity index 83% rename from bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java rename to bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java index 8d03bef3c..002c72cdd 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSerializationSchema.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java @@ -14,13 +14,11 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.rocketmq.sink.format; +package com.bytedance.bitsail.connector.rocketmq.format; import com.bytedance.bitsail.common.row.Row; -import java.io.Serializable; - -public interface RocketMQSerializationSchema extends Serializable { +public interface RocketMQSerializationSchema { byte[] serializeKey(Row row); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java index 02fc5910d..197dd32ca 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java @@ -20,6 +20,10 @@ import com.bytedance.bitsail.common.option.ConfigOption; import com.bytedance.bitsail.common.option.WriterOptions; +import com.alibaba.fastjson.TypeReference; + +import java.util.Map; + import static com.bytedance.bitsail.common.option.ConfigOptions.key; import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; @@ -95,4 +99,9 @@ public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { ConfigOption FORMAT = key(WRITER_PREFIX + "format") .defaultValue("json"); + + ConfigOption> OPTIONAL_PRODUCER_PROPERTIES = + key(WRITER_PREFIX + "optional_producer_properties") + .onlyReference(new TypeReference >() { + }); } diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java index a2e6798d1..20ad77a10 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java @@ -17,6 +17,7 @@ package com.bytedance.bitsail.connector.rocketmq.sink; import com.bytedance.bitsail.common.util.Preconditions; +import com.bytedance.bitsail.connector.rocketmq.constants.OptionalProducerConfig; import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; import lombok.Setter; @@ -37,6 +38,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class RocketMQProducer implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class); @@ -72,6 +74,7 @@ public class RocketMQProducer implements Serializable { @Setter private boolean enableQueueSelector; private transient HashQueueSelector queueSelector; + private final Map optionalProducerProperties; public RocketMQProducer(RocketMQSinkConfig sinkConfig) { this.nameServerAddress = sinkConfig.getNameServerAddress(); @@ -89,6 +92,7 @@ public RocketMQProducer(RocketMQSinkConfig sinkConfig) { this.enableQueueSelector = false; this.messageList = new ArrayList<>(); + this.optionalProducerProperties = sinkConfig.getOptionalProducerProperties(); } /** @@ -132,6 +136,19 @@ public void onException(Throwable e) { this.producer = new DefaultMQProducer(producerGroup); } + for (Map.Entry entry : this.optionalProducerProperties.entrySet()) { + if (OptionalProducerConfig.INSTANCE_NAME.equals(entry.getKey())) { + producer.setInstanceName((String) entry.getValue()); + } else if (OptionalProducerConfig.COMPRESS_SG_BODY_OVER.equals(entry.getKey())) { + producer.setCompressMsgBodyOverHowmuch((int) entry.getValue()); + } else if (OptionalProducerConfig.DEFAULT_TOPIC_QUEUE_NUMS.equals(entry.getKey())) { + producer.setDefaultTopicQueueNums((int) entry.getValue()); + } else if (OptionalProducerConfig.HEARTBEAT_BROKER_INTERVAL.equals(entry.getKey())) { + producer.setHeartbeatBrokerInterval((int) entry.getValue()); + } else if (OptionalProducerConfig.VIP_CHANNEL.equals(entry.getKey())) { + producer.setVipChannelEnabled((boolean) entry.getValue()); + } + } producer.setNamesrvAddr(nameServerAddress); producer.setRetryTimesWhenSendFailed(failureRetryTimes); producer.setSendMsgTimeout(sendMsgTimeout); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java index 6306a3bfe..426893bf9 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java @@ -23,10 +23,10 @@ import com.bytedance.bitsail.common.row.Row; import com.bytedance.bitsail.common.util.Preconditions; import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.format.RocketMQSerializationFactory; +import com.bytedance.bitsail.connector.rocketmq.format.RocketMQSerializationSchema; import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; -import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationFactory; -import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSerializationSchema; import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; import org.apache.commons.lang3.StringUtils; @@ -51,7 +51,6 @@ public class RocketMQWriter implements Writer private final List partitionIndices; - /* serialize row(s) based on user-defined format */ private final RocketMQSerializationSchema serializationSchema; public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writerConf, diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java index 63dde84a6..3473a1470 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java @@ -23,7 +23,6 @@ import lombok.Data; import java.io.Serializable; -import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -48,6 +47,7 @@ public class RocketMQSinkConfig implements Serializable { private int failureRetryTimes; private int sendMsgTimeout; private int maxMessageSize; + private Map optionalProducerProperties; public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { this.nameServerAddress = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.NAME_SERVER_ADDRESS, @@ -70,24 +70,27 @@ public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES); this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT); this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE); + this.optionalProducerProperties = outputSliceConfig.get(RocketMQWriterOptions.OPTIONAL_PRODUCER_PROPERTIES); } @Override public String toString() { - Map configMap = new HashMap<>(); - configMap.put("nameServerAddress", this.nameServerAddress); - configMap.put("producerGroup", this.producerGroup); - configMap.put("topic", this.topic); - configMap.put("tag", this.tag); - configMap.put("batchSize", this.batchSize + ""); - configMap.put("enableBatchFlush", this.enableBatchFlush + ""); - configMap.put("logFailuresOnly", this.logFailuresOnly + ""); - configMap.put("enableSyncSend", this.enableSyncSend + ""); - configMap.put("failureRetryTimes", this.failureRetryTimes + ""); - configMap.put("sendMsgTimeout", this.sendMsgTimeout + ""); - configMap.put("maxMessageSize", this.maxMessageSize + ""); - return configMap.toString(); + return "RocketMQSinkConfig{" + + "nameServerAddress='" + nameServerAddress + '\'' + + ", producerGroup='" + producerGroup + '\'' + + ", topic='" + topic + '\'' + + ", tag='" + tag + '\'' + + ", enableBatchFlush=" + enableBatchFlush + + ", batchSize=" + batchSize + + ", accessKey='" + accessKey + '\'' + + ", secretKey='" + secretKey + '\'' + + ", logFailuresOnly=" + logFailuresOnly + + ", enableSyncSend=" + enableSyncSend + + ", failureRetryTimes=" + failureRetryTimes + + ", sendMsgTimeout=" + sendMsgTimeout + + ", maxMessageSize=" + maxMessageSize + + ", optionalProducerProperties=" + optionalProducerProperties + + '}'; } - } diff --git a/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java b/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java new file mode 100644 index 000000000..d71c9a9ca --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class HashQueueSelectorTest { + + private HashQueueSelector selector; + + @Before + public void setUp() { + selector = new HashQueueSelector(); + } + + @Test + public void testSelectWithNonNullPartitionKey() { + List mqList = createMessageQueueList(3); + Message message = new Message("TestTopic", "TestTag", "TestBody".getBytes()); + Object partitionKey = "nonNullKey"; + + MessageQueue selectedQueue = selector.select(mqList, message, partitionKey); + + int expectedQueueId = Math.abs(partitionKey.hashCode()) % mqList.size(); + Assert.assertEquals(mqList.get(expectedQueueId), selectedQueue); + } + + @Test + public void testSelectWithNullPartitionKey() { + List mqList = createMessageQueueList(3); + Message message = new Message("TestTopic", "TestTag", "TestBody".getBytes()); + + MessageQueue selectedQueue1 = selector.select(mqList, message, null); + MessageQueue selectedQueue2 = selector.select(mqList, message, null); + + Assert.assertEquals(mqList.get(0), selectedQueue1); + Assert.assertEquals(mqList.get(1), selectedQueue2); + } + + private List createMessageQueueList(int size) { + List mqList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + mqList.add(new MessageQueue("TestTopic", "TestBroker", i)); + } + return mqList; + } + +} \ No newline at end of file From 850fb2b6a3d7cd17a5730e871deb17253329ea4c Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Fri, 24 Mar 2023 16:33:11 +0800 Subject: [PATCH 6/7] optimize rocketmq producer config --- .../option/RocketMQWriterOptions.java | 28 +++++++++++----- .../rocketmq/sink/RocketMQProducer.java | 32 +++++++++---------- .../sink/config/RocketMQSinkConfig.java | 19 ++++++++--- 3 files changed, 50 insertions(+), 29 deletions(-) diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java index 197dd32ca..73f16e079 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java @@ -20,10 +20,6 @@ import com.bytedance.bitsail.common.option.ConfigOption; import com.bytedance.bitsail.common.option.WriterOptions; -import com.alibaba.fastjson.TypeReference; - -import java.util.Map; - import static com.bytedance.bitsail.common.option.ConfigOptions.key; import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; @@ -100,8 +96,24 @@ public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { key(WRITER_PREFIX + "format") .defaultValue("json"); - ConfigOption> OPTIONAL_PRODUCER_PROPERTIES = - key(WRITER_PREFIX + "optional_producer_properties") - .onlyReference(new TypeReference >() { - }); + ConfigOption DEFAULT_TOPIC_QUEUE_NUMS = + key(WRITER_PREFIX + "default_topic_queue_nums") + .defaultValue(4); + + ConfigOption COMPRESS_MSG_BODY_SIZE = + key(WRITER_PREFIX + "compress_msg_body_over_how_much") + .defaultValue(4096); + + ConfigOption HEART_BEAT_BROKER_INTERVAL = + key(WRITER_PREFIX + "heart_beat_broker_interval") + .defaultValue(30000); + + ConfigOption INSTANCE_NAME = + key(WRITER_PREFIX + "instance_name") + .defaultValue("bitsail_rocketmq_instance"); + + ConfigOption VIP_CHANNEL_ENABLED = + key(WRITER_PREFIX + "vip_channel_enabled") + .defaultValue(false); + } diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java index 20ad77a10..fbd47e04f 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java @@ -17,7 +17,6 @@ package com.bytedance.bitsail.connector.rocketmq.sink; import com.bytedance.bitsail.common.util.Preconditions; -import com.bytedance.bitsail.connector.rocketmq.constants.OptionalProducerConfig; import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; import lombok.Setter; @@ -38,7 +37,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Map; public class RocketMQProducer implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class); @@ -63,6 +61,11 @@ public class RocketMQProducer implements Serializable { private final int failureRetryTimes; private final int sendMsgTimeout; private final int maxMessageSize; + private final String instanceName; + private final boolean vipChannelEnabled; + private final int defaultTopicQueueNums; + private final int compressMsgBodyOverHowmuch; + private final int heartbeatBrokerInterval; /* messages and producer */ private final List messageList; @@ -74,7 +77,6 @@ public class RocketMQProducer implements Serializable { @Setter private boolean enableQueueSelector; private transient HashQueueSelector queueSelector; - private final Map optionalProducerProperties; public RocketMQProducer(RocketMQSinkConfig sinkConfig) { this.nameServerAddress = sinkConfig.getNameServerAddress(); @@ -92,7 +94,11 @@ public RocketMQProducer(RocketMQSinkConfig sinkConfig) { this.enableQueueSelector = false; this.messageList = new ArrayList<>(); - this.optionalProducerProperties = sinkConfig.getOptionalProducerProperties(); + this.instanceName = sinkConfig.getInstanceName(); + this.compressMsgBodyOverHowmuch = sinkConfig.getCompressMsgBodyOverHowmuch(); + this.defaultTopicQueueNums = sinkConfig.getDefaultTopicQueueNums(); + this.heartbeatBrokerInterval = sinkConfig.getHeartbeatBrokerInterval(); + this.vipChannelEnabled = sinkConfig.isVipChannelEnabled(); } /** @@ -136,23 +142,15 @@ public void onException(Throwable e) { this.producer = new DefaultMQProducer(producerGroup); } - for (Map.Entry entry : this.optionalProducerProperties.entrySet()) { - if (OptionalProducerConfig.INSTANCE_NAME.equals(entry.getKey())) { - producer.setInstanceName((String) entry.getValue()); - } else if (OptionalProducerConfig.COMPRESS_SG_BODY_OVER.equals(entry.getKey())) { - producer.setCompressMsgBodyOverHowmuch((int) entry.getValue()); - } else if (OptionalProducerConfig.DEFAULT_TOPIC_QUEUE_NUMS.equals(entry.getKey())) { - producer.setDefaultTopicQueueNums((int) entry.getValue()); - } else if (OptionalProducerConfig.HEARTBEAT_BROKER_INTERVAL.equals(entry.getKey())) { - producer.setHeartbeatBrokerInterval((int) entry.getValue()); - } else if (OptionalProducerConfig.VIP_CHANNEL.equals(entry.getKey())) { - producer.setVipChannelEnabled((boolean) entry.getValue()); - } - } producer.setNamesrvAddr(nameServerAddress); producer.setRetryTimesWhenSendFailed(failureRetryTimes); producer.setSendMsgTimeout(sendMsgTimeout); producer.setMaxMessageSize(maxMessageSize); + producer.setInstanceName(instanceName); + producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch); + producer.setDefaultTopicQueueNums(defaultTopicQueueNums); + producer.setHeartbeatBrokerInterval(heartbeatBrokerInterval); + producer.setVipChannelEnabled(vipChannelEnabled); try { producer.start(); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java index 3473a1470..1f0a8934b 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java @@ -23,7 +23,6 @@ import lombok.Data; import java.io.Serializable; -import java.util.Map; import java.util.UUID; @Data @@ -47,7 +46,11 @@ public class RocketMQSinkConfig implements Serializable { private int failureRetryTimes; private int sendMsgTimeout; private int maxMessageSize; - private Map optionalProducerProperties; + private String instanceName; + private boolean vipChannelEnabled; + private int defaultTopicQueueNums; + private int compressMsgBodyOverHowmuch; + private int heartbeatBrokerInterval; public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { this.nameServerAddress = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.NAME_SERVER_ADDRESS, @@ -70,7 +73,11 @@ public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES); this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT); this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE); - this.optionalProducerProperties = outputSliceConfig.get(RocketMQWriterOptions.OPTIONAL_PRODUCER_PROPERTIES); + this.instanceName = outputSliceConfig.get(RocketMQWriterOptions.INSTANCE_NAME); + this.vipChannelEnabled = outputSliceConfig.get(RocketMQWriterOptions.VIP_CHANNEL_ENABLED); + this.defaultTopicQueueNums = outputSliceConfig.get(RocketMQWriterOptions.DEFAULT_TOPIC_QUEUE_NUMS); + this.compressMsgBodyOverHowmuch = outputSliceConfig.get(RocketMQWriterOptions.COMPRESS_MSG_BODY_SIZE); + this.heartbeatBrokerInterval = outputSliceConfig.get(RocketMQWriterOptions.HEART_BEAT_BROKER_INTERVAL); } @Override @@ -89,7 +96,11 @@ public String toString() { ", failureRetryTimes=" + failureRetryTimes + ", sendMsgTimeout=" + sendMsgTimeout + ", maxMessageSize=" + maxMessageSize + - ", optionalProducerProperties=" + optionalProducerProperties + + ", instanceName='" + instanceName + '\'' + + ", vipChannelEnabled=" + vipChannelEnabled + + ", defaultTopicQueueNums=" + defaultTopicQueueNums + + ", compressMsgBodyOverHowmuch=" + compressMsgBodyOverHowmuch + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + '}'; } } From 144f3f722e81fd480e0181c2faebc48372de5dc5 Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Mon, 27 Mar 2023 10:23:14 +0800 Subject: [PATCH 7/7] optimize rocketmq write config --- .../connector/rocketmq/option/RocketMQWriterOptions.java | 4 ---- .../bitsail/connector/rocketmq/sink/RocketMQProducer.java | 3 --- .../connector/rocketmq/sink/config/RocketMQSinkConfig.java | 3 --- 3 files changed, 10 deletions(-) diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java index 73f16e079..a28027bed 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java @@ -108,10 +108,6 @@ public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { key(WRITER_PREFIX + "heart_beat_broker_interval") .defaultValue(30000); - ConfigOption INSTANCE_NAME = - key(WRITER_PREFIX + "instance_name") - .defaultValue("bitsail_rocketmq_instance"); - ConfigOption VIP_CHANNEL_ENABLED = key(WRITER_PREFIX + "vip_channel_enabled") .defaultValue(false); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java index fbd47e04f..9f8b0d397 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java @@ -61,7 +61,6 @@ public class RocketMQProducer implements Serializable { private final int failureRetryTimes; private final int sendMsgTimeout; private final int maxMessageSize; - private final String instanceName; private final boolean vipChannelEnabled; private final int defaultTopicQueueNums; private final int compressMsgBodyOverHowmuch; @@ -94,7 +93,6 @@ public RocketMQProducer(RocketMQSinkConfig sinkConfig) { this.enableQueueSelector = false; this.messageList = new ArrayList<>(); - this.instanceName = sinkConfig.getInstanceName(); this.compressMsgBodyOverHowmuch = sinkConfig.getCompressMsgBodyOverHowmuch(); this.defaultTopicQueueNums = sinkConfig.getDefaultTopicQueueNums(); this.heartbeatBrokerInterval = sinkConfig.getHeartbeatBrokerInterval(); @@ -146,7 +144,6 @@ public void onException(Throwable e) { producer.setRetryTimesWhenSendFailed(failureRetryTimes); producer.setSendMsgTimeout(sendMsgTimeout); producer.setMaxMessageSize(maxMessageSize); - producer.setInstanceName(instanceName); producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch); producer.setDefaultTopicQueueNums(defaultTopicQueueNums); producer.setHeartbeatBrokerInterval(heartbeatBrokerInterval); diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java index 1f0a8934b..5234efad8 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java @@ -46,7 +46,6 @@ public class RocketMQSinkConfig implements Serializable { private int failureRetryTimes; private int sendMsgTimeout; private int maxMessageSize; - private String instanceName; private boolean vipChannelEnabled; private int defaultTopicQueueNums; private int compressMsgBodyOverHowmuch; @@ -73,7 +72,6 @@ public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES); this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT); this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE); - this.instanceName = outputSliceConfig.get(RocketMQWriterOptions.INSTANCE_NAME); this.vipChannelEnabled = outputSliceConfig.get(RocketMQWriterOptions.VIP_CHANNEL_ENABLED); this.defaultTopicQueueNums = outputSliceConfig.get(RocketMQWriterOptions.DEFAULT_TOPIC_QUEUE_NUMS); this.compressMsgBodyOverHowmuch = outputSliceConfig.get(RocketMQWriterOptions.COMPRESS_MSG_BODY_SIZE); @@ -96,7 +94,6 @@ public String toString() { ", failureRetryTimes=" + failureRetryTimes + ", sendMsgTimeout=" + sendMsgTimeout + ", maxMessageSize=" + maxMessageSize + - ", instanceName='" + instanceName + '\'' + ", vipChannelEnabled=" + vipChannelEnabled + ", defaultTopicQueueNums=" + defaultTopicQueueNums + ", compressMsgBodyOverHowmuch=" + compressMsgBodyOverHowmuch +