Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested partition keys #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ second, which would look like the following:
{ "myConstant": "Hello, World!", "randomNumber": 33}
```

Partition keys can also be nested in the json as long as they are not nested in an array eg.

```yaml
topic: testTopic
partitionKey: "nested.id"
fields:
nested:
type: json
components:
id: "id123"
```

Will result in "id123" being used as the partition key.

You can find a full example of config file in [this file](https://github.com/redBorder/synthetic-producer/blob/master/configProducer.yml)

## Fields types
Expand Down
36 changes: 32 additions & 4 deletions src/main/java/net/redborder/utils/scheduler/StandardScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class StandardScheduler implements Scheduler {
Expand Down Expand Up @@ -68,13 +67,42 @@ public void run() {
while (!currentThread().isInterrupted()) {
rateLimiter.acquire();
Map<String, Object> message = generator.generate();
Object partitionKeyObject = message.get(producer.getPartitionKey());
String partitionKey = null;
if(partitionKeyObject != null) partitionKey = partitionKeyObject.toString();
String partitionKey = extractPartitionKey(message);
String json = gson.toJson(message);
producer.send(json, partitionKey);
messages.mark();
}
}
}

String extractPartitionKey(final Map<String, Object> message) {
try {
// No partition key field results in null
final String partitionKey = producer.getPartitionKey();
if (partitionKey == null) {
return null;
}

// Try to get the key from the object, then treat it as a nested path
Object partitionKeyObject = message.get(partitionKey);
if (partitionKeyObject != null) {
return partitionKeyObject.toString();
} else {
final String[] keys = partitionKey.split("\\.");
Map currentMessage = message;
for (final String key : keys) {
partitionKeyObject = currentMessage.get(key);
if (partitionKeyObject == null) {
break;
} else if (partitionKeyObject instanceof Map) {
currentMessage = (Map) partitionKeyObject;
}
}
return partitionKeyObject.toString();
}
} catch (final Exception e) {
log.warn(e.getMessage(), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package net.redborder.utils.scheduler;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import java.util.HashMap;
import java.util.Map;
import net.redborder.utils.producers.IProducer;
import org.junit.Before;
import org.junit.Test;

public class StandardSchedulerTest {

private Map<String, Object> message;

private IProducer producer;

private StandardScheduler standardScheduler;

@Before
@SuppressWarnings("unchecked")
public void before() throws Exception {

message = new HashMap<>();
message.put("id", "123");
message.put("nested", new HashMap<String, Object>());
((Map<String, Object>) message.get("nested")).put("id", "345");

producer = mock(IProducer.class);

standardScheduler = new StandardScheduler(null, producer, 1D, 1);
}

@Test
public void extractPartitionKey() {
when(producer.getPartitionKey()).thenReturn("id");
String result = standardScheduler.extractPartitionKey(message);
assertEquals("123", result);

when(producer.getPartitionKey()).thenReturn("nested.id");
result = standardScheduler.extractPartitionKey(message);
assertEquals("345", result);
}
}