From 08e14254f44c02ab282212599220c6e3e62dff6e Mon Sep 17 00:00:00 2001 From: Sam O'Brien Date: Mon, 14 Oct 2019 14:59:31 +0100 Subject: [PATCH 1/2] Ability to specify nested values as partition keys --- .../utils/scheduler/StandardScheduler.java | 36 +++++++++++++-- .../scheduler/StandardSchedulerTest.java | 45 +++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 src/test/java/net/redborder/utils/scheduler/StandardSchedulerTest.java diff --git a/src/main/java/net/redborder/utils/scheduler/StandardScheduler.java b/src/main/java/net/redborder/utils/scheduler/StandardScheduler.java index 126b602..92fb83b 100644 --- a/src/main/java/net/redborder/utils/scheduler/StandardScheduler.java +++ b/src/main/java/net/redborder/utils/scheduler/StandardScheduler.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.TimeUnit; public class StandardScheduler implements Scheduler { @@ -68,13 +67,42 @@ public void run() { while (!currentThread().isInterrupted()) { rateLimiter.acquire(); Map message = generator.generate(); - Object partitionKeyObject = message.get(producer.getPartitionKey()); - String partitionKey = null; - if(partitionKeyObject != null) partitionKey = partitionKeyObject.toString(); + String partitionKey = extractPartitionKey(message); String json = gson.toJson(message); producer.send(json, partitionKey); messages.mark(); } } } + + String extractPartitionKey(final Map message) { + try { + // No partition key field results in null + final String partitionKey = producer.getPartitionKey(); + if (partitionKey == null) { + return null; + } + + // Try to get the key from the object, then treat it as a nested path + Object partitionKeyObject = message.get(partitionKey); + if (partitionKeyObject != null) { + return partitionKeyObject.toString(); + } else { + final String[] keys = partitionKey.split("\\."); + Map currentMessage = message; + for (final String key : keys) { + partitionKeyObject = currentMessage.get(key); + if (partitionKeyObject == null) { + break; + } else if (partitionKeyObject instanceof Map) { + currentMessage = (Map) partitionKeyObject; + } + } + return partitionKeyObject.toString(); + } + } catch (final Exception e) { + log.warn(e.getMessage(), e); + return null; + } + } } diff --git a/src/test/java/net/redborder/utils/scheduler/StandardSchedulerTest.java b/src/test/java/net/redborder/utils/scheduler/StandardSchedulerTest.java new file mode 100644 index 0000000..04ffb89 --- /dev/null +++ b/src/test/java/net/redborder/utils/scheduler/StandardSchedulerTest.java @@ -0,0 +1,45 @@ +package net.redborder.utils.scheduler; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.HashMap; +import java.util.Map; +import net.redborder.utils.producers.IProducer; +import org.junit.Before; +import org.junit.Test; + +public class StandardSchedulerTest { + + private Map message; + + private IProducer producer; + + private StandardScheduler standardScheduler; + + @Before + @SuppressWarnings("unchecked") + public void before() throws Exception { + + message = new HashMap<>(); + message.put("id", "123"); + message.put("nested", new HashMap()); + ((Map) message.get("nested")).put("id", "345"); + + producer = mock(IProducer.class); + + standardScheduler = new StandardScheduler(null, producer, 1D, 1); + } + + @Test + public void extractPartitionKey() { + when(producer.getPartitionKey()).thenReturn("id"); + String result = standardScheduler.extractPartitionKey(message); + assertEquals("123", result); + + when(producer.getPartitionKey()).thenReturn("nested.id"); + result = standardScheduler.extractPartitionKey(message); + assertEquals("345", result); + } +} From 9a66019e5befd244710500e13cf111874d8f04ad Mon Sep 17 00:00:00 2001 From: Sam O'Brien Date: Mon, 14 Oct 2019 15:06:35 +0100 Subject: [PATCH 2/2] README update --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 7e0066d..841f123 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,20 @@ second, which would look like the following: { "myConstant": "Hello, World!", "randomNumber": 33} ``` +Partition keys can also be nested in the json as long as they are not nested in an array eg. + +```yaml +topic: testTopic +partitionKey: "nested.id" +fields: + nested: + type: json + components: + id: "id123" +``` + +Will result in "id123" being used as the partition key. + You can find a full example of config file in [this file](https://github.com/redBorder/synthetic-producer/blob/master/configProducer.yml) ## Fields types