Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
bobeal committed Nov 7, 2024
2 parents c74b8cd + 6431b64 commit 8d3d25e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 37 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ When set to `true` a single email with grouped errors will be sent. When set to
## Todo

- Optional inclusion of FlowFile contents.
- Add batching support in email provenance reporter.
- Add testing.

## License
Expand Down
2 changes: 1 addition & 1 deletion nifi-provenance-reporting-tasks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.2</version>
<version>2.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Tags({"elasticsearch", "provenance"})
@CapabilityDescription("A provenance reporting task that writes to Elasticsearch")
public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter {

static final List<String> DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList(
"DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP",
"FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP",
"ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile"
);

public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor
.Builder().name("Elasticsearch URL")
.displayName("Elasticsearch URL")
Expand All @@ -62,6 +70,15 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor
.Builder().name("Processors Types Allowlist")
.displayName("Processors Types Allowlist")
.description("Specifies a comma-separated list of processors types for which all provenance events "
+ "will be sent. If the processor type is not in the list, only error events will be sent.")
.defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST))
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR))
.build();

private final Map<String, ElasticsearchClient> esClients = new HashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();

Expand All @@ -85,55 +102,73 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors = super.getSupportedPropertyDescriptors();
descriptors.add(ELASTICSEARCH_URL);
descriptors.add(ELASTICSEARCH_INDEX);
descriptors.add(PROCESSORS_TYPES_ALLOWLIST);
return descriptors;
}

public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final List<String> processorTypesAllowlist =
Arrays.asList(context.getProperty(PROCESSORS_TYPES_ALLOWLIST).getValue().split(","));

events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
if(!event.containsKey("component_type")) {
getLogger().warn("Provenance event has no component type, ignoring");
return;
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);

final String componentType = event.get("component_type").toString();
final String status = event.get("status").toString();

if(processorTypesAllowlist.contains(componentType)|| status.equals("Error")) {

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_id", event.get("event_id"));
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}

}


});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private String composeMessageContent(final Map<String, Object> event, Boolean gr
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n");

if (groupSimilarErrors) {
if (groupSimilarErrors && groupedEventsSize > 1) {
message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
}

Expand Down Expand Up @@ -417,7 +417,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context,
emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
}

if (groupSimilarErrors) {
if (groupSimilarErrors && groupedEventsSize > 1) {
emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));
Expand Down

0 comments on commit 8d3d25e

Please sign in to comment.