From ef2c3c7283e7c1ff7ce445b71a62977ac5ed0403 Mon Sep 17 00:00:00 2001 From: limin Date: Tue, 29 Oct 2024 21:59:36 +0800 Subject: [PATCH] [Improve][Connector-V2] Redis support custom key and value (#7888) Co-authored-by: limin --- docs/en/connector-v2/sink/Redis.md | 134 ++++++++++++++--- docs/zh/connector-v2/sink/Redis.md | 137 +++++++++++++++--- .../seatunnel/redis/config/RedisConfig.java | 26 ++++ .../redis/config/RedisParameters.java | 20 +++ .../redis/sink/RedisSinkFactory.java | 4 + .../seatunnel/redis/sink/RedisSinkWriter.java | 113 +++++++++++++-- .../redis/RedisTestCaseTemplateIT.java | 85 +++++++++++ ...is-to-redis-custom-hash-key-and-value.conf | 119 +++++++++++++++ .../resources/redis-to-redis-custom-key.conf | 118 +++++++++++++++ .../redis-to-redis-custom-value-for-key.conf | 119 +++++++++++++++ .../redis-to-redis-custom-value-for-list.conf | 118 +++++++++++++++ .../redis-to-redis-custom-value-for-set.conf | 118 +++++++++++++++ .../redis-to-redis-custom-value-for-zset.conf | 118 +++++++++++++++ 13 files changed, 1182 insertions(+), 47 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index b5f444bb117..5b37720891b 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -12,21 +12,25 @@ Used to write data to Redis. ## Options -| name | type | required | default value | -|----------------|--------|-----------------------|---------------| -| host | string | yes | - | -| port | int | yes | - | -| key | string | yes | - | -| data_type | string | yes | - | -| batch_size | int | no | 10 | -| user | string | no | - | -| auth | string | no | - | -| db_num | int | no | 0 | -| mode | string | no | single | -| nodes | list | yes when mode=cluster | - | -| format | string | no | json | -| expire | long | no | -1 | -| common-options | | no | - | +| name | type | required | default value | +|--------------------|---------|-----------------------|---------------| +| host | string | yes | - | +| port | int | yes | - | +| key | string | yes | - | +| data_type | string | yes | - | +| batch_size | int | no | 10 | +| user | string | no | - | +| auth | string | no | - | +| db_num | int | no | 0 | +| mode | string | no | single | +| nodes | list | yes when mode=cluster | - | +| format | string | no | json | +| expire | long | no | -1 | +| support_custom_key | boolean | no | false | +| value_field | string | no | - | +| hash_key_field | string | no | - | +| hash_value_field | string | no | - | +| common-options | | no | - | ### host [string] @@ -50,12 +54,12 @@ Upstream data is the following: | 500 | internal error | false | If you assign field name to `code` and data_type to `key`, two data will be written to redis: -1. `200 -> {code: 200, message: true, data: get success}` -2. `500 -> {code: 500, message: false, data: internal error}` +1. `200 -> {code: 200, data: get success, success: true}` +2. `500 -> {code: 500, data: internal error, success: false}` If you assign field name to `value` and data_type to `key`, only one data will be written to redis because `value` is not existed in upstream data's fields: -1. `value -> {code: 500, message: false, data: internal error}` +1. `value -> {code: 500, data: internal error, success: false}` Please see the data_type section for specific writing rules. @@ -85,7 +89,7 @@ Redis data types, support `key` `hash` `list` `set` `zset` > Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption. > - ### batch_size [int] +### batch_size [int] ensure the batch write size in single-machine mode; no guarantees in cluster mode. @@ -135,6 +139,61 @@ Connector will generate data as the following and write it to redis: Set redis expiration time, the unit is second. The default value is -1, keys do not automatically expire by default. +### support_custom_key [boolean] + +if true, the key can be customized by the field value in the upstream data. + +Upstream data is the following: + +| code | data | success | +|------|----------------|---------| +| 200 | get success | true | +| 500 | internal error | false | + +You can customize the Redis key using '{' and '}', and the field name in '{}' will be parsed and replaced by the field value in the upstream data. For example, If you assign field name to `{code}` and data_type to `key`, two data will be written to redis: +1. `200 -> {code: 200, data: get success, success: true}` +2. `500 -> {code: 500, data: internal error, success: false}` + +Redis key can be composed of fixed and variable parts, connected by ':'. For example, If you assign field name to `code:{code}` and data_type to `key`, two data will be written to redis: +1. `code:200 -> {code: 200, data: get success, success: true}` +2. `code:500 -> {code: 500, data: internal error, success: false}` + +### value_field [string] + +The field of value you want to write to redis, `data_type` support `key` `list` `set` `zset`. + +When you assign field name to `value` and value_field is `data` and data_type to `key`, for example: + +Upstream data is the following: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +The following data will be written to redis: +1. `value -> get success` + +### hash_key_field [string] + +The field of hash key you want to write to redis, `data_type` support `hash` + +### hash_value_field [string] + +The field of hash value you want to write to redis, `data_type` support `hash` + +When you assign field name to `value` and hash_key_field is `data` and hash_value_field is `success` and data_type to `hash`, for example: + +Upstream data is the following: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +Connector will generate data as the following and write it to redis: + +The following data will be written to redis: +1. `value -> get success | true` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details @@ -152,6 +211,43 @@ Redis { } ``` +custom key: + +```hocon +Redis { + host = localhost + port = 6379 + key = "name:{name}" + support_custom_key = true + data_type = key +} +``` + +custom value: + +```hocon +Redis { + host = localhost + port = 6379 + key = person + value_field = "name" + data_type = key +} +``` + +custom HashKey and HashValue: + +```hocon +Redis { + host = localhost + port = 6379 + key = person + hash_key_field = "name" + hash_value_field = "age" + data_type = hash +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/zh/connector-v2/sink/Redis.md b/docs/zh/connector-v2/sink/Redis.md index b47d9de9146..5640710de4e 100644 --- a/docs/zh/connector-v2/sink/Redis.md +++ b/docs/zh/connector-v2/sink/Redis.md @@ -12,20 +12,25 @@ ## 选项 -| 名称 | 类型 | 是否必须 | 默认值 | -|----------------|--------|---------------------|--------| -| host | string | 是 | - | -| port | int | 是 | - | -| key | string | 是 | - | -| data_type | string | 是 | - | -| user | string | 否 | - | -| auth | string | 否 | - | -| db_num | int | 否 | 0 | -| mode | string | 否 | single | -| nodes | list | 当 mode=cluster 时为:是 | - | -| format | string | 否 | json | -| expire | long | 否 | -1 | -| common-options | | 否 | - | +| name | type | required | default value | +|--------------------|---------|-----------------------|---------------| +| host | string | yes | - | +| port | int | yes | - | +| key | string | yes | - | +| data_type | string | yes | - | +| batch_size | int | no | 10 | +| user | string | no | - | +| auth | string | no | - | +| db_num | int | no | 0 | +| mode | string | no | single | +| nodes | list | yes when mode=cluster | - | +| format | string | no | json | +| expire | long | no | -1 | +| support_custom_key | boolean | no | false | +| value_field | string | no | - | +| hash_key_field | string | no | - | +| hash_value_field | string | no | - | +| common-options | | no | - | ### host [string] @@ -48,13 +53,17 @@ Redis 端口 | 200 | 获取成功 | true | | 500 | 内部错误 | false | -如果将字段名称指定为 `code` 并将 data_type 设置为 `key`,将有两个数据写入 Redis: -1. `200 -> {code: 200, message: true, data: 获取成功}` -2. `500 -> {code: 500, message: false, data: 内部错误}` +可以使用`{`和`}`符号自定义Redis键名,`{}`中的字段名会被解析替换为上游数据中的某个字段值,例如:将字段名称指定为 `{code}` 并将 data_type 设置为 `key`,将有两个数据写入 Redis: +1. `200 -> {code: 200, data: 获取成功, success: true}` +2. `500 -> {code: 500, data: 内部错误, success: false}` -如果将字段名称指定为 `value` 并将 data_type 设置为 `key`,则由于上游数据的字段中没有 `value` 字段,将只有一个数据写入 Redis: +Redis键名可以由固定部分和变化部分组成,通过Redis分组符号:连接,例如:将字段名称指定为 `code:{code}` 并将 data_type 设置为 `key`,将有两个数据写入 Redis: +1. `code:200 -> {code: 200, data: 获取成功, success: true}` +2. `code:500 -> {code: 500, data: 内部错误, success: false}` -1. `value -> {code: 500, message: false, data: 内部错误}` +如果将Redis键名指定为 `value` 并将 data_type 设置为 `key`,则只有一个数据写入 Redis: + +1. `value -> {code: 500, data: 内部错误, success: false}` 请参见 data_type 部分以了解具体的写入规则。 @@ -128,6 +137,59 @@ Redis 节点信息,在集群模式下使用,必须按如下格式: 设置 Redis 的过期时间,单位为秒。默认值为 -1,表示键不会自动过期。 +### support_custom_key [boolean] + +设置为true,表示启用自定义Key。 + +上游数据如下: + +| code | data | success | +|------|------|---------| +| 200 | 获取成功 | true | +| 500 | 内部错误 | false | + +可以使用`{`和`}`符号自定义Redis键名,`{}`中的字段名会被解析替换为上游数据中的某个字段值,例如:将字段名称指定为 `{code}` 并将 data_type 设置为 `key`,将有两个数据写入 Redis: +1. `200 -> {code: 200, data: 获取成功, success: true}` +2. `500 -> {code: 500, data: 内部错误, success: false}` + +Redis键名可以由固定部分和变化部分组成,通过Redis分组符号:连接,例如:将字段名称指定为 `code:{code}` 并将 data_type 设置为 `key`,将有两个数据写入 Redis: +1. `code:200 -> {code: 200, data: 获取成功, success: true}` +2. `code:500 -> {code: 500, data: 内部错误, success: false}` + +### value_field [string] + +要写入Redis的值的字段, `data_type` 支持 `key` `list` `set` `zset`. + +当你指定Redis键名字段`key`指定为 `value`,值字段`value_field`指定为`data`,并将`data_type`指定为`key`时, + +上游数据如下: + +| code | data | success | +|------|------|---------| +| 200 | 获取成功 | true | + +如下的数据会被写入Redis: +1. `value -> 获取成功` + +### hash_key_field [string] + +要写入Redis的hash键字段, `data_type` 支持 `hash` + +### hash_value_field [string] + +要写入Redis的hash值字段, `data_type` 支持 `hash` + +当你指定Redis键名字段`key`指定为 `value`,hash键字段`hash_key_field`指定为`data`,hash值字段`hash_value_field`指定为`success`,并将`data_type`指定为`hash`时, + +上游数据如下: + +| code | data | success | +|------|------|---------| +| 200 | 获取成功 | true | + +如下的数据会被写入Redis: +1. `value -> 获取成功 | true` + ### common options Sink 插件通用参数,请参考 [Sink Common Options](../sink-common-options.md) 获取详情 @@ -145,6 +207,43 @@ Redis { } ``` +自定义Key示例: + +```hocon +Redis { + host = localhost + port = 6379 + key = "name:{name}" + support_custom_key = true + data_type = key +} +``` + +自定义Value示例: + +```hocon +Redis { + host = localhost + port = 6379 + key = person + value_field = "name" + data_type = key +} +``` + +自定义HashKey和HashValue示例: + +```hocon +Redis { + host = localhost + port = 6379 + key = person + hash_key_field = "name" + hash_value_field = "age" + data_type = hash +} +``` + ## 更新日志 ### 2.2.0-beta 2022-09-26 diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index 3be5b39de99..c9809868dc9 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -127,6 +127,32 @@ public enum HashKeyParseMode { "batch_size is used to control the size of a batch of data during read and write operations" + ",default 10"); + public static final Option SUPPORT_CUSTOM_KEY = + Options.key("support_custom_key") + .booleanType() + .defaultValue(false) + .withDescription( + "if true, the key can be customized by the field value in the upstream data."); + + public static final Option VALUE_FIELD = + Options.key("value_field") + .stringType() + .noDefaultValue() + .withDescription( + "The field of value you want to write to redis, support string list set zset"); + + public static final Option HASH_KEY_FIELD = + Options.key("hash_key_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash key you want to write to redis"); + + public static final Option HASH_VALUE_FIELD = + Options.key("hash_value_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash value you want to write to redis"); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 3d7e954f1d1..6dff3cba71f 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -57,6 +57,10 @@ public class RedisParameters implements Serializable { private List redisNodes = Collections.emptyList(); private long expire = RedisConfig.EXPIRE.defaultValue(); private int batchSize = RedisConfig.BATCH_SIZE.defaultValue(); + private Boolean supportCustomKey; + private String valueField; + private String hashKeyField; + private String hashValueField; private int redisVersion; @@ -97,6 +101,22 @@ public void buildWithConfig(ReadonlyConfig config) { this.redisDataType = config.get(RedisConfig.DATA_TYPE); // Indicates the number of keys to attempt to return per iteration.default 10 this.batchSize = config.get(RedisConfig.BATCH_SIZE); + // set support custom key + if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) { + this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY); + } + // set value field + if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) { + this.valueField = config.get(RedisConfig.VALUE_FIELD); + } + // set hash key field + if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) { + this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD); + } + // set hash value field + if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) { + this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD); + } } public RedisClient buildRedisClient() { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index 49c2644d707..38098a25603 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -53,6 +53,10 @@ public OptionRule optionRule() { RedisConfig.KEY_PATTERN, RedisConfig.FORMAT, RedisConfig.EXPIRE, + RedisConfig.SUPPORT_CUSTOM_KEY, + RedisConfig.VALUE_FIELD, + RedisConfig.HASH_KEY_FIELD, + RedisConfig.HASH_VALUE_FIELD, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) .build(); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index f03c5c48c88..9d5c73df2c8 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; @@ -29,13 +30,20 @@ import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.seatunnel.format.json.JsonSerializationSchema; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RedisSinkWriter extends AbstractSinkWriter implements SupportMultiTableSinkWriter { + private static final String REDIS_GROUP_DELIMITER = ":"; + private static final String LEFT_PLACEHOLDER_MARKER = "{"; + private static final String RIGHT_PLACEHOLDER_MARKER = "}"; private final SeaTunnelRowType seaTunnelRowType; private final RedisParameters redisParameters; private final SerializationSchema serializationSchema; @@ -60,23 +68,110 @@ public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisP @Override public void write(SeaTunnelRow element) throws IOException { - String data = new String(serializationSchema.serialize(element)); - String keyField = redisParameters.getKeyField(); List fields = Arrays.asList(seaTunnelRowType.getFieldNames()); - String key; - if (fields.contains(keyField)) { - key = element.getField(fields.indexOf(keyField)).toString(); - } else { - key = keyField; - } + String key = getKey(element, fields); keyBuffer.add(key); - valueBuffer.add(data); + String value = getValue(element, fields); + valueBuffer.add(value); if (keyBuffer.size() >= batchSize) { doBatchWrite(); clearBuffer(); } } + private String getKey(SeaTunnelRow element, List fields) { + String key = redisParameters.getKeyField(); + Boolean supportCustomKey = redisParameters.getSupportCustomKey(); + if (Boolean.TRUE.equals(supportCustomKey)) { + return getCustomKey(element, fields, key); + } + return getNormalKey(element, fields, key); + } + + private static String getNormalKey(SeaTunnelRow element, List fields, String keyField) { + if (fields.contains(keyField)) { + return element.getField(fields.indexOf(keyField)).toString(); + } else { + return keyField; + } + } + + private String getCustomKey(SeaTunnelRow element, List fields, String keyField) { + String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER); + StringBuilder key = new StringBuilder(); + for (int i = 0; i < keyFieldSegments.length; i++) { + String keyFieldSegment = keyFieldSegments[i]; + if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER) + && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) { + String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1); + if (fields.contains(realKeyField)) { + key.append(element.getField(fields.indexOf(realKeyField)).toString()); + } else { + key.append(keyFieldSegment); + } + } else { + key.append(keyFieldSegment); + } + if (i != keyFieldSegments.length - 1) { + key.append(REDIS_GROUP_DELIMITER); + } + } + return key.toString(); + } + + private String getValue(SeaTunnelRow element, List fields) { + String value; + RedisDataType redisDataType = redisParameters.getRedisDataType(); + if (RedisDataType.HASH.equals(redisDataType)) { + value = handleHashType(element, fields); + } else { + value = handleOtherTypes(element, fields); + } + if (value == null) { + byte[] serialize = serializationSchema.serialize(element); + value = new String(serialize); + } + return value; + } + + private String handleHashType(SeaTunnelRow element, List fields) { + String hashKeyField = redisParameters.getHashKeyField(); + String hashValueField = redisParameters.getHashValueField(); + if (StringUtils.isEmpty(hashKeyField)) { + return null; + } + String hashKey; + if (fields.contains(hashKeyField)) { + hashKey = element.getField(fields.indexOf(hashKeyField)).toString(); + } else { + hashKey = hashKeyField; + } + String hashValue; + if (StringUtils.isEmpty(hashValueField)) { + hashValue = new String(serializationSchema.serialize(element)); + } else { + if (fields.contains(hashValueField)) { + hashValue = element.getField(fields.indexOf(hashValueField)).toString(); + } else { + hashValue = hashValueField; + } + } + Map kvMap = new HashMap<>(); + kvMap.put(hashKey, hashValue); + return JsonUtils.toJsonString(kvMap); + } + + private String handleOtherTypes(SeaTunnelRow element, List fields) { + String valueField = redisParameters.getValueField(); + if (StringUtils.isEmpty(valueField)) { + return null; + } + if (fields.contains(valueField)) { + return element.getField(fields.indexOf(valueField)).toString(); + } + return valueField; + } + private void clearBuffer() { keyBuffer.clear(); valueBuffer.clear(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java index 60bffba6f42..0f67575ea4e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java @@ -357,5 +357,90 @@ public void testMultipletableRedisSink(TestContainer container) jedis.select(0); } + @TestTemplate + public void testCustomKeyWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis-custom-key.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + int count = 0; + for (int i = 0; i < 100; i++) { + String data = jedis.get("custom-key-check:" + i); + if (data != null) { + count++; + } + } + Assertions.assertEquals(100, count); + for (int i = 0; i < 100; i++) { + jedis.del("custom-key-check:" + i); + } + } + + @TestTemplate + public void testCustomValueForStringWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/redis-to-redis-custom-value-for-key.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + int count = 0; + for (int i = 0; i < 100; i++) { + String data = jedis.get("custom-value-check:" + i); + if (data != null) { + Assertions.assertEquals("string", data); + count++; + } + } + Assertions.assertEquals(100, count); + for (int i = 0; i < 100; i++) { + jedis.del("custom-value-check:" + i); + } + } + + @TestTemplate + public void testCustomValueForListWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/redis-to-redis-custom-value-for-list.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("custom-value-check-list", 0, -1); + Assertions.assertEquals(100, list.size()); + jedis.del("custom-value-check-list"); + } + + @TestTemplate + public void testCustomValueForSetWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/redis-to-redis-custom-value-for-set.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + long amount = jedis.scard("custom-value-check-set"); + Assertions.assertEquals(100, amount); + jedis.del("custom-value-check-set"); + } + + @TestTemplate + public void testCustomValueForZSetWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/redis-to-redis-custom-value-for-zset.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + long amount = jedis.zcard("custom-value-check-zset"); + Assertions.assertEquals(100, amount); + jedis.del("custom-value-check-zset"); + } + + @TestTemplate + public void testCustomHashKeyAndValueWriteRedis(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/redis-to-redis-custom-hash-key-and-value.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + long amount = jedis.hlen("custom-hash-check"); + Assertions.assertEquals(100, amount); + for (int i = 0; i < 100; i++) { + Assertions.assertEquals("string", jedis.hget("custom-hash-check", String.valueOf(i))); + } + jedis.del("custom-hash-check"); + } + public abstract RedisContainerInfo getRedisContainerInfo(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf new file mode 100644 index 00000000000..ecdbbaccea7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-hash-check" + hash_key_field = "id" + hash_value_field = "c_string" + data_type = hash + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf new file mode 100644 index 00000000000..aa1171546f9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-key-check:{id}" + support_custom_key = true + data_type = key + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf new file mode 100644 index 00000000000..05dfa510407 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-value-check:{id}" + support_custom_key = true + value_field = "c_string" + data_type = key + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf new file mode 100644 index 00000000000..260d7b010be --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-value-check-list" + value_field = "c_string" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf new file mode 100644 index 00000000000..28acbd70ca3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-value-check-set" + value_field = "id" + data_type = set + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf new file mode 100644 index 00000000000..b862127c91b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = string + batch_size = 33 + format = "json" + schema = { + table = "RedisDatabase.RedisTable" + columns = [ + { + name = "id" + type = "bigint" + }, + { + name = "c_map" + type = "map" + }, + { + name = "c_array" + type = "array" + }, + { + name = "c_string" + type = "string" + }, + { + name = "c_boolean" + type = "boolean" + }, + { + name = "c_tinyint" + type = "tinyint" + }, + { + name = "c_smallint" + type = "smallint" + }, + { + name = "c_int" + type = "int" + }, + { + name = "c_bigint" + type = "bigint" + }, + { + name = "c_float" + type = "float" + }, + { + name = "c_double" + type = "double" + }, + { + name = "c_decimal" + type = "decimal(2,1)" + }, + { + name = "c_bytes" + type = "bytes" + }, + { + name = "c_date" + type = "date" + }, + { + name = "c_timestamp" + type = "timestamp" + } + ] + } + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "custom-value-check-zset" + value_field = "id" + data_type = zset + batch_size = 33 + } +} \ No newline at end of file