Skip to content

Commit

Permalink
Merge pull request #4 from sowmya-dixit/secor-0.25
Browse files Browse the repository at this point in the history
Fix secor process failure for empty message
  • Loading branch information
sowmya-dixit authored Dec 6, 2019
2 parents 40ea0ef + 665f711 commit ef8b199
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected boolean consumeNextMessage() {
if (mUnparsableMessages > MAX_UNPARSABLE_MESSAGES) {
throw new RuntimeException("Failed to parse message " + rawMessage, e);
}
LOG.warn("Failed to parse message {}", rawMessage, e);
LOG.warn("Consumer: Failed to parse message {}", rawMessage, e);
}

if (parsedMessage != null) {
Expand All @@ -163,7 +163,7 @@ protected boolean consumeNextMessage() {
mMetricCollector.metric("consumer.message_size_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
mMetricCollector.increment("consumer.throughput_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
} catch (Exception e) {
throw new RuntimeException("Failed to write message " + parsedMessage, e);
throw new RuntimeException("Consumer: Failed to write message " + parsedMessage, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,43 +73,53 @@ public PatternDateMessageParser(SecorConfig config) {
@Override
public String[] extractPartitions(Message message) {

JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
boolean prefixEnabled = mConfig.isPartitionPrefixEnabled();
String result[] = { prefixEnabled ? partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate };
if (jsonObject != null) {
Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName());
if (fieldValue == null)
fieldValue = jsonObject.get(mConfig.getFallbackMessageTimestampName());
if (fieldValue == null)
fieldValue = System.currentTimeMillis();
String result[] = {prefixEnabled ? partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate};
try {
JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
if (jsonObject != null) {
Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName());
if (fieldValue == null)
fieldValue = jsonObject.get(mConfig.getFallbackMessageTimestampName());
if (fieldValue == null)
fieldValue = System.currentTimeMillis();

Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (inputPattern != null) {
try {
Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (inputPattern != null) {
try {
/*
SimpleDateFormat outputFormatter = new SimpleDateFormat(
StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
*/
Date dateFormat;
if (fieldValue instanceof Number) {
dateFormat = new Date(((Number) fieldValue).longValue());
} else {
SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
inputFormatter.setTimeZone(messageTimeZone);
dateFormat = inputFormatter.parse(fieldValue.toString());
Date dateFormat;
if (fieldValue instanceof Number) {
dateFormat = new Date(((Number) fieldValue).longValue());
} else {
SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
inputFormatter.setTimeZone(messageTimeZone);
dateFormat = inputFormatter.parse(fieldValue.toString());
}
result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat)
: outputFormatter.format(dateFormat);
return result;
} catch (Exception e) {
e.printStackTrace();
LOG.info("PatternDateMessageParser: Exception while parsing date " + e.getMessage());
LOG.warn("Unable to get path: " + e.getMessage());
}
result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat)
: outputFormatter.format(dateFormat);
return result;
} catch (Exception e) {
e.printStackTrace();
LOG.warn("Unable to get path: " + e.getMessage());
}
} else {
LOG.info("PatternDateMessageParser: Unable to parse json object " + jsonObject);
}
return result;
}
catch (Exception e) {
e.printStackTrace();
LOG.info("PatternDateMessageParser: Exception while parsing date " + e.getMessage());
return result;
}

return result;
}

private String getPrefix(String prefixIdentifier) {
Expand Down

0 comments on commit ef8b199

Please sign in to comment.