Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BitSail][Connector] Migrate rocketmq Sink connector to V1 interface #457

Merged
merged 11 commits into from
Mar 27, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.constants;

public class OptionalProducerConfig {
public static final String INSTANCE_NAME = "instanceName";
public static final String VIP_CHANNEL = "vipChannelEnabled";
public static final String DEFAULT_TOPIC_QUEUE_NUMS = "defaultTopicQueueNums";
public static final String COMPRESS_SG_BODY_OVER = "compressMsgBodyOverHowmuch";
public static final String HEARTBEAT_BROKER_INTERVAL = "heartbeatBrokerInterval";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.constants;

public class RocketMQConstants {

public static final String CONNECTOR_NAME = "rocketmq";

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public enum RocketMQErrorCode implements ErrorCode {

CONSUMER_CREATE_FAILED("RocketMQ-1", "RocketMQ Consumer create failed."),
CONSUMER_FETCH_OFFSET_FAILED("RocketMQ-2", "RocketMQ Consumer fetch offset failed."),
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed.");
CONSUMER_SEEK_OFFSET_FAILED("RocketMQ-3", "RocketMQ Consumer seek offset failed."),
REQUIRED_VALUE("RocketMQ-4", "You missed parameter which is required, please check your configuration."),
UNSUPPORTED_FORMAT("RocketMQ-5", "Unsupported output format.");

public String code;
public String description;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.component.format.json.JsonRowSerializationSchema;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class JsonRocketMQSerializationSchema implements RocketMQSerializationSchema {
private static final Logger LOG = LoggerFactory.getLogger(JsonRocketMQSerializationSchema.class);

private static final long serialVersionUID = 3L;

private final List<Integer> partitionKeyIndices;
private final List<Integer> keyIndices;
private final transient JsonRowSerializationSchema rowSerializationSchema;

public JsonRocketMQSerializationSchema(BitSailConfiguration bitSailConfiguration, RowTypeInfo rowTypeInfo,
List<Integer> partitionKeyIndices, List<Integer> keyIndices) {
this.partitionKeyIndices = partitionKeyIndices;
this.keyIndices = keyIndices;
this.rowSerializationSchema = new JsonRowSerializationSchema(bitSailConfiguration, rowTypeInfo);
}

@Override
public byte[] serializeKey(Row row) {
if (keyIndices != null) {
String key = this.keyIndices.stream()
.map(i -> {
Object keyField = row.getField(i);
if (keyField != null) {
return keyField.toString();
}
LOG.warn("Found null key in row: [{}]", row);
return null;
})
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining());
return key.getBytes(StandardCharsets.UTF_8);
} else {
return null;
}
}

@Override
public byte[] serializeValue(Row row) {
return rowSerializationSchema.serialize(row);
}

@Override
public String getPartitionKey(Row row) {
if (partitionKeyIndices != null) {
return this.partitionKeyIndices.stream()
.map(i -> {
Object partitionField = row.getField(i);
if (partitionField != null) {
return partitionField.toString();
}
LOG.warn("Found null key in row: [{}]", row);
return null;
})
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining());
} else {
return null;
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.connector.rocketmq.error.RocketMQErrorCode;
import com.bytedance.bitsail.connector.rocketmq.sink.format.RocketMQSinkFormat;

import java.util.List;

public class RocketMQSerializationFactory {

RowTypeInfo rowTypeInfo;
List<Integer> partitionIndices;
List<Integer> keyIndices;

public RocketMQSerializationFactory(RowTypeInfo rowTypeInfo, List<Integer> partitionIndices, List<Integer> keyIndices) {
this.rowTypeInfo = rowTypeInfo;
this.partitionIndices = partitionIndices;
this.keyIndices = keyIndices;
}

public RocketMQSerializationSchema getSerializationSchemaByFormat(BitSailConfiguration bitSailConfiguration, RocketMQSinkFormat format) {
if (format == RocketMQSinkFormat.JSON) {
return new JsonRocketMQSerializationSchema(bitSailConfiguration, rowTypeInfo, partitionIndices, keyIndices);
}
throw BitSailException.asBitSailException(RocketMQErrorCode.UNSUPPORTED_FORMAT,
"unsupported sink format: " + format);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.format;

import com.bytedance.bitsail.common.row.Row;

public interface RocketMQSerializationSchema {

byte[] serializeKey(Row row);

byte[] serializeValue(Row row);

default String getPartitionKey(Row row) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.option;

import com.bytedance.bitsail.common.annotation.Essential;
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.WriterOptions;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;

public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions {

@Essential
ConfigOption<String> NAME_SERVER_ADDRESS =
key(WRITER_PREFIX + "name_server_address")
.noDefaultValue(String.class);

ConfigOption<String> PRODUCER_GROUP =
key(WRITER_PREFIX + "producer_group")
.noDefaultValue(String.class);

@Essential
ConfigOption<String> TOPIC =
key(WRITER_PREFIX + "topic")
.noDefaultValue(String.class);

ConfigOption<String> TAG =
key(WRITER_PREFIX + "tag")
.noDefaultValue(String.class);

ConfigOption<Boolean> ENABLE_BATCH_FLUSH =
key(WRITER_PREFIX + "enable_batch_flush")
.defaultValue(true);

ConfigOption<Integer> BATCH_SIZE =
key(WRITER_PREFIX + "batch_size")
.defaultValue(100);

/**
* when encounter errors while sending:<br/>
* true: log the error<br/>
* false: throw exceptions
*/
ConfigOption<Boolean> LOG_FAILURES_ONLY =
key(WRITER_PREFIX + "log_failures_only")
.defaultValue(false);

ConfigOption<Boolean> ENABLE_SYNC_SEND =
key(WRITER_PREFIX + "enable_sync_send")
.defaultValue(false);

ConfigOption<String> ACCESS_KEY =
key(WRITER_PREFIX + "access_key")
.noDefaultValue(String.class);

ConfigOption<String> SECRET_KEY =
key(WRITER_PREFIX + "secret_key")
.noDefaultValue(String.class);

ConfigOption<Integer> SEND_FAILURE_RETRY_TIMES =
key(WRITER_PREFIX + "send_failure_retry_times")
.defaultValue(3);

ConfigOption<Integer> SEND_MESSAGE_TIMEOUT =
key(WRITER_PREFIX + "send_message_timeout_ms")
.defaultValue(3000);

ConfigOption<Integer> MAX_MESSAGE_SIZE =
key(WRITER_PREFIX + "max_message_size_bytes")
.defaultValue(4194304);

ConfigOption<String> KEY_FIELDS =
key(WRITER_PREFIX + "key")
.noDefaultValue(String.class);

ConfigOption<String> PARTITION_FIELDS =
key(WRITER_PREFIX + "partition_fields")
.noDefaultValue(String.class);

ConfigOption<String> FORMAT =
key(WRITER_PREFIX + "format")
.defaultValue("json");

ConfigOption<Integer> DEFAULT_TOPIC_QUEUE_NUMS =
key(WRITER_PREFIX + "default_topic_queue_nums")
.defaultValue(4);

ConfigOption<Integer> COMPRESS_MSG_BODY_SIZE =
key(WRITER_PREFIX + "compress_msg_body_over_how_much")
.defaultValue(4096);

ConfigOption<Integer> HEART_BEAT_BROKER_INTERVAL =
key(WRITER_PREFIX + "heart_beat_broker_interval")
.defaultValue(30000);

ConfigOption<Boolean> VIP_CHANNEL_ENABLED =
key(WRITER_PREFIX + "vip_channel_enabled")
.defaultValue(false);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.rocketmq.sink;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class HashQueueSelector implements MessageQueueSelector {

private int nullKeyCount;

public HashQueueSelector() {
super();
nullKeyCount = 0;
}

@Override
public MessageQueue select(List<MessageQueue> mqList, Message message, Object partitionKeys) {
int queueId;

if (partitionKeys != null) {
queueId = partitionKeys.hashCode() % mqList.size();
} else {
queueId = nullKeyCount % mqList.size();
nullKeyCount = (nullKeyCount + 1) % mqList.size();
}

return mqList.get(queueId);
}
}

Loading