Skip to content

Commit

Permalink
Returns 536 update validation for sams updated payload (#15)
Browse files Browse the repository at this point in the history
* Updated event payload schema for V0.4 with latest changes

* Adding support for draft-07 JSON schema

* Added validation for payloadVersion V0.4

* Exception handling for schema validation

* Removed the process of breaking down of pipeline
  • Loading branch information
Pallavi Salunkhe authored and micaelcarlstedt committed Nov 7, 2019
1 parent 7e41641 commit 308733c
Show file tree
Hide file tree
Showing 5 changed files with 2,288 additions and 268 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/ikea/isx/common/ErrorConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ public class ErrorConstants {
public static final String RAW_EVENT_PERSIST_EXCEPTION =
"error while persisting raw event";

public static final String EMPTY_EVENT_HEADER = "event header can not be empty";

public static final String INVALID_EVENT_PAYLOAD = "event payload is invalid";

public static final String INVALID_EVENT_HEADER = "event header is invalid";



}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public Pipeline createDataPipeline(String[] args) {
PCollection<EventPayload> validatedMessagePayload =
validatedMessagePayloadTuple.get(PUBSUB_MSG_VALIDATION_SUCCESS_TAG);


PCollectionTuple rawEventPersistDataTuple =
validatedMessagePayload.apply(
"Persist Raw Event Payload through raw API",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.ikea.isx.common.ErrorConstants;
import com.ikea.isx.entities.EventPayload;
import com.ikea.isx.exception.DataPipelineException;
import com.ikea.isx.gcp.service.dataflow.pipeline.DataflowPipelineBuilder;
import com.ikea.isx.gcp.service.dataflow.pipeline.FailureMetaData;
import com.ikea.isx.util.CommonUtil;
Expand All @@ -17,44 +18,64 @@
@Slf4j
public class EventPayloadMessageHeaderValidation extends DoFn<PubsubMessage, EventPayload> {

private final String className = this.getClass().getSimpleName();
private final String className = this.getClass().getSimpleName();


@DoFn.ProcessElement
public void eventPayloadHeaderValidation(
@DoFn.Element PubsubMessage pubsubMessage, ProcessContext processContext) {
@DoFn.ProcessElement
public void eventPayloadHeaderValidation(
@DoFn.Element PubsubMessage pubsubMessage, ProcessContext processContext) {

// Read the event headers data and event payload
Map<String, String> eventPayloadHeader = pubsubMessage.getAttributeMap();
String eventPayloadMessage = new String(pubsubMessage.getPayload());
// Read the event headers data and event payload
Map<String, String> eventPayloadHeader = pubsubMessage.getAttributeMap();
String eventPayloadMessage = new String(pubsubMessage.getPayload());

if (StringUtils.isEmpty(eventPayloadMessage)) {
CommonUtil.setDataValidationFailureResponse(
className, ErrorConstants.EMPTY_EVENT_PAYLOAD, null, processContext);
return;
if (StringUtils.isEmpty(eventPayloadMessage)) {
CommonUtil.setDataValidationFailureResponse(
className, ErrorConstants.EMPTY_EVENT_PAYLOAD, null, processContext);
return;
}
log.info("Received the Payload from topic :{}", eventPayloadMessage);
String payloadMsgSize = CommonUtil.displaySizeByByteCount(eventPayloadMessage.length());
log.info("Received the Payload from topic Payload Size: {}", payloadMsgSize);
log.debug("Validating event header");
EventPayload eventPayload = new EventPayload();
eventPayload.setHeader(eventPayloadHeader);
eventPayload.setPayload(eventPayloadMessage);

try {
EventsSchemaValidation eventsSchemaValidation = new EventsSchemaValidation(processContext);

// Event Header validation
if (eventPayloadHeader != null) {
boolean validationResult = eventsSchemaValidation.headerSchemaValidation(eventPayloadHeader);
if (validationResult == false) {
throw new DataPipelineException(ErrorConstants.INVALID_EVENT_HEADER);
}
log.info("Received the Payload from topic :{}", eventPayloadMessage);
String payloadMsgSize = CommonUtil.displaySizeByByteCount(eventPayloadMessage.length());
log.info("Received the Payload from topic Payload Size: {}", payloadMsgSize);
log.debug("Validating event header");
EventPayload eventPayload = new EventPayload();
eventPayload.setHeader(eventPayloadHeader);
eventPayload.setPayload(eventPayloadMessage);

try {
EventsSchemaValidation eventsSchemaValidation = new EventsSchemaValidation(processContext);
if (eventPayloadHeader != null) {
eventsSchemaValidation.headerSchemaValidation(eventPayloadHeader);
}
eventsSchemaValidation.eventPayloadSchemaValidation(eventPayloadMessage);

processContext.output(
DataflowPipelineBuilder.PUBSUB_MSG_HEADER_VALIDATION_SUCCESS_TAG, eventPayload);
} catch (Exception e) {
log.error("Exception Caught: " + e.getLocalizedMessage(), e.getCause());
FailureMetaData failureMetaData =
CommonUtil.getExceptionFailureResponse(className, eventPayload.toString(), e);
processContext.output(DataflowPipelineBuilder.FAILURE_TAG, failureMetaData);
} else {
throw new DataPipelineException(
ErrorConstants.EMPTY_EVENT_HEADER
);
}

// Event Payload validation
if (eventPayloadMessage != null) {
boolean validationResult = eventsSchemaValidation.eventPayloadSchemaValidation(eventPayloadMessage);
if (validationResult == false) {
throw new DataPipelineException(ErrorConstants.INVALID_EVENT_PAYLOAD);
}
}
} else {
throw new DataPipelineException(
ErrorConstants.EMPTY_EVENT_PAYLOAD
);
}

processContext.output(
DataflowPipelineBuilder.PUBSUB_MSG_HEADER_VALIDATION_SUCCESS_TAG, eventPayload);
} catch (Exception e) {
log.error("Exception Caught: " + e.getLocalizedMessage(), e.getCause());
FailureMetaData failureMetaData =
CommonUtil.getExceptionFailureResponse(className, eventPayload.toString(), e);
processContext.output(DataflowPipelineBuilder.FAILURE_TAG, failureMetaData);
}
}
}
67 changes: 35 additions & 32 deletions src/main/java/com/ikea/isx/util/EventsSchemaValidation.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,52 @@
@Slf4j
public class EventsSchemaValidation {

private JsonSchemaFactory factory;
private JsonSchema schema = null;
private ProcessingReport report; // We will use it in future
private DoFn.ProcessContext processContext; // We will use it in future
private JsonSchemaFactory factory;
private JsonSchema schema = null;
private ProcessingReport report;
private DoFn.ProcessContext processContext; // We will use it in future

public EventsSchemaValidation(DoFn.ProcessContext processContext) {
this.processContext = processContext;
}
public EventsSchemaValidation(DoFn.ProcessContext processContext) {
this.processContext = processContext;
}

public void headerSchemaValidation(Map<String, String> headerData) {
public boolean headerSchemaValidation(Map<String, String> headerData) {

ObjectMapper mapper = new ObjectMapper();
try {
JsonNode eventsHeaderSchema = Resource.loadResource(Constants.EVENT_HEADER_SCHEMA_FILE_PATH);
JsonNode eventsHeaderData = mapper.valueToTree(headerData);
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode eventsHeaderSchema = Resource.loadResource(Constants.EVENT_HEADER_SCHEMA_FILE_PATH);
JsonNode eventsHeaderData = mapper.valueToTree(headerData);

factory = JsonSchemaFactory.byDefault();
schema = factory.getJsonSchema(eventsHeaderSchema);
report = schema.validate(eventsHeaderData);
factory = JsonSchemaFactory.byDefault();
schema = factory.getJsonSchema(eventsHeaderSchema);
report = schema.validate(eventsHeaderData);

log.info("header schema validation result: " + report);
log.info("header schema validation result: " + report);
return report.isSuccess();

} catch (IOException | ProcessingException e) {
log.error("Exception Caught:" + e.getLocalizedMessage(), e.getCause());
}
} catch (IOException | ProcessingException e) {
log.error("Exception Caught:" + e.getLocalizedMessage(), e.getCause());
return false;
}
}


public void eventPayloadSchemaValidation(String eventPayload) {

ObjectMapper mapper = new ObjectMapper();
try {
public boolean eventPayloadSchemaValidation(String eventPayload) {

JsonNode eventsSchema = Resource.loadResource(Constants.EVENT_PAYLOAD_SCHEMA_FILE_PATH);
JsonNode eventspayload = mapper.readTree(eventPayload);
factory = JsonSchemaFactory.byDefault();
schema = factory.getJsonSchema(eventsSchema);
report = schema.validate(eventspayload);
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode eventsSchema = Resource.loadResource(Constants.EVENT_PAYLOAD_SCHEMA_FILE_PATH);
JsonNode eventspayload = mapper.readTree(eventPayload);
factory = JsonSchemaFactory.byDefault();
schema = factory.getJsonSchema(eventsSchema);
report = schema.validate(eventspayload);

log.info("payload schema validation result: " + report);
} catch (IOException | ProcessingException e) {
log.error("Exception Caught:" + e.getLocalizedMessage(), e.getCause());
}
log.info("payload schema validation result: " + report);
return report.isSuccess();
} catch (IOException | ProcessingException e) {
log.error("Exception Caught:" + e.getLocalizedMessage(), e.getCause());
return false;
}
}

}
Loading

0 comments on commit 308733c

Please sign in to comment.