From 51f95594545e3a83530b3833fb4b39b7dbace778 Mon Sep 17 00:00:00 2001 From: Ruchir Vani Date: Mon, 19 Jul 2021 14:51:33 -0400 Subject: [PATCH] Reusing Consumer Group Reusing Consumer group within NCDS sdk --- ncds-sdk/pom.xml | 2 +- .../consumer/NasdaqKafkaAvroConsumer.java | 53 ++++++++++++++++--- .../ncdsclient/internal/ReadSchemaTopic.java | 52 +++++++++++++----- ncdssdk-client/pom.xml | 4 +- pom.xml | 2 +- 5 files changed, 89 insertions(+), 24 deletions(-) diff --git a/ncds-sdk/pom.xml b/ncds-sdk/pom.xml index 4a4b305..21d2d58 100644 --- a/ncds-sdk/pom.xml +++ b/ncds-sdk/pom.xml @@ -7,7 +7,7 @@ com.nasdaq.ncds ncds - 0.3.0 + 0.4.0 ../pom.xml diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java index de636d7..4704b61 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; @@ -82,8 +83,12 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception { if (kafkaSchema == null) { throw new Exception("Kafka Schema not Found for Stream: " + streamName); } - kafkaConsumer = getConsumer(kafkaSchema); - kafkaConsumer.subscribe(Collections.singletonList(streamName + ".stream")); + kafkaConsumer = getConsumer(kafkaSchema, streamName); + TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0); + kafkaConsumer.assign(Collections.singletonList(topicPartition)); + if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { + return seekToMidNight(topicPartition); + } } catch (Exception e) { throw (e); @@ -106,7 +111,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws if (kafkaSchema == null) { throw new Exception("Kafka Schema not Found for Stream: " + streamName); } - kafkaConsumer = getConsumer(kafkaSchema); + kafkaConsumer = getConsumer(kafkaSchema, streamName); TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0); kafkaConsumer.assign(Collections.singleton(topicPartition)); @@ -137,7 +142,7 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws */ - public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception { + public KafkaAvroConsumer getConsumer(Schema avroSchema, String streamName) throws Exception { try { if(!IsItJunit.isJUnitTest()) { ConfigProperties.resolveAndExportToSystemProperties(securityProps); @@ -147,9 +152,9 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception { kafkaProps.put("key.deserializer", StringDeserializer.class.getName()); kafkaProps.put("value.deserializer", AvroDeserializer.class.getName()); if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { - kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); } - kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString()); + kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + streamName + "_" + getDate()); ConfigProperties.resolve(kafkaProps); return new KafkaAvroConsumer(kafkaProps, avroSchema); } @@ -201,8 +206,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception { if (newsSchema == null) { throw new Exception("News Schema not Found "); } - kafkaConsumer = getConsumer(newsSchema); - kafkaConsumer.subscribe(Collections.singletonList(topic+".stream")); + kafkaConsumer = getConsumer(newsSchema, topic); + TopicPartition topicPartition = new TopicPartition(topic + ".stream",0); + kafkaConsumer.assign(Collections.singletonList(topicPartition)); + if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { + return seekToMidNight(topicPartition); + } return kafkaConsumer; } catch (Exception e){ @@ -217,4 +226,32 @@ private String getDate(){ String date = dateformat.format(new Date()); return date; } + + private KafkaConsumer seekToMidNight(TopicPartition topicPartition){ + Map timestmaps = new HashMap(); + timestmaps.put(topicPartition , getTodayMidNightTimeStamp()); + Map offsetsForTimes = kafkaConsumer.offsetsForTimes(timestmaps); + OffsetAndTimestamp offsetAndTimestamp = null; + if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) { + kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); + } else { + kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition)); + } + return kafkaConsumer; + } + + private long getTodayMidNightTimeStamp(){ + + TimeZone timeZone = TimeZone.getTimeZone("America/New_York"); + + Calendar today = Calendar.getInstance(timeZone); + today.set(Calendar.HOUR_OF_DAY, 0); + today.set(Calendar.MINUTE, 0); + today.set(Calendar.SECOND, 0); + + long timestampFromMidnight = today.getTimeInMillis(); + + return timestampFromMidnight; + } + } diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java index 6889b8f..df0a26f 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java @@ -5,10 +5,8 @@ import io.strimzi.kafka.oauth.common.ConfigProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; @@ -34,7 +32,6 @@ public ReadSchemaTopic(){ public Schema readSchema(String topic) throws Exception { KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps)); - schemaConsumer.subscribe(Collections.singletonList(controlSchemaName)); Duration sec = Duration.ofSeconds(10); Schema messageSchema = null; ConsumerRecord lastRecord=null; @@ -92,7 +89,6 @@ public Set getTopics() throws Exception{ Set topics = new HashSet(); KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps)); - schemaConsumer.subscribe(Collections.singletonList(controlSchemaName)); Duration sec = Duration.ofSeconds(10); while (true) { ConsumerRecords schemaRecords = schemaConsumer.poll(sec); @@ -130,34 +126,66 @@ private KafkaAvroConsumer getConsumer(String cleindId) throws Exception { } Schema.Parser parser = new Schema.Parser(); - controlMessageSchema = parser.parse(ClassLoader.getSystemResourceAsStream("ControlMessageSchema.avsc")); + //controlMessageSchema = parser.parse(ClassLoader.getSystemResourceAsStream("ControlMessageSchema.avsc")); + controlMessageSchema = parser.parse(this.getClass().getResourceAsStream("/ControlMessageSchema.avsc")); if (IsItJunit.isJUnitTest()) { kafkaProps = KafkaConfigLoader.loadConfig(); } kafkaProps.put("key.deserializer", StringSerializer.class.getName()); kafkaProps.put("value.deserializer", AvroDeserializer.class.getName()); - kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId + "_" + UUID.randomUUID().toString()); + kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, cleindId); kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"); ConfigProperties.resolve(kafkaProps); } catch (Exception e) { throw e; } - return new KafkaAvroConsumer(kafkaProps, controlMessageSchema); + KafkaAvroConsumer kafkaAvroConsumer = new KafkaAvroConsumer(kafkaProps, controlMessageSchema); + TopicPartition topicPartition = new TopicPartition(controlSchemaName,0); + kafkaAvroConsumer.assign(Collections.singletonList(topicPartition)); + return seekTo7DaysBack(kafkaAvroConsumer, topicPartition); } private Schema internalSchema (String topic) throws Exception { try { final Schema topicSchema; Schema.Parser parser = new Schema.Parser(); - topicSchema = parser.parse(ClassLoader.getSystemResourceAsStream("schemas/" + topic + ".avsc")); + topicSchema = parser.parse(this.getClass().getResourceAsStream("schemas/" + topic + ".avsc")); return topicSchema; } catch (Exception e){ throw new Exception("SCHEMA NOT FOUND FOR TOPIC: "+ topic); } } - } + private KafkaAvroConsumer seekTo7DaysBack(KafkaAvroConsumer kafkaAvroConsumer, TopicPartition topicPartition){ + Map timestmaps = new HashMap(); + timestmaps.put(topicPartition , getTodayMidNightTimeStamp()); + Map offsetsForTimes = kafkaAvroConsumer.offsetsForTimes(timestmaps); + OffsetAndTimestamp offsetAndTimestamp = null; + if (offsetsForTimes != null && (offsetAndTimestamp = offsetsForTimes.get(topicPartition)) != null) { + kafkaAvroConsumer.seek(topicPartition, offsetAndTimestamp.offset()); + } else { + kafkaAvroConsumer.seekToBeginning(Collections.singleton(topicPartition)); + } + return kafkaAvroConsumer; + } + + private long getTodayMidNightTimeStamp(){ + + TimeZone timeZone = TimeZone.getTimeZone("America/New_York"); + + Calendar today = Calendar.getInstance(timeZone); + today.add(Calendar.DATE, -7); + today.set(Calendar.HOUR_OF_DAY, 0); + today.set(Calendar.MINUTE, 0); + today.set(Calendar.SECOND, 0); + + long timestampFromMidnight = today.getTimeInMillis(); + + return timestampFromMidnight; + } + + } \ No newline at end of file diff --git a/ncdssdk-client/pom.xml b/ncdssdk-client/pom.xml index 6f1bffc..1429aad 100644 --- a/ncdssdk-client/pom.xml +++ b/ncdssdk-client/pom.xml @@ -7,7 +7,7 @@ com.nasdaq.ncds ncds - 0.3.0 + 0.4.0 ncdssdk-client @@ -19,7 +19,7 @@ com.nasdaq.ncds ncds-sdk - 0.3.0 + 0.4.0 compile diff --git a/pom.xml b/pom.xml index 2dfa57e..9e1a6b9 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.nasdaq.ncds ncds - 0.3.0 + 0.4.0 pom Nasdaq Cloud Data Service