diff --git a/ncds-sdk/pom.xml b/ncds-sdk/pom.xml
index 4a4b305..21d2d58 100644
--- a/ncds-sdk/pom.xml
+++ b/ncds-sdk/pom.xml
@@ -7,7 +7,7 @@
com.nasdaq.ncds
ncds
- 0.3.0
+ 0.4.0
../pom.xml
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
index de636d7..4704b61 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
@@ -12,6 +12,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -82,8 +83,12 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception {
if (kafkaSchema == null) {
throw new Exception("Kafka Schema not Found for Stream: " + streamName);
}
- kafkaConsumer = getConsumer(kafkaSchema);
- kafkaConsumer.subscribe(Collections.singletonList(streamName + ".stream"));
+ kafkaConsumer = getConsumer(kafkaSchema, streamName);
+ TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0);
+ kafkaConsumer.assign(Collections.singletonList(topicPartition));
+ if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
+ return seekToMidNight(topicPartition);
+ }
}
catch (Exception e) {
throw (e);
@@ -106,7 +111,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
if (kafkaSchema == null) {
throw new Exception("Kafka Schema not Found for Stream: " + streamName);
}
- kafkaConsumer = getConsumer(kafkaSchema);
+ kafkaConsumer = getConsumer(kafkaSchema, streamName);
TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0);
kafkaConsumer.assign(Collections.singleton(topicPartition));
@@ -137,7 +142,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
*/
- public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
+ public KafkaAvroConsumer getConsumer(Schema avroSchema, String streamName) throws Exception {
try {
if(!IsItJunit.isJUnitTest()) {
ConfigProperties.resolveAndExportToSystemProperties(securityProps);
@@ -147,9 +152,9 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("value.deserializer", AvroDeserializer.class.getName());
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
- kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
}
- kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString());
+ kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + streamName + "_" + getDate());
ConfigProperties.resolve(kafkaProps);
return new KafkaAvroConsumer(kafkaProps, avroSchema);
}
@@ -201,8 +206,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
if (newsSchema == null) {
throw new Exception("News Schema not Found ");
}
- kafkaConsumer = getConsumer(newsSchema);
- kafkaConsumer.subscribe(Collections.singletonList(topic+".stream"));
+ kafkaConsumer = getConsumer(newsSchema, topic);
+ TopicPartition topicPartition = new TopicPartition(topic + ".stream",0);
+ kafkaConsumer.assign(Collections.singletonList(topicPartition));
+ if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
+ return seekToMidNight(topicPartition);
+ }
return kafkaConsumer;
}
catch (Exception e){
@@ -217,4 +226,32 @@ private String getDate(){
String date = dateformat.format(new Date());
return date;
}
+
+ private KafkaConsumer seekToMidNight(TopicPartition topicPartition){
+ Map<TopicPartition, Long> timestamps = new HashMap<>();
+ timestamps.put(topicPartition, getTodayMidNightTimeStamp());
+ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaConsumer.offsetsForTimes(timestamps);
+ OffsetAndTimestamp offsetAndTimestamp = null;
+ if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) {
+ kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
+ } else {
+ kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
+ }
+ return kafkaConsumer;
+ }
+
+ private long getTodayMidNightTimeStamp(){
+
+ TimeZone timeZone = TimeZone.getTimeZone("America/New_York");
+
+ Calendar today = Calendar.getInstance(timeZone);
+ today.set(Calendar.HOUR_OF_DAY, 0);
+ today.set(Calendar.MINUTE, 0);
+ today.set(Calendar.SECOND, 0);
+
+ long timestampFromMidnight = today.getTimeInMillis();
+
+ return timestampFromMidnight;
+ }
+
}
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java
index 6889b8f..df0a26f 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java
@@ -5,10 +5,8 @@
import io.strimzi.kafka.oauth.common.ConfigProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
@@ -34,7 +32,6 @@ public ReadSchemaTopic(){
public Schema readSchema(String topic) throws Exception {
KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
- schemaConsumer.subscribe(Collections.singletonList(controlSchemaName));
Duration sec = Duration.ofSeconds(10);
Schema messageSchema = null;
ConsumerRecord lastRecord=null;
@@ -92,7 +89,6 @@ public Set getTopics() throws Exception{
Set topics = new HashSet();
KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
- schemaConsumer.subscribe(Collections.singletonList(controlSchemaName));
Duration sec = Duration.ofSeconds(10);
while (true) {
ConsumerRecords schemaRecords = schemaConsumer.poll(sec);
@@ -130,34 +126,66 @@ private KafkaAvroConsumer getConsumer(String cleindId) throws Exception {
}
Schema.Parser parser = new Schema.Parser();
- controlMessageSchema = parser.parse(ClassLoader.getSystemResourceAsStream("ControlMessageSchema.avsc"));
+ // Load via Class#getResourceAsStream (absolute path) so the schema resolves when packaged in a jar
+ controlMessageSchema = parser.parse(this.getClass().getResourceAsStream("/ControlMessageSchema.avsc"));
if (IsItJunit.isJUnitTest()) {
kafkaProps = KafkaConfigLoader.loadConfig();
}
kafkaProps.put("key.deserializer", StringSerializer.class.getName());
kafkaProps.put("value.deserializer", AvroDeserializer.class.getName());
- kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId + "_" + UUID.randomUUID().toString());
+ kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId);
kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576");
ConfigProperties.resolve(kafkaProps);
}
catch (Exception e) {
throw e;
}
- return new KafkaAvroConsumer(kafkaProps, controlMessageSchema);
+ KafkaAvroConsumer kafkaAvroConsumer = new KafkaAvroConsumer(kafkaProps, controlMessageSchema);
+ TopicPartition topicPartition = new TopicPartition(controlSchemaName,0);
+ kafkaAvroConsumer.assign(Collections.singletonList(topicPartition));
+ return seekTo7DaysBack(kafkaAvroConsumer, topicPartition);
}
private Schema internalSchema (String topic) throws Exception {
try {
final Schema topicSchema;
Schema.Parser parser = new Schema.Parser();
- topicSchema = parser.parse(ClassLoader.getSystemResourceAsStream("schemas/" + topic + ".avsc"));
+ topicSchema = parser.parse(this.getClass().getResourceAsStream("/schemas/" + topic + ".avsc"));
return topicSchema;
} catch (Exception e){
throw new Exception("SCHEMA NOT FOUND FOR TOPIC: "+ topic);
}
}
- }
+ private KafkaAvroConsumer seekTo7DaysBack(KafkaAvroConsumer kafkaAvroConsumer, TopicPartition topicPartition){
+ Map<TopicPartition, Long> timestamps = new HashMap<>();
+ timestamps.put(topicPartition, getTodayMidNightTimeStamp());
+ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaAvroConsumer.offsetsForTimes(timestamps);
+ OffsetAndTimestamp offsetAndTimestamp = null;
+ if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) {
+ kafkaAvroConsumer.seek(topicPartition, offsetAndTimestamp.offset());
+ } else {
+ kafkaAvroConsumer.seekToBeginning(Collections.singleton(topicPartition));
+ }
+ return kafkaAvroConsumer;
+ }
+
+ private long getTodayMidNightTimeStamp(){
+
+ TimeZone timeZone = TimeZone.getTimeZone("America/New_York");
+
+ Calendar today = Calendar.getInstance(timeZone);
+ today.add(Calendar.DATE, -7); // NOTE(review): rewinds 7 days despite the method's "Today" name — consider renaming
+ today.set(Calendar.HOUR_OF_DAY, 0);
+ today.set(Calendar.MINUTE, 0);
+ today.set(Calendar.SECOND, 0);
+
+ long timestampFromMidnight = today.getTimeInMillis();
+
+ return timestampFromMidnight;
+ }
+
+ }
\ No newline at end of file
diff --git a/ncdssdk-client/pom.xml b/ncdssdk-client/pom.xml
index 6f1bffc..1429aad 100644
--- a/ncdssdk-client/pom.xml
+++ b/ncdssdk-client/pom.xml
@@ -7,7 +7,7 @@
com.nasdaq.ncds
ncds
- 0.3.0
+ 0.4.0
ncdssdk-client
@@ -19,7 +19,7 @@
com.nasdaq.ncds
ncds-sdk
- 0.3.0
+ 0.4.0
compile
diff --git a/pom.xml b/pom.xml
index 2dfa57e..9e1a6b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.nasdaq.ncds
ncds
- 0.3.0
+ 0.4.0
pom
Nasdaq Cloud Data Service