diff --git a/docker/Dockerfile b/docker/Dockerfile
index d78a1e8..599b390 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -10,7 +10,7 @@ RUN mvn -B \
### Build Images ###
## SDK app ##
-FROM strimzi/kafka:0.14.0-kafka-2.3.0 as sdk-app
+FROM strimzi/kafka:0.20.0-kafka-2.6.0 as sdk-app
COPY . /home/kafka
@@ -24,4 +24,4 @@ ENV JAVAX_NET_SSL_TRUSTSTORE=truststore/ncdsTrustStore.p12
ENTRYPOINT ["bash","docker/run-sdk-app.sh"]
-CMD ["-opt", "TOPICS"]
\ No newline at end of file
+CMD ["-opt", "TOPICS"]
diff --git a/docker/mirrormaker/template/kafka-mirror-maker.yaml b/docker/mirrormaker/template/kafka-mirror-maker.yaml
index c7cb225..26fbfcb 100644
--- a/docker/mirrormaker/template/kafka-mirror-maker.yaml
+++ b/docker/mirrormaker/template/kafka-mirror-maker.yaml
@@ -3,7 +3,7 @@ kind: KafkaMirrorMaker
metadata:
name: my-mirror-maker
spec:
- version: 2.3.0
+ version: 2.6.0
replicas: 1
consumer:
bootstrapServers: clouddataservice.broker.bootstrap.nasdaq.com:9094
diff --git a/ncds-sdk/pom.xml b/ncds-sdk/pom.xml
index 4c5b68f..4a4b305 100644
--- a/ncds-sdk/pom.xml
+++ b/ncds-sdk/pom.xml
@@ -7,12 +7,20 @@
com.nasdaq.ncds
ncds
- 0.0.2
+ 0.3.0
../pom.xml
ncds-sdk
- 0.0.2
+ jar
+
+ kafka_2.12
+ 5.6.2
+ 1.3.2
+ 2.12.0
+ 1.7.30
+ 2.22.2
+
SDK
Provide Development Kit to connect with Kafka
@@ -43,76 +51,88 @@
io.strimzi
kafka-oauth-common
- 0.1.0
+ 0.6.0
compile
-
- org.apache.kafka
- kafka_2.11
- ${kafka.version}
-
+
+
+
+ org.apache.kafka
+ ${kafkaScalaVersion}
+ ${kafka.version}
+
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ javax.mail
+ mail
+
+
+ test
+
+
- kafka-junit-core
com.salesforce.kafka.test
- 3.2.0
-
+ kafka-junit-core
+ 3.2.2
+ test
+
org.junit.jupiter
junit-jupiter-api
- 5.5.2
- compile
+ ${junit5.version}
+ test
org.junit.jupiter
- junit-jupiter-engine
- 5.5.2
+ junit-jupiter-params
+ ${junit5.version}
+ test
+
- org.junit.jupiter
- junit-jupiter-params
- 5.5.2
-
+ org.mockito
+ mockito-core
+ 2.28.2
+ test
- org.hamcrest
- hamcrest-all
- 1.3
-
+ org.apache.curator
+ curator-test
+ ${curatorTestVersion}
+ test
+
- org.apache.maven.plugins
- maven-checkstyle-plugin
- 3.1.0
- maven-plugin
-
+ org.slf4j
+ slf4j-simple
+ ${slf4jVersion}
+ test
+
com.github.stephenc.high-scale-lib
high-scale-lib
1.1.4
-
+ test
org.apache.commons
commons-lang3
3.3.2
-
-
-
-
- org.apache.logging.log4j
- log4j-core
- 2.13.2
test
@@ -122,6 +142,7 @@
1.3.176
test
+
@@ -129,15 +150,28 @@
org.apache.maven.plugins
maven-compiler-plugin
+ 3.8.0
-
- 8
+
+ 1.8
org.apache.maven.plugins
maven-surefire-plugin
- 2.22.1
+ ${surefire.version}
+
+
+ org.junit.platform
+ junit-platform-surefire-provider
+ ${junit5PlatformProvider.version}
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit5.version}
+
+
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
index cf2680a..570e879 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java
@@ -17,6 +17,8 @@
import org.apache.kafka.common.serialization.StringDeserializer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.*;
import static com.nasdaq.ncdsclient.internal.utils.AuthenticationConfigLoader.getClientID;
@@ -151,7 +153,7 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
- kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + UUID.randomUUID().toString());
+ kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString());
ConfigProperties.resolve(kafkaProps);
return new KafkaAvroConsumer(kafkaProps, avroSchema);
}
@@ -211,4 +213,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
throw (e);
}
}
+
+ private String getDate(){
+ // Get Today's EST date
+ DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
+ dateformat.setTimeZone(TimeZone.getTimeZone("America/New_York"));
+ String date = dateformat.format(new Date());
+ return date;
+ }
}
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
index 46f1f5a..d74622e 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
@@ -8,7 +8,9 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
+import org.bouncycastle.asn1.x509.*;
import org.json.JSONObject;
+import sun.security.x509.URIName;
import sun.security.x509.X509CertImpl;
import javax.net.ssl.HttpsURLConnection;
@@ -17,6 +19,8 @@
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.List;
public class InstallCertificates {
@@ -123,14 +127,26 @@ private void installCertsToTrustStore(String password) throws Exception {
}
- private Certificate getAuthCertificate() throws IOException {
+ private Certificate getAuthCertificate() throws Exception {
URL url = new URL(null, authUrl, new sun.net.www.protocol.https.Handler());
HttpsURLConnection con = (HttpsURLConnection)url.openConnection();
con.connect();
Certificate[] certs = con.getServerCertificates();
- for(Certificate cert : certs){
- if(((X509CertImpl) cert).getSubjectDN().getName().contains("keycloak")){
- return cert;
+ CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
+ for(Certificate cert : certs) {
+ List descriptions = ((X509CertImpl) cert).getAuthorityInfoAccessExtension().getAccessDescriptions();
+ for (sun.security.x509.AccessDescription ad : descriptions) {
+ // check if it's a URL to issuer's certificate
+ if (ad.getAccessMethod().toString().equals(X509ObjectIdentifiers.id_ad_caIssuers.toString())) {
+ sun.security.x509.GeneralName location = ad.getAccessLocation();
+ if (location.getType() == GeneralName.uniformResourceIdentifier) {
+ // Get issuer's URL
+ String issuerUrl = ((URIName) location.getName()).getURI().toString();
+ URL url1 = new URL(issuerUrl);
+ X509Certificate issuer = (X509Certificate) certificateFactory.generateCertificate(url1.openStream());
+ return issuer;
+ }
+ }
}
}
return null;
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java
index 87bb7b9..02fe9f7 100644
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java
+++ b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java
@@ -1,17 +1,15 @@
package com.nasdaq.ncdsclient;
-import junit.framework.Assert;
import com.nasdaq.ncdsclient.utils.NCDSTestUtil;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class NCDSSDKJunitTest {
@@ -23,16 +21,16 @@ class NCDSSDKJunitTest {
ncdsTestUtil = new NCDSTestUtil();
} catch (Exception e) {
e.printStackTrace();
+ System.exit(0);
}
}
-
@Test
@Order(1)
- void testNCDSClient() {
+ public void testNCDSClient() {
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
- Assertions.assertNotNull(ncdsClient);
+ assertNotNull(ncdsClient);
} catch (Exception e){
new AssertionError("Error");
System.out.println(e.getMessage());
@@ -42,16 +40,16 @@ void testNCDSClient() {
@Test
@Order(2)
- void testListTopicsForTheClient() {
+ public void testListTopicsForTheClient() {
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
String[] topics = ncdsClient.ListTopicsForTheClient();
Collections.sort(Arrays.asList(topics));
String[] addedTopics = ncdsTestUtil.getAddedTopics();
Collections.sort(Arrays.asList(addedTopics));
- Assert.assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
+ assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
} catch (Exception e) {
- Assert.fail();
+ Assertions.fail();
System.out.println(e.getMessage());
}
}
@@ -59,7 +57,7 @@ void testListTopicsForTheClient() {
@Test
@Order(3)
- void testgetSchemaForTheTopic(){
+ public void testgetSchemaForTheTopic(){
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
String topic = "GIDS";
@@ -67,16 +65,16 @@ void testgetSchemaForTheTopic(){
String schemaFile = "testGIDS.avsc";
String schemaFromFile = ncdsTestUtil.getSchemaForTopic(schemaFile);
- Assert.assertEquals(schemaFromSDK,schemaFromFile);
+ assertEquals(schemaFromSDK,schemaFromFile);
} catch (Exception e) {
- Assert.fail();
+ Assertions.fail();
System.out.println(e.getMessage());
}
}
@Test
@Order(4)
- void testInsertion(){
+ public void testInsertion(){
ArrayList mockRecords = ncdsTestUtil.getMockMessages();
ArrayList mockRecordsFromKafka = new ArrayList<>();
@@ -90,9 +88,9 @@ void testInsertion(){
mockRecordsFromKafka.add(iteratorMocker.next().value());
}
//mockRecordsFromKafka.remove(0);
- Assert.assertEquals(mockRecords,mockRecordsFromKafka);
+ assertEquals(mockRecords,mockRecordsFromKafka);
} catch (Exception e) {
- Assert.fail();
+ Assertions.fail();
e.printStackTrace();
}
}
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/AvroMocker.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/AvroMocker.java
index f7dbc44..4117874 100644
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/AvroMocker.java
+++ b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/AvroMocker.java
@@ -3,7 +3,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import java.util.ArrayList;
import java.util.List;
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/NCDSTestUtil.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/NCDSTestUtil.java
index c8bc3a5..113f5aa 100644
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/NCDSTestUtil.java
+++ b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/NCDSTestUtil.java
@@ -2,8 +2,10 @@
import com.nasdaq.ncdsclient.core.GenericRecordSerializer;
import com.nasdaq.ncdsclient.core.KafkaControlSchema;
-import com.nasdaq.ncdsclient.core.KafkaTestServer;
-import com.nasdaq.ncdsclient.core.KafkaTestUtils;
+import com.salesforce.kafka.test.KafkaTestServer;
+import com.salesforce.kafka.test.KafkaTestUtils;
+import com.salesforce.kafka.test.listeners.BrokerListener;
+import com.salesforce.kafka.test.listeners.PlainListener;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -63,7 +65,9 @@ private GenericRecord getStreamInitiatedRecord(String strmName, Schema schema) {
}
private void beforeEachTest() throws Exception {
- kafkaTestServer = new KafkaTestServer(getDefaultBrokerOverrideProperties());
+ final List brokerListeners = new ArrayList<>();
+ brokerListeners.add(new PlainListenerSpecificPort());
+ kafkaTestServer = new KafkaTestServer(getDefaultBrokerOverrideProperties(), brokerListeners);
kafkaTestServer.start();
}
@@ -144,6 +148,7 @@ private void addSchemasToControlTopic() throws Exception {
while (!future.isDone()) {
Thread.sleep(500L);
}
+ producer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
@@ -192,6 +197,7 @@ public void pushMockMessages() throws IOException {
while (!future.isDone()) {
Thread.sleep(500L);
}
+ mockerProducer.close();
}
}catch (Exception e){
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/PlainListenerSpecificPort.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/PlainListenerSpecificPort.java
new file mode 100644
index 0000000..1aedd6a
--- /dev/null
+++ b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/utils/PlainListenerSpecificPort.java
@@ -0,0 +1,32 @@
+package com.nasdaq.ncdsclient.utils;
+
+import com.salesforce.kafka.test.listeners.AbstractListener;
+
+import java.util.Properties;
+
+public class PlainListenerSpecificPort extends AbstractListener{
+ @Override
+ public String getProtocol() {
+ return "PLAINTEXT";
+ }
+
+ @Override
+ public Properties getBrokerProperties() {
+ return new Properties();
+ }
+
+ @Override
+ public Properties getClientProperties() {
+ return new Properties();
+ }
+
+ @Override
+ public PlainListenerSpecificPort onPorts(int... ports) {
+ return super.onPorts(9095);
+ }
+
+ @Override
+ public int getNextPort() {
+ return 9095;
+ }
+}
diff --git a/ncdssdk-client/pom.xml b/ncdssdk-client/pom.xml
index f9065ae..6f1bffc 100644
--- a/ncdssdk-client/pom.xml
+++ b/ncdssdk-client/pom.xml
@@ -7,11 +7,10 @@
com.nasdaq.ncds
ncds
- 0.0.2
+ 0.3.0
ncdssdk-client
- 0.0.2
Client
Provide example client to interact with data
@@ -20,7 +19,7 @@
com.nasdaq.ncds
ncds-sdk
- 0.0.2
+ 0.3.0
compile
diff --git a/pom.xml b/pom.xml
index 09e9249..2dfa57e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.nasdaq.ncds
ncds
- 0.0.2
+ 0.3.0
pom
Nasdaq Cloud Data Service
@@ -16,9 +16,7 @@
1.8.2
- 2.0.1
- 2.0.1
- 5.2.2
+ 2.6.0
true
@@ -35,12 +33,6 @@
org.apache.kafka
kafka-clients
- ${kafka.version-client}
-
-
-
- org.apache.kafka
- kafka-streams
${kafka.version}
@@ -48,24 +40,14 @@
io.strimzi
kafka-oauth-client
- 0.1.0
+ 0.6.0
-
-
-
-
-
-
-
-
-
-
-
+
org.slf4j
slf4j-log4j12
- 1.7.5
+ 1.7.30