diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java new file mode 100644 index 000000000..f1585ac3d --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/OptionalProducerConfig.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.constants; + +public class OptionalProducerConfig { + public static final String INSTANCE_NAME = "instanceName"; + public static final String VIP_CHANNEL = "vipChannelEnabled"; + public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums"; + public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch"; + public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval"; +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java new file mode 100644 index 000000000..7c759375a --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/constants/RocketMQConstants.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.constants; + +public class RocketMQConstants { + + public static final String CONNECTOR_NAME = "rocketmq"; + +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java index 86988d79e..c9a19a6f9 100644 --- a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/error/RocketMQErrorCode.java @@ -22,7 +22,9 @@ public enum RocketMQErrorCode implements ErrorCode { CONSUMER_CREATE_FAILED("RocketMQ-1", "RocketMQ Consumer create failed."), CONSUMER_FETCH_OFFSET_FAILED("RocketMQ-2", "RocketMQ Consumer fetch offset failed."), - CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."); + CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."), + REQUIRED_VALUE("RocketMQ-4", "You missed parameter which is required, please check your configuration."), + UNSUPPORTED_FORMAT("RocketMQ-5", "Unsupported output format."); public String code; public String description; diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java new file mode 100644 index 000000000..375158854 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/JsonRocketMQSerializationSchema.java @@ -0,0 +1,92 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.format; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema { + private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class); + + private static final long serialVersionUID = 3L; + + private final List partitionKeyIndices; + private final List keyIndices; + private final transient JsonRowSerializationSchema rowSerializationSchema; + + public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo, + List partitionKeyIndices, List keyIndices) { + this.partitionKeyIndices = partitionKeyIndices; + this.keyIndices = keyIndices; + this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo); + } + + @Override + public byte[] serializeKey(Row row) { + if (keyIndices != null) { + String key = this.keyIndices.stream() + .map(i -> { + Object keyField = row.getField(i); + if (keyField != null) { + return keyField.toString(); + } + LOG.warn("Found null key in row: [{}]", row); + return null; + }) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.joining()); + return key.getBytes(StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public byte[] serializeValue(Row row) { + return rowSerializationSchema.serialize(row); + } + + @Override + public String getPartitionKey(Row row) { + if (partitionKeyIndices != null) { + return this.partitionKeyIndices.stream() + .map(i -> { + Object partitionField = row.getField(i); + if (partitionField != null) { + return partitionField.toString(); + } + LOG.warn("Found null key in row: [{}]", row); + return null; + }) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.joining()); + } else { + return null; + } + } +} + diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java new file mode 100644 index 000000000..c454fd9a4 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.format; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; + +import java.util.List; + +public class RocketMQSerializationFactory { + + RowTypeInfo rowTypeInfo; + List partitionIndices; + List keyIndices; + + public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List partitionIndices, List keyIndices) { + this.rowTypeInfo = rowTypeInfo; + this.partitionIndices = partitionIndices; + this.keyIndices = keyIndices; + } + + public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) { + if (format == RocketMQSinkFormat.JSON) { + return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices); + } + throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT, + "unsupported sink format: " + format); + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java new file mode 100644 index 000000000..002c72cdd --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/format/RocketMQSerializationSchema.java @@ -0,0 +1,30 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.format; + +import com.bytedance.bitsail.common.row.Row; + +public interface RocketMQSerializationSchema { + + byte[] serializeKey(Row row); + + byte[] serializeValue(Row row); + + default String getPartitionKey(Row row) { + return null; + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java new file mode 100644 index 000000000..a28027bed --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/option/RocketMQWriterOptions.java @@ -0,0 +1,115 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.WriterOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX; + +public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions { + + @Essential + ConfigOption NAME_SERVER_ADDRESS = + key(WRITER_PREFIX + "name_server_address") + .noDefaultValue(String.class); + + ConfigOption PRODUCER_GROUP = + key(WRITER_PREFIX + "producer_group") + .noDefaultValue(String.class); + + @Essential + ConfigOption TOPIC = + key(WRITER_PREFIX + "topic") + .noDefaultValue(String.class); + + ConfigOption TAG = + key(WRITER_PREFIX + "tag") + .noDefaultValue(String.class); + + ConfigOption ENABLE_BATCH_FLUSH = + key(WRITER_PREFIX + "enable_batch_flush") + .defaultValue(true); + + ConfigOption BATCH_SIZE = + key(WRITER_PREFIX + "batch_size") + .defaultValue(100); + + /** + * when encounter errors while sending:
+ * true: log the error
+ * false: throw exceptions + */ + ConfigOption LOG_FAILURES_ONLY = + key(WRITER_PREFIX + "log_failures_only") + .defaultValue(false); + + ConfigOption ENABLE_SYNC_SEND = + key(WRITER_PREFIX + "enable_sync_send") + .defaultValue(false); + + ConfigOption ACCESS_KEY = + key(WRITER_PREFIX + "access_key") + .noDefaultValue(String.class); + + ConfigOption SECRET_KEY = + key(WRITER_PREFIX + "secret_key") + .noDefaultValue(String.class); + + ConfigOption SEND_FAILURE_RETRY_TIMES = + key(WRITER_PREFIX + "send_failure_retry_times") + .defaultValue(3); + + ConfigOption SEND_MESSAGE_TIMEOUT = + key(WRITER_PREFIX + "send_message_timeout_ms") + .defaultValue(3000); + + ConfigOption MAX_MESSAGE_SIZE = + key(WRITER_PREFIX + "max_message_size_bytes") + .defaultValue(4194304); + + ConfigOption KEY_FIELDS = + key(WRITER_PREFIX + "key") + .noDefaultValue(String.class); + + ConfigOption PARTITION_FIELDS = + key(WRITER_PREFIX + "partition_fields") + .noDefaultValue(String.class); + + ConfigOption FORMAT = + key(WRITER_PREFIX + "format") + .defaultValue("json"); + + ConfigOption DEFAULT_TOPIC_QUEUE_NUMS = + key(WRITER_PREFIX + "default_topic_queue_nums") + .defaultValue(4); + + ConfigOption COMPRESS_MSG_BODY_SIZE = + key(WRITER_PREFIX + "compress_msg_body_over_how_much") + .defaultValue(4096); + + ConfigOption HEART_BEAT_BROKER_INTERVAL = + key(WRITER_PREFIX + "heart_beat_broker_interval") + .defaultValue(30000); + + ConfigOption VIP_CHANNEL_ENABLED = + key(WRITER_PREFIX + "vip_channel_enabled") + .defaultValue(false); + +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java new file mode 100644 index 000000000..4bb48890c --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelector.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + +public class HashQueueSelector implements MessageQueueSelector { + + private int nullKeyCount; + + public HashQueueSelector() { + super(); + nullKeyCount = 0; + } + + @Override + public MessageQueue select(List mqList, Message message, Object partitionKeys) { + int queueId; + + if (partitionKeys != null) { + queueId = partitionKeys.hashCode() % mqList.size(); + } else { + queueId = nullKeyCount % mqList.size(); + nullKeyCount = (nullKeyCount + 1) % mqList.size(); + } + + return mqList.get(queueId); + } +} + diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java new file mode 100644 index 000000000..9f8b0d397 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQProducer.java @@ -0,0 +1,267 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.common.util.Preconditions; +import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; + +import lombok.Setter; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class RocketMQProducer implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class); + + private static final long serialVersionUID = 1L; + + /* necessary properties for @{link DefaultMQProducer} */ + private final String nameServerAddress; + private final String producerGroup; + + /* flush settings */ + private final int batchSize; + private final boolean batchFlushEnable; + private final boolean logFailuresOnly; + private final boolean enableSyncSend; + + /* acl control */ + private final String accessKey; + private final String secretKey; + + /* props for producer */ + private final int failureRetryTimes; + private final int sendMsgTimeout; + private final int maxMessageSize; + private final boolean vipChannelEnabled; + private final int defaultTopicQueueNums; + private final int compressMsgBodyOverHowmuch; + private final int heartbeatBrokerInterval; + + /* messages and producer */ + private final List messageList; + private transient DefaultMQProducer producer; + + private transient SendCallback callBack; + private transient volatile Exception flushException; + + @Setter + private boolean enableQueueSelector; + private transient HashQueueSelector queueSelector; + + public RocketMQProducer(RocketMQSinkConfig sinkConfig) { + this.nameServerAddress = sinkConfig.getNameServerAddress(); + this.producerGroup = sinkConfig.getProducerGroup(); + this.accessKey = sinkConfig.getAccessKey(); + this.secretKey = sinkConfig.getSecretKey(); + this.failureRetryTimes = sinkConfig.getFailureRetryTimes(); + this.sendMsgTimeout = sinkConfig.getSendMsgTimeout(); + this.maxMessageSize = sinkConfig.getMaxMessageSize(); + + this.batchSize = sinkConfig.getBatchSize(); + this.batchFlushEnable = sinkConfig.isEnableBatchFlush(); + this.logFailuresOnly = sinkConfig.isLogFailuresOnly(); + this.enableSyncSend = sinkConfig.isEnableSyncSend(); + + this.enableQueueSelector = false; + this.messageList = new ArrayList<>(); + this.compressMsgBodyOverHowmuch = sinkConfig.getCompressMsgBodyOverHowmuch(); + this.defaultTopicQueueNums = sinkConfig.getDefaultTopicQueueNums(); + this.heartbeatBrokerInterval = sinkConfig.getHeartbeatBrokerInterval(); + this.vipChannelEnabled = sinkConfig.isVipChannelEnabled(); + } + + /** + * initialize message list, open rocketmq producer + */ + public void open() { + // step1: initialize callback and queue-selector + if (logFailuresOnly) { + callBack = new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) {} + + @Override + public void onException(Throwable throwable) { + LOG.error("Error while sending record to rocketmq: " + throwable.getMessage(), throwable); + } + }; + } else { + callBack = new SendCallback() { + @Override + public void onSuccess(SendResult result) {} + + @Override + public void onException(Throwable e) { + if (flushException == null) { + flushException = new IOException(e.getMessage(), e); + } + } + }; + } + + if (enableQueueSelector) { + this.queueSelector = new HashQueueSelector(); + } + + // step2: open a producer + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + this.producer = new DefaultMQProducer(producerGroup, rpcHook); + } else { + this.producer = new DefaultMQProducer(producerGroup); + } + + producer.setNamesrvAddr(nameServerAddress); + producer.setRetryTimesWhenSendFailed(failureRetryTimes); + producer.setSendMsgTimeout(sendMsgTimeout); + producer.setMaxMessageSize(maxMessageSize); + producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch); + producer.setDefaultTopicQueueNums(defaultTopicQueueNums); + producer.setHeartbeatBrokerInterval(heartbeatBrokerInterval); + producer.setVipChannelEnabled(vipChannelEnabled); + + try { + producer.start(); + } catch (MQClientException e) { + LOG.error("Failed to start rocketmq producer.", e); + throw new RuntimeException(e); + } + } + + public void validateParams() { + if (enableQueueSelector && batchFlushEnable) { + LOG.warn("Queue selector can not be used int batch flush mode!"); + } + Preconditions.checkNotNull(nameServerAddress); + Preconditions.checkNotNull(producerGroup); + } + + /** + * send row to rocketmq + */ + public void send(Message message, String partitionKeys) throws Exception { + checkErroneous(); + if (batchFlushEnable) { + synchronized (messageList) { + messageList.add(message); + if (messageList.size() >= batchSize) { + flushMessages(enableSyncSend); + } + } + } else { + sendSingleMessage(message, partitionKeys, enableSyncSend); + } + } + + /** + * flush message list to rocketmq producer + */ + public void flushMessages(boolean syncSend) { + if (messageList.isEmpty()) { + return; + } + + try { + sendMessageBatch(messageList, syncSend); + } catch (Exception e) { + LOG.error("Failed to flush.", e); + throw new RuntimeException("Rocketmq flush exception", e); + } + messageList.clear(); + } + + /** + * close producer + */ + public void close() throws Exception { + LOG.info("Closing RocketMQProducer."); + + if (producer != null) { + try { + flushMessages(true); + } catch (Exception e) { + LOG.error("Failed to send last batch of message.", e); + throw new RuntimeException("Failed to send last batch of message.", e); + } + + producer.shutdown(); + } + + checkErroneous(); + } + + private void checkErroneous() throws IOException { + Exception e = flushException; + if (e != null) { + // prevent double throwing + flushException = null; + throw new IOException("Failed to send data to rocketmq: " + e.getMessage(), e); + } + } + + /** + * Send single message. + */ + private void sendSingleMessage(Message message, String partitionKeys, boolean enableSyncSend) throws Exception { + if (enableSyncSend) { + SendResult result; + if (enableQueueSelector) { + result = producer.send(message, queueSelector, partitionKeys); + } else { + result = producer.send(message); + } + if (result.getSendStatus() != SendStatus.SEND_OK) { + throw new RemotingException(result.toString()); + } + } else { + if (enableQueueSelector) { + producer.send(message, queueSelector, partitionKeys, callBack); + } else { + producer.send(message, callBack); + } + } + } + + /** + * Send multi message. + */ + private void sendMessageBatch(List messages, boolean syncSend) throws Exception { + if (syncSend) { + SendResult result = producer.send(messages); + if (result.getSendStatus() != SendStatus.SEND_OK) { + throw new RemotingException(result.toString()); + } + } else { + producer.send(messages, callBack); + } + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java new file mode 100644 index 000000000..e6fbcc5a7 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQSink.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Sink; +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.connector.rocketmq.constants.RocketMQConstants; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +public class RocketMQSink implements Sink { + private BitSailConfiguration commonConf; + + private BitSailConfiguration writerConf; + + @Override + public String getWriterName() { + return RocketMQConstants.CONNECTOR_NAME; + } + + @Override + public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception { + this.commonConf = commonConfiguration; + this.writerConf = writerConfiguration; + } + + @Override + public Writer createWriter(Writer.Context context) throws IOException { + return new RocketMQWriter<>(commonConf, writerConf, context); + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new BitSailTypeInfoConverter(); + } + + @Override + public Optional> createCommitter() { + return Optional.empty(); + } +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java new file mode 100644 index 000000000..426893bf9 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/RocketMQWriter.java @@ -0,0 +1,165 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.util.Preconditions; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.format.RocketMQSerializationFactory; +import com.bytedance.bitsail.connector.rocketmq.format.RocketMQSerializationSchema; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; +import com.bytedance.bitsail.connector.rocketmq.sink.config.RocketMQSinkConfig; +import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class RocketMQWriter implements Writer { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQWriter.class); + + private final RocketMQProducer rocketmqProducer; + + private final String topic; + private final String tag; + + private final List partitionIndices; + + private final RocketMQSerializationSchema serializationSchema; + + public RocketMQWriter(BitSailConfiguration commonConf, BitSailConfiguration writerConf, + Context context) throws IOException { + RocketMQSinkConfig sinkConfig = new RocketMQSinkConfig(writerConf); + this.topic = sinkConfig.getTopic(); + this.tag = sinkConfig.getTag(); + this.rocketmqProducer = new RocketMQProducer(sinkConfig); + + List columns = writerConf.getNecessaryOption(RocketMQWriterOptions.COLUMNS, + RocketMQErrorCode.REQUIRED_VALUE); + + String partitionFields = writerConf.getNecessaryOption(RocketMQWriterOptions.PARTITION_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); + this.partitionIndices = getIndicesByFieldNames(columns, partitionFields); + + String keyFields = writerConf.getNecessaryOption(RocketMQWriterOptions.KEY_FIELDS, RocketMQErrorCode.REQUIRED_VALUE); + List keyIndices = getIndicesByFieldNames(columns, keyFields); + this.open(); + + String formatType = writerConf.get(RocketMQWriterOptions.FORMAT); + RocketMQSinkFormat sinkFormat = RocketMQSinkFormat.valueOf(formatType.toUpperCase()); + + LOG.info("RocketMQ producer settings: " + sinkConfig); + LOG.info("RocketMQ partition fields indices: " + partitionIndices); + LOG.info("RocketMQ key indices: " + keyIndices); + LOG.info("RocketMQ sink format type: " + sinkFormat); + + RocketMQSerializationFactory factory = new RocketMQSerializationFactory(context.getRowTypeInfo(), partitionIndices, keyIndices); + this.serializationSchema = factory.getSerializationSchemaByFormat(writerConf, sinkFormat); + } + + public void open() throws IOException { + if (partitionIndices != null && !partitionIndices.isEmpty()) { + rocketmqProducer.setEnableQueueSelector(true); + } + rocketmqProducer.validateParams(); + + try { + this.rocketmqProducer.open(); + } catch (Exception e) { + throw new IOException("failed to open rocketmq producer: " + e.getMessage(), e); + } + } + + @Override + public void write(Row row) throws IOException { + Message message = prepareMessage(row); + String partitionKeys = serializationSchema.getPartitionKey(row); + try { + rocketmqProducer.send(message, partitionKeys); + } catch (Exception e) { + throw new IOException("failed to send record to rocketmq: " + e.getMessage(), e); + } + } + + @Override + public void flush(boolean endOfInput) throws IOException { + synchronized (this) { + if (Objects.nonNull(rocketmqProducer)) { + rocketmqProducer.flushMessages(!endOfInput); + } + if (endOfInput) { + LOG.info("all records are sent to commit buffer."); + } + } + } + + @Override + public List prepareCommit() throws IOException { + return null; + } + + /** + * transform BitSail Row to RocketMQ Message (value, topic, tag) + */ + private Message prepareMessage(Row row) { + byte[] k = serializationSchema.serializeKey(row); + byte[] value = serializationSchema.serializeValue(row); + String key = k != null ? new String(k, StandardCharsets.UTF_8) : ""; + + Preconditions.checkNotNull(topic, "the message topic is null"); + Preconditions.checkNotNull(value, "the message body is null"); + + Message msg = new Message(topic, value); + msg.setKeys(key); + msg.setTags(tag); + + return msg; + } + + /** + * get indices by field names + */ + private List getIndicesByFieldNames(List columns, String fieldNames) { + if (StringUtils.isEmpty(fieldNames)) { + return null; + } + + List fields = Arrays.asList(fieldNames.split(",\\s*")); + List indices = fields.stream().map(field -> { + for (int i = 0; i < columns.size(); ++i) { + String columnName = columns.get(i).getName().trim(); + if (columnName.equals(field)) { + return i; + } + } + throw new IllegalArgumentException("Field " + field + " not found in columns! All fields are: " + fieldNames); + }).collect(Collectors.toList()); + return indices.isEmpty() ? null : indices; + } + +} diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java new file mode 100644 index 000000000..5234efad8 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/config/RocketMQSinkConfig.java @@ -0,0 +1,104 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; + +import lombok.Data; + +import java.io.Serializable; +import java.util.UUID; + +@Data +public class RocketMQSinkConfig implements Serializable { + private static final long serialVersionUID = 2L; + + private String nameServerAddress; + private String producerGroup; + private String topic; + private String tag; + + private boolean enableBatchFlush; + private int batchSize; + + private String accessKey; + private String secretKey; + + private boolean logFailuresOnly; + private boolean enableSyncSend; + + private int failureRetryTimes; + private int sendMsgTimeout; + private int maxMessageSize; + private boolean vipChannelEnabled; + private int defaultTopicQueueNums; + private int compressMsgBodyOverHowmuch; + private int heartbeatBrokerInterval; + + public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) { + this.nameServerAddress = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.NAME_SERVER_ADDRESS, + RocketMQErrorCode.REQUIRED_VALUE); + this.producerGroup = outputSliceConfig.getUnNecessaryOption(RocketMQWriterOptions.PRODUCER_GROUP, + UUID.randomUUID().toString()); + this.topic = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.TOPIC, + RocketMQErrorCode.REQUIRED_VALUE); + this.tag = outputSliceConfig.get(RocketMQWriterOptions.TAG); + + this.enableBatchFlush = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_BATCH_FLUSH); + this.batchSize = outputSliceConfig.get(RocketMQWriterOptions.BATCH_SIZE); + + this.accessKey = outputSliceConfig.get(RocketMQWriterOptions.ACCESS_KEY); + this.secretKey = outputSliceConfig.get(RocketMQWriterOptions.SECRET_KEY); + + this.logFailuresOnly = outputSliceConfig.get(RocketMQWriterOptions.LOG_FAILURES_ONLY); + this.enableSyncSend = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_SYNC_SEND); + + this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES); + this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT); + this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE); + this.vipChannelEnabled = outputSliceConfig.get(RocketMQWriterOptions.VIP_CHANNEL_ENABLED); + this.defaultTopicQueueNums = outputSliceConfig.get(RocketMQWriterOptions.DEFAULT_TOPIC_QUEUE_NUMS); + this.compressMsgBodyOverHowmuch = outputSliceConfig.get(RocketMQWriterOptions.COMPRESS_MSG_BODY_SIZE); + this.heartbeatBrokerInterval = outputSliceConfig.get(RocketMQWriterOptions.HEART_BEAT_BROKER_INTERVAL); + } + + @Override + public String toString() { + return "RocketMQSinkConfig{" + + "nameServerAddress='" + nameServerAddress + '\'' + + ", producerGroup='" + producerGroup + '\'' + + ", topic='" + topic + '\'' + + ", tag='" + tag + '\'' + + ", enableBatchFlush=" + enableBatchFlush + + ", batchSize=" + batchSize + + ", accessKey='" + accessKey + '\'' + + ", secretKey='" + secretKey + '\'' + + ", logFailuresOnly=" + logFailuresOnly + + ", enableSyncSend=" + enableSyncSend + + ", failureRetryTimes=" + failureRetryTimes + + ", sendMsgTimeout=" + sendMsgTimeout + + ", maxMessageSize=" + maxMessageSize + + ", vipChannelEnabled=" + vipChannelEnabled + + ", defaultTopicQueueNums=" + defaultTopicQueueNums + + ", compressMsgBodyOverHowmuch=" + compressMsgBodyOverHowmuch + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + + '}'; + } +} + diff --git a/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java new file mode 100644 index 000000000..37b6bdcf0 --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/main/java/com/bytedance/bitsail/connector/rocketmq/sink/format/RocketMQSinkFormat.java @@ -0,0 +1,21 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink.format; + +public enum RocketMQSinkFormat { + JSON +} diff --git a/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java b/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java new file mode 100644 index 000000000..d71c9a9ca --- /dev/null +++ b/bitsail-connectors/connector-rocketmq/src/test/java/com/bytedance/bitsail/connector/rocketmq/sink/HashQueueSelectorTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.rocketmq.sink; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class HashQueueSelectorTest { + + private HashQueueSelector selector; + + @Before + public void setUp() { + selector = new HashQueueSelector(); + } + + @Test + public void testSelectWithNonNullPartitionKey() { + List mqList = createMessageQueueList(3); + Message message = new Message("TestTopic", "TestTag", "TestBody".getBytes()); + Object partitionKey = "nonNullKey"; + + MessageQueue selectedQueue = selector.select(mqList, message, partitionKey); + + int expectedQueueId = Math.abs(partitionKey.hashCode()) % mqList.size(); + Assert.assertEquals(mqList.get(expectedQueueId), selectedQueue); + } + + @Test + public void testSelectWithNullPartitionKey() { + List mqList = createMessageQueueList(3); + Message message = new Message("TestTopic", "TestTag", "TestBody".getBytes()); + + MessageQueue selectedQueue1 = selector.select(mqList, message, null); + MessageQueue selectedQueue2 = selector.select(mqList, message, null); + + Assert.assertEquals(mqList.get(0), selectedQueue1); + Assert.assertEquals(mqList.get(1), selectedQueue2); + } + + private List createMessageQueueList(int size) { + List mqList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + mqList.add(new MessageQueue("TestTopic", "TestBroker", i)); + } + return mqList; + } + +} \ No newline at end of file diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java new file mode 100644 index 000000000..e492dced0 --- /dev/null +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/java/com/bytedance/bitsail/test/integration/rocketmq/RocketMQSinkITCase.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.test.integration.rocketmq; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.fake.option.FakeReaderOptions; +import com.bytedance.bitsail.connector.rocketmq.option.RocketMQWriterOptions; +import com.bytedance.bitsail.test.integration.AbstractIntegrationTest; +import com.bytedance.bitsail.test.integration.utils.JobConfUtils; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Ignore("Ignore unbounded streaming task.") +public class RocketMQSinkITCase extends AbstractIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSinkITCase.class); + + private static final int TOTAL_SEND_COUNT = 300; + + private static final String NAME_SERVER_ADDRESS = "127.0.0.1:10911"; + private static final String TOPIC_NAME = "test_topic"; + private static final String PRODUCER_GROUP = "test_producer_group"; + private static final String TAG = "itcase_test"; + + @Test + public void testFakeToRocketMQ() throws Exception { + BitSailConfiguration configuration = JobConfUtils.fromClasspath("fake_to_rocketmq.json"); + updateConfiguration(configuration); + submitJob(configuration); + + Queue messages = consumeTopic(); + Assert.assertEquals(TOTAL_SEND_COUNT, messages.size()); + } + + private Queue consumeTopic() throws Exception { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); + consumer.setNamesrvAddr(NAME_SERVER_ADDRESS); + consumer.subscribe(TOPIC_NAME, TAG); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + Queue messageQ = new ConcurrentLinkedQueue<>(); + + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + messageQ.addAll(msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + try { + consumer.start(); + } catch (MQClientException e) { + LOG.info("send message failed. {}", e.toString()); + } + Thread.sleep(5000); + consumer.shutdown(); + + for (MessageExt msg : messageQ) { + LOG.debug(new String(msg.getBody())); + } + + LOG.info("Total get {} messages.", messageQ.size()); + return messageQ; + } + + protected void updateConfiguration(BitSailConfiguration jobConfiguration) { + jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT); + jobConfiguration.set(RocketMQWriterOptions.NAME_SERVER_ADDRESS, NAME_SERVER_ADDRESS); + jobConfiguration.set(RocketMQWriterOptions.TOPIC, TOPIC_NAME); + jobConfiguration.set(RocketMQWriterOptions.PRODUCER_GROUP, PRODUCER_GROUP); + jobConfiguration.set(RocketMQWriterOptions.TAG, TAG); + jobConfiguration.set(RocketMQWriterOptions.PARTITION_FIELDS, "id"); + jobConfiguration.set(RocketMQWriterOptions.ENABLE_BATCH_FLUSH, false); + jobConfiguration.set(RocketMQWriterOptions.FORMAT, "json"); + } +} diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json new file mode 100644 index 000000000..aa0b90675 --- /dev/null +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-rocketmq/src/test/resources/fake_to_rocketmq.json @@ -0,0 +1,70 @@ +{ + "job": { + "common": { + "job_id": -2413, + "job_name": "bitsail_test_integration_fake_to_rockermq", + "instance_id": -20413, + "user_name": "user" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.fake.source.FakeSource", + "total_count": 300, + "rate": 100000, + "random_null_rate": 0, + "unique_fields": "id", + "columns": [ + { + "name": "id", + "type": "string" + }, + { + "name": "string_field", + "type": "string" + }, + { + "name": "int_field", + "type": "bigint" + }, + { + "name": "double_field", + "type": "double" + }, + { + "name": "date_field", + "type": "date.date" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.rocketmq.sink.RocketMQSink", + "name_server_address": "127.0.0.1:10911", + "producer_group": "test_producer_group", + "topic": "test_topic", + "tag": "itcase_test", + "key": "id", + "partition_fields": "id,date_field", + "columns": [ + { + "name": "id", + "type": "string" + }, + { + "name": "string_field", + "type": "string" + }, + { + "name": "int_field", + "type": "long" + }, + { + "name": "double_field", + "type": "double" + }, + { + "name": "date_field", + "type": "date" + } + ] + } + } +}