Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple entries of 'source' with 'list_to_map_when' condition in list_to_map processor #5407

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ListToMapProcessor extends AbstractProcessor<Record<Event>, Record<
private final ListToMapProcessorConfig config;

private final ExpressionEvaluator expressionEvaluator;
private final List<ListToMapProcessorConfig.Entry> entries;

@DataPrepperPluginConstructor
public ListToMapProcessor(final PluginMetrics pluginMetrics, final ListToMapProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -46,95 +47,123 @@ public ListToMapProcessor(final PluginMetrics pluginMetrics, final ListToMapProc
String.format("list_to_map_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
config.getListToMapWhen()));
}

if (config.getSource() != null || config.getListToMapWhen() != null) {
ListToMapProcessorConfig.Entry entry = new ListToMapProcessorConfig.Entry(
config.getSource(),
config.getTarget(),
config.getKey(),
config.getValueKey(),
config.getUseSourceKey(),
config.getExtractValue(),
config.getFlatten(),
config.getFlattenedElement(),
config.getTagsOnFailure(),
config.getListToMapWhen()
);
entries = List.of(entry);
}
else{
entries = config.getEntries();
}

for (ListToMapProcessorConfig.Entry entry : entries) {
if (entry.getListToMapWhen() != null && !expressionEvaluator.isValidExpressionStatement(entry.getListToMapWhen())) {
throw new InvalidPluginConfigurationException(
String.format("list_to_map_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getListToMapWhen()));
}
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();
for (ListToMapProcessorConfig.Entry entry : entries) {
try {

try {

if (Objects.nonNull(config.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(config.getListToMapWhen(), recordEvent)) {
continue;
}
if (Objects.nonNull(entry.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(entry.getListToMapWhen(), recordEvent)) {
continue;
}

final List<Map<String, Object>> sourceList;
try {
sourceList = recordEvent.get(config.getSource(), List.class);
} catch (final Exception e) {
LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}
final List<Map<String, Object>> sourceList;
try {
sourceList = recordEvent.get(entry.getSource(), List.class);
} catch (final Exception e) {
LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",
entry.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(entry.getTagsOnFailure());
continue;
}

final Map<String, Object> targetMap;
try {
targetMap = constructTargetMap(sourceList);
} catch (final IllegalArgumentException e) {
LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error converting source list to map on record [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}
final Map<String, Object> targetMap;
try {
targetMap = constructTargetMap(sourceList, entry);
} catch (final IllegalArgumentException e) {
LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]",
entry.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(entry.getTagsOnFailure());
continue;
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error converting source list to map on record [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(entry.getTagsOnFailure());
continue;
}

try {
updateEvent(recordEvent, targetMap);
try {
updateEvent(recordEvent, targetMap, entry.getTarget());
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error updating record [{}] after converting source list to map")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(entry.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error updating record [{}] after converting source list to map")
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
recordEvent.getMetadata().addTags(entry.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

private Map<String, Object> constructTargetMap(final List<Map<String, Object>> sourceList) {
private Map<String, Object> constructTargetMap(final List<Map<String, Object>> sourceList, ListToMapProcessorConfig.Entry entry) {
Map<String, Object> targetMap = new HashMap<>();
for (final Map<String, Object> itemMap : sourceList) {

if (config.getUseSourceKey()) {
if (config.getFlatten()) {
if (entry.getUseSourceKey()) {
if (entry.getFlatten()) {
for (final String entryKey : itemMap.keySet()) {
setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, entry.getExtractValue(), entry.getFlattenedElement());
}
} else {
for (final String entryKey : itemMap.keySet()) {
setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, entry.getExtractValue());
}
}
} else {
final String itemKey = (String) itemMap.get(config.getKey());
if (config.getFlatten()) {
setTargetMapFlattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
final String itemKey = (String) itemMap.get(entry.getKey());
if (entry.getFlatten()) {
setTargetMapFlattened(targetMap, itemMap, itemKey, entry.getValueKey(), entry.getValueKey() != null, entry.getFlattenedElement());
} else {
setTargetMapUnflattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
setTargetMapUnflattened(targetMap, itemMap, itemKey, entry.getValueKey(), entry.getValueKey() != null);
}
}
}
Expand All @@ -157,8 +186,9 @@ private void setTargetMapUnflattened(
}

private void setTargetMapFlattened(
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) {
if (!targetMap.containsKey(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) {
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey,
final boolean doExtractValue, final ListToMapProcessorConfig.FlattenedElement flattenedElement) {
if (!targetMap.containsKey(itemKey) || flattenedElement == ListToMapProcessorConfig.FlattenedElement.LAST) {
if (doExtractValue) {
targetMap.put(itemKey, itemMap.get(itemValueKey));
} else {
Expand All @@ -167,14 +197,14 @@ private void setTargetMapFlattened(
}
}

private void updateEvent(final Event recordEvent, final Map<String, Object> targetMap) {
final boolean doWriteToRoot = Objects.isNull(config.getTarget());
private void updateEvent(final Event recordEvent, final Map<String, Object> targetMap, final String target) {
final boolean doWriteToRoot = Objects.isNull(target);
if (doWriteToRoot) {
for (final Map.Entry<String, Object> entry : targetMap.entrySet()) {
recordEvent.put(entry.getKey(), entry.getValue());
}
} else {
recordEvent.put(config.getTarget(), targetMap);
recordEvent.put(target, targetMap);
}
}

Expand Down
Loading
Loading