Skip to content

Commit

Permalink
Add ability to rename partition field names to custom ones using "par…
Browse files Browse the repository at this point in the history
…tition.field.rename" property
  • Loading branch information
Ildar Almakaev committed May 20, 2021
1 parent 9b404e5 commit 1c97ac8
Showing 1 changed file with 91 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,100 +21,111 @@
import io.confluent.connect.storage.partitioner.PartitionerConfig;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.util.DataUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Locale;
import java.util.Map;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

public static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
public static final String PARTITION_FIELD_FORMAT_PATH_DOC = "Whether directory labels should be included when partitioning for custom fields e.g. " +
"whether this 'orgId=XXXX/appId=ZZZZ/customField=YYYY' or this 'XXXX/ZZZZ/YYYY'.";
public static final String PARTITION_FIELD_FORMAT_PATH_DISPLAY = "Partition Field Format Path";
public static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private PartitionFieldExtractor partitionFieldExtractor;

protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
super.init(partitionDurationMs, pathFormat, locale, timeZone, config);

final List<String> fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
final boolean formatPath = (Boolean) config.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT);

this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath);
}

public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

log.info("Encoded partition : {}", partition);

return partition;
public static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
public static final String PARTITION_FIELD_RENAME = "partition.field.rename";
public static final String PARTITION_FIELD_FORMAT_PATH_DOC =
"Whether directory labels should be included when partitioning for custom fields e.g. " +
"whether this 'orgId=XXXX/appId=ZZZZ/customField=YYYY' or this 'XXXX/ZZZZ/YYYY'.";
public static final String PARTITION_FIELD_FORMAT_PATH_DISPLAY = "Partition Field Format Path";
public static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private PartitionFieldExtractor partitionFieldExtractor;

protected void init(long partitionDurationMs, String pathFormat, Locale locale,
DateTimeZone timeZone, Map<String, Object> config) {
super.init(partitionDurationMs, pathFormat, locale, timeZone, config);

final List<String> fieldNames =
(List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
final boolean formatPath = (Boolean) config
.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT);
String partitionsToRename = (String) config.getOrDefault(PARTITION_FIELD_RENAME, "");

log.info("Partitions fields to rename: {}", partitionsToRename);
Map<String, String> partitionsToRenameMap = partitionsToRename.isEmpty() ?
new HashMap<>()
: Arrays.stream(partitionsToRename.split(","))
.map(s -> s.split(":"))
.collect(Collectors.toMap(k -> k[0], v -> v[1]));

this.partitionFieldExtractor =
new PartitionFieldExtractor(fieldNames, formatPath, partitionsToRenameMap);
}

public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

log.info("Encoded partition : {}", partition);

return partition;
}

public String encodePartition(final SinkRecord sinkRecord) {
final String partitionsForTimestamp = super.encodePartition(sinkRecord);
final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

log.info("Encoded partition : {}", partition);

return partition;
}

public static class PartitionFieldExtractor {

private static final String DELIMITER_EQ = "=";

private final boolean formatPath;
private final List<String> fieldNames;
private final Map<String, String> partitionsOtherNames;

PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath,
Map<String, String> partitionsOtherNames) {
this.fieldNames = fieldNames;
this.formatPath = formatPath;
this.partitionsOtherNames = partitionsOtherNames;
}

public String encodePartition(final SinkRecord sinkRecord) {
final String partitionsForTimestamp = super.encodePartition(sinkRecord);
final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);
public String extract(final ConnectRecord<?> record) {

log.info("Encoded partition : {}", partition);

return partition;
}
final Object value = record.value();
final StringBuilder builder = new StringBuilder();

public static class PartitionFieldExtractor {

private static final String DELIMITER_EQ = "=";

private final boolean formatPath;
private final List<String> fieldNames;

PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) {
this.fieldNames = fieldNames;
this.formatPath = formatPath;
log.info("Partitions to rename: {}", partitionsOtherNames);
for (final String fieldName : this.fieldNames) {
if (builder.length() != 0) {
builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
}

public String extract(final ConnectRecord<?> record) {

final Object value = record.value();

final StringBuilder builder = new StringBuilder();

for (final String fieldName : this.fieldNames) {

if (builder.length() != 0) {
builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
}

if (value instanceof Struct || value instanceof Map) {

final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);

if (formatPath) {
builder.append(String.join(DELIMITER_EQ, fieldName, partitionField));
} else {
builder.append(partitionField);
}

} else {
log.error("Value is not of Struct or Map type.");
throw new PartitionException("Error encoding partition.");
}

}

return builder.toString();

if (value instanceof Struct || value instanceof Map) {
final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);
String partitionName = partitionsOtherNames.getOrDefault(fieldName, fieldName);
if (formatPath) {
builder.append(String.join(DELIMITER_EQ, partitionName, partitionField));
} else {
builder.append(partitionField);
}
} else {
log.error("Value is not of Struct or Map type.");
throw new PartitionException("Error encoding partition.");
}
}
return builder.toString();
}

}
}

0 comments on commit 1c97ac8

Please sign in to comment.