Skip to content

Commit

Permalink
Issue #SC-961 feat: Include message and output timezone for Channel a…
Browse files Browse the repository at this point in the history
…nd Date parser
  • Loading branch information
Anand committed Apr 14, 2019
1 parent ddd54f2 commit 4b55db9
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 22 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.pinterest</groupId>
<artifactId>secor</artifactId>
<version>0.24-SNAPSHOT</version>
<version>0.25-SNAPSHOT</version>
<packaging>jar</packaging>
<name>secor</name>
<description>Kafka to s3/gs/swift logs exporter</description>
Expand Down Expand Up @@ -47,8 +47,8 @@
</scm>

<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<parquet.version>1.9.0</parquet.version>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ public TimeZone getTimeZone() {
return Strings.isNullOrEmpty(timezone) ? TimeZone.getTimeZone("UTC") : TimeZone.getTimeZone(timezone);
}

public TimeZone getMessageTimeZone() {
String timezone = getString("secor.message.timezone");
return Strings.isNullOrEmpty(timezone) ? TimeZone.getTimeZone("UTC") : TimeZone.getTimeZone(timezone);
}

public boolean getBoolean(String name, boolean defaultValue) {
return mProperties.getBoolean(name, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;

import net.minidev.json.JSONObject;
Expand Down Expand Up @@ -56,10 +53,16 @@ public class ChannelDateMessageParser extends MessageParser {
protected static final String defaultFormatter = "yyyy-MM-dd";
private Map<String, String> partitionPrefixMap;
private static final String channelScrubRegex = "[^a-zA-Z0-9._$-]";
private SimpleDateFormat outputFormatter;
private TimeZone messageTimeZone;

public ChannelDateMessageParser(SecorConfig config) {
super(config);
partitionPrefixMap = new HashMap<String, String>();
messageTimeZone = mConfig.getMessageTimeZone();
outputFormatter = new SimpleDateFormat(
StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
outputFormatter.setTimeZone(mConfig.getTimeZone());
partitionPrefixMap = new HashMap<>();
String partitionMapping = config.getPartitionPrefixMapping();
if (null != partitionMapping) {
JSONObject jsonObject = (JSONObject) JSONValue.parse(partitionMapping);
Expand All @@ -74,7 +77,7 @@ public String[] extractPartitions(Message message) {

JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload());
boolean prefixEnabled = mConfig.isPartitionPrefixEnabled();
String result[] = { defaultDate };
String result[] = {defaultDate};

if (jsonObject != null) {

Expand All @@ -86,15 +89,18 @@ public String[] extractPartitions(Message message) {

Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (fieldValue != null && inputPattern != null) {
if (inputPattern != null) {
try {
/*
SimpleDateFormat outputFormatter = new SimpleDateFormat(
StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
Date dateFormat = null;
*/
Date dateFormat;
if (fieldValue instanceof Number) {
dateFormat = new Date(((Number) fieldValue).longValue());
} else {
SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
inputFormatter.setTimeZone(messageTimeZone);
dateFormat = inputFormatter.parse(fieldValue.toString());
}

Expand All @@ -107,7 +113,7 @@ public String[] extractPartitions(Message message) {
return result;
} catch (Exception e) {
e.printStackTrace();
LOG.warn("Unable to get path: " + e.getMessage() +" - " + message.getPayload());
LOG.warn("Unable to get path: " + e.getMessage() + " - " + message.getPayload());
}
}
}
Expand All @@ -131,15 +137,15 @@ private String getChannel(JSONObject jsonObject) {
String rawChannelStr = "";
Map<String, Object> dimensions = (HashMap<String, Object>) jsonObject.get("dimensions");
Map<String, Object> context = (HashMap<String, Object>) jsonObject.get("context");

String channel = (String) jsonObject.get("channel");
if (channel != null && !channel.isEmpty()) {
rawChannelStr = channel;
} else if (dimensions != null && dimensions.get("channel") != null) {
rawChannelStr = (String) dimensions.get("channel");
} else if(context != null && context.get("channel") != null){
} else if (context != null && context.get("channel") != null) {
rawChannelStr = (String) context.get("channel");
}else {
} else {
rawChannelStr = "in.ekstep";
}
return rawChannelStr.replaceAll(channelScrubRegex, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package com.pinterest.secor.parser;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;

import net.minidev.json.JSONObject;
Expand Down Expand Up @@ -53,10 +51,16 @@ public class PatternDateMessageParser extends MessageParser {
protected static final String defaultDate = "1970-01-01";
protected static final String defaultFormatter = "yyyy-MM-dd";
private Map<String, String> partitionPrefixMap;
private SimpleDateFormat outputFormatter;
private TimeZone messageTimeZone;

public PatternDateMessageParser(SecorConfig config) {
super(config);
partitionPrefixMap = new HashMap<String, String>();
messageTimeZone = config.getMessageTimeZone();
outputFormatter = new SimpleDateFormat(
StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
outputFormatter.setTimeZone(config.getTimeZone());
partitionPrefixMap = new HashMap<>();
String partitionMapping = config.getPartitionPrefixMapping();
if (null != partitionMapping) {
JSONObject jsonObject = (JSONObject) JSONValue.parse(partitionMapping);
Expand All @@ -73,7 +77,6 @@ public String[] extractPartitions(Message message) {
boolean prefixEnabled = mConfig.isPartitionPrefixEnabled();
String result[] = { prefixEnabled ? partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate };
if (jsonObject != null) {

Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName());
if (fieldValue == null)
fieldValue = jsonObject.get(mConfig.getFallbackMessageTimestampName());
Expand All @@ -82,21 +85,25 @@ public String[] extractPartitions(Message message) {

Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier());
Object inputPattern = mConfig.getMessageTimestampInputPattern();
if (fieldValue != null && inputPattern != null) {
if (inputPattern != null) {
try {
/*
SimpleDateFormat outputFormatter = new SimpleDateFormat(
StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter));
Date dateFormat = null;
*/
Date dateFormat;
if (fieldValue instanceof Number) {
dateFormat = new Date(((Number) fieldValue).longValue());
} else {
SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString());
inputFormatter.setTimeZone(messageTimeZone);
dateFormat = inputFormatter.parse(fieldValue.toString());
}
result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat)
: outputFormatter.format(dateFormat);
return result;
} catch (Exception e) {
e.printStackTrace();
LOG.warn("Unable to get path: " + e.getMessage());
}
}
Expand Down
Loading

0 comments on commit 4b55db9

Please sign in to comment.