Skip to content

Commit

Permalink
SDK 0.3.0 release
Browse files Browse the repository at this point in the history
Updating SDK to use Kafka 2.6.0
Updating base docker image for 0.20.0
Updating mirrormaker
Updating certificate management code to download the correct cert
Adding date in consumer-group-id
  • Loading branch information
ruchirvaninasdaq committed Jan 19, 2021
1 parent d1d87d7 commit ca415e2
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 94 deletions.
4 changes: 2 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN mvn -B \

### Build Images ###
## SDK app ##
FROM strimzi/kafka:0.14.0-kafka-2.3.0 as sdk-app
FROM strimzi/kafka:0.20.0-kafka-2.6.0 as sdk-app

COPY . /home/kafka

Expand All @@ -24,4 +24,4 @@ ENV JAVAX_NET_SSL_TRUSTSTORE=truststore/ncdsTrustStore.p12

ENTRYPOINT ["bash","docker/run-sdk-app.sh"]

CMD ["-opt", "TOPICS"]
CMD ["-opt", "TOPICS"]
2 changes: 1 addition & 1 deletion docker/mirrormaker/template/kafka-mirror-maker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: KafkaMirrorMaker
metadata:
name: my-mirror-maker
spec:
version: 2.3.0
version: 2.6.0
replicas: 1
consumer:
bootstrapServers: clouddataservice.broker.bootstrap.nasdaq.com:9094
Expand Down
112 changes: 73 additions & 39 deletions ncds-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,20 @@
<parent>
<groupId>com.nasdaq.ncds</groupId>
<artifactId>ncds</artifactId>
<version>0.0.2</version>
<version>0.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>ncds-sdk</artifactId>
<version>0.0.2</version>
<packaging>jar</packaging>
<properties>
<kafkaScalaVersion>kafka_2.12</kafkaScalaVersion>
<junit5.version>5.6.2</junit5.version>
<junit5PlatformProvider.version>1.3.2</junit5PlatformProvider.version>
<curatorTestVersion>2.12.0</curatorTestVersion>
<slf4jVersion>1.7.30</slf4jVersion>
<surefire.version>2.22.2</surefire.version>
</properties>

<name>SDK</name>
<description>Provide Development Kit to connect with Kafka</description>
Expand Down Expand Up @@ -43,76 +51,88 @@
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>kafka-oauth-common</artifactId>
<version>0.1.0</version>
<version>0.6.0</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Testing -->

<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>${kafkaScalaVersion}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<!-- Don't bring in kafka's logging framework -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<dependency>
<artifactId>kafka-junit-core</artifactId>
<groupId>com.salesforce.kafka.test</groupId>
<version>3.2.0</version>

<artifactId>kafka-junit-core</artifactId>
<version>3.2.2</version>
<scope>test</scope>
</dependency>

<!-- JUnit5 tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.5.2</version>
<scope>compile</scope>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.5.2</version>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Mockito for mocks in tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.5.2</version>

<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.28.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>

<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curatorTestVersion}</version>
<scope>test</scope>
</dependency>

<!-- Logging in tests -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.0</version>
<type>maven-plugin</type>

<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4jVersion}</version>
<scope>test</scope>
</dependency>

<!-- Testing support class -->
<dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.1.4</version>

<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>

</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.2</version>
<scope>test</scope>
</dependency>

Expand All @@ -122,22 +142,36 @@
<version>1.3.176</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>8</source>
<target>8</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<version>${surefire.version}</version>
<dependencies>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-surefire-provider</artifactId>
<version>${junit5PlatformProvider.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit5.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.apache.kafka.common.serialization.StringDeserializer;


import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

import static com.nasdaq.ncdsclient.internal.utils.AuthenticationConfigLoader.getClientID;
Expand Down Expand Up @@ -151,7 +153,7 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + UUID.randomUUID().toString());
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString());
ConfigProperties.resolve(kafkaProps);
return new KafkaAvroConsumer(kafkaProps, avroSchema);
}
Expand Down Expand Up @@ -211,4 +213,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
throw (e);
}
}

private String getDate(){
// Get Today's EST date
DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
dateformat.setTimeZone(TimeZone.getTimeZone("America/New_York"));
String date = dateformat.format(new Date());
return date;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.bouncycastle.asn1.x509.*;
import org.json.JSONObject;
import sun.security.x509.URIName;
import sun.security.x509.X509CertImpl;

import javax.net.ssl.HttpsURLConnection;
Expand All @@ -17,6 +19,8 @@
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.List;


public class InstallCertificates {
Expand Down Expand Up @@ -123,14 +127,26 @@ private void installCertsToTrustStore(String password) throws Exception {

}

private Certificate getAuthCertificate() throws IOException {
private Certificate getAuthCertificate() throws Exception {
URL url = new URL(null, authUrl, new sun.net.www.protocol.https.Handler());
HttpsURLConnection con = (HttpsURLConnection)url.openConnection();
con.connect();
Certificate[] certs = con.getServerCertificates();
for(Certificate cert : certs){
if(((X509CertImpl) cert).getSubjectDN().getName().contains("keycloak")){
return cert;
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
for(Certificate cert : certs) {
List<sun.security.x509.AccessDescription> descriptions = ((X509CertImpl) cert).getAuthorityInfoAccessExtension().getAccessDescriptions();
for (sun.security.x509.AccessDescription ad : descriptions) {
// check if it's a URL to issuer's certificate
if (ad.getAccessMethod().toString().equals(X509ObjectIdentifiers.id_ad_caIssuers.toString())) {
sun.security.x509.GeneralName location = ad.getAccessLocation();
if (location.getType() == GeneralName.uniformResourceIdentifier) {
// Get issuer's URL
String issuerUrl = ((URIName) location.getName()).getURI().toString();
URL url1 = new URL(issuerUrl);
X509Certificate issuer = (X509Certificate) certificateFactory.generateCertificate(url1.openStream());
return issuer;
}
}
}
}
return null;
Expand Down
32 changes: 15 additions & 17 deletions ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.nasdaq.ncdsclient;

import junit.framework.Assert;
import com.nasdaq.ncdsclient.utils.NCDSTestUtil;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class NCDSSDKJunitTest {
Expand All @@ -23,16 +21,16 @@ class NCDSSDKJunitTest {
ncdsTestUtil = new NCDSTestUtil();
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
}


@Test
@Order(1)
void testNCDSClient() {
public void testNCDSClient() {
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
Assertions.assertNotNull(ncdsClient);
assertNotNull(ncdsClient);
} catch (Exception e){
new AssertionError("Error");
System.out.println(e.getMessage());
Expand All @@ -42,41 +40,41 @@ void testNCDSClient() {

@Test
@Order(2)
void testListTopicsForTheClient() {
public void testListTopicsForTheClient() {
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
String[] topics = ncdsClient.ListTopicsForTheClient();
Collections.sort(Arrays.asList(topics));
String[] addedTopics = ncdsTestUtil.getAddedTopics();
Collections.sort(Arrays.asList(addedTopics));
Assert.assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
} catch (Exception e) {
Assert.fail();
Assertions.fail();
System.out.println(e.getMessage());
}
}


@Test
@Order(3)
void testgetSchemaForTheTopic(){
public void testgetSchemaForTheTopic(){
try {
NCDSClient ncdsClient = new NCDSClient(null, null);
String topic = "GIDS";
String schemaFromSDK = ncdsClient.getSchemaForTheTopic(topic);
String schemaFile = "testGIDS.avsc";
String schemaFromFile = ncdsTestUtil.getSchemaForTopic(schemaFile);

Assert.assertEquals(schemaFromSDK,schemaFromFile);
assertEquals(schemaFromSDK,schemaFromFile);
} catch (Exception e) {
Assert.fail();
Assertions.fail();
System.out.println(e.getMessage());
}
}

@Test
@Order(4)
void testInsertion(){
public void testInsertion(){
ArrayList<GenericRecord> mockRecords = ncdsTestUtil.getMockMessages();
ArrayList<GenericRecord> mockRecordsFromKafka = new ArrayList<>();

Expand All @@ -90,9 +88,9 @@ void testInsertion(){
mockRecordsFromKafka.add(iteratorMocker.next().value());
}
//mockRecordsFromKafka.remove(0);
Assert.assertEquals(mockRecords,mockRecordsFromKafka);
assertEquals(mockRecords,mockRecordsFromKafka);
} catch (Exception e) {
Assert.fail();
Assertions.fail();
e.printStackTrace();
}
}
Expand Down
Loading

0 comments on commit ca415e2

Please sign in to comment.