From 04af855f6c119cc1485f1412c53b049b3d2a0896 Mon Sep 17 00:00:00 2001
From: Ruchir Vani <32844383+ruchirvaninasdaq@users.noreply.github.com>
Date: Mon, 1 Nov 2021 11:39:23 -0400
Subject: [PATCH] release-0.5.0
SDK 0.5.0 Changes:
- Remove certificate pinning from the Java SDK
- Remove certificate pinning from the Docker SDK
- Provide an option for a custom group id suffix in MirrorMaker (see the example below)
- Update the unit-test stack
- Update the README to reflect the removal of certificate pinning
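
For reference, the new suffix option as documented in the updated docker/mirrormaker/README.md; reusing the same suffix across restarts lets the consumer resume from its last committed offset:

```
docker run -e "OAUTH_CLIENT_ID={client-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value}" sdk-app:latest -opt mirrormaker -topics NLSUTP.stream -groupidsuffix mycustomsuffix
```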
---
README.md | 182 +++---
docker/Dockerfile | 4 -
docker/README.md | 22 +-
docker/auth.properties | 3 +-
docker/mirrormaker/README.md | 30 +-
docker/mirrormaker/consumer.properties | 3 +-
docker/mirrormaker/run-mirror-maker.sh | 13 +-
docker/run-sdk-app.sh | 28 +-
ncds-sdk/pom.xml | 3 +-
.../com/nasdaq/ncdsclient/NCDSClient.java | 2 +-
.../utils/AuthenticationConfigLoader.java | 14 +-
.../internal/utils/InstallCertificates.java | 154 ------
.../internal/utils/KafkaConfigLoader.java | 7 +-
.../core/AbstractKafkaTestResource.java | 181 ------
.../core/AbstractZookeeperTestResource.java | 26 -
.../nasdaq/ncdsclient/core/KafkaBroker.java | 55 --
.../nasdaq/ncdsclient/core/KafkaBrokers.java | 96 ----
.../nasdaq/ncdsclient/core/KafkaCluster.java | 20 -
.../nasdaq/ncdsclient/core/KafkaProvider.java | 32 --
.../ncdsclient/core/KafkaTestCluster.java | 295 ----------
.../ncdsclient/core/KafkaTestServer.java | 371 -------------
.../ncdsclient/core/KafkaTestUtils.java | 521 ------------------
.../ncdsclient/core/ListenerProperties.java | 51 --
.../ncdsclient/core/ProducedKafkaRecord.java | 87 ---
.../com/nasdaq/ncdsclient/core/Utils.java | 29 -
.../ncdsclient/core/ZookeeperTestServer.java | 106 ----
.../ncdsclient/listeners/BrokerListener.java | 33 --
.../listeners/JaasValidationTool.java | 16 -
.../ncdsclient/listeners/PlainListener.java | 24 -
.../listeners/SaslPlainListener.java | 84 ---
.../ncdsclient/listeners/SaslSslListener.java | 179 ------
.../ncdsclient/listeners/SslListener.java | 123 -----
ncdssdk-client/pom.xml | 7 +-
.../com/nasdaq/ncdsclient/NCDSSession.java | 27 -
pom.xml | 2 +-
35 files changed, 144 insertions(+), 2686 deletions(-)
mode change 100644 => 100755 docker/run-sdk-app.sh
delete mode 100644 ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractKafkaTestResource.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractZookeeperTestResource.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBroker.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBrokers.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaCluster.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaProvider.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestCluster.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestServer.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestUtils.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/ListenerProperties.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/ProducedKafkaRecord.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/Utils.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/ZookeeperTestServer.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/BrokerListener.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/JaasValidationTool.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/PlainListener.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/SaslPlainListener.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/SaslSslListener.java
delete mode 100644 ncds-sdk/src/test/java/com/nasdaq/ncdsclient/listeners/SslListener.java
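
Note for users migrating off certificate pinning: the SDK no longer downloads or installs a pinned truststore, and AuthenticationConfigLoader no longer requires the javax.net.ssl.* settings. Environments that still need a custom truststore can supply the standard Kafka client ssl.truststore.* properties, which this release exposes as constants on KafkaConfigLoader. A minimal sketch, assuming the kafka properties are passed through to the underlying consumer; the truststore path and password below are placeholders, not shipped defaults:

```java
import java.util.Properties;

import com.nasdaq.ncdsclient.NCDSClient;
import com.nasdaq.ncdsclient.internal.utils.KafkaConfigLoader;

public class TrustStoreExample {
    public static void main(String[] args) throws Exception {
        // Standard Kafka client SSL settings replace the removed
        // javax.net.ssl.* pinning properties.
        Properties kafkaCfg = new Properties();
        kafkaCfg.setProperty(KafkaConfigLoader.BOOTSTRAP_SERVERS, "{streams_endpoint_url}:9094");
        kafkaCfg.setProperty(KafkaConfigLoader.SSL_TRUSTSTORE_TYPE, "PKCS12");
        kafkaCfg.setProperty(KafkaConfigLoader.SSL_TRUSTSTORE_LOCATION, "/path/to/truststore.p12"); // placeholder
        kafkaCfg.setProperty(KafkaConfigLoader.SSL_TRUSTSTORE_PASSWORD, "changeit");                // placeholder

        // OAuth credentials provided during on-boarding.
        Properties securityCfg = new Properties();
        securityCfg.setProperty("oauth.token.endpoint.uri",
                "https://{auth_endpoint_url}/auth/realms/pro-realm/protocol/openid-connect/token");
        securityCfg.setProperty("oauth.client.id", "client");
        securityCfg.setProperty("oauth.client.secret", "client-secret");
        securityCfg.setProperty("oauth.username.claim", "preferred_username");

        NCDSClient ncdsClient = new NCDSClient(securityCfg, kafkaCfg);
        System.out.println(ncdsClient.getSchemaForTheTopic("GIDS"));
    }
}
```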
diff --git a/README.md b/README.md
index d1273d7..6a946b7 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# Nasdaq Cloud Data Service (NCDS) - Streaming Client Java SDK
-Nasdaq Cloud Data Service (NCDS) provides a modern and efficient method of delivery for realtime exchange data and other financial information. Data is made available through a suite of APIs, allowing for effortless integration of data from disparate sources, and a dramatic reduction in time to market for customer-designed applications. The API is highly scalable, and robust enough to support the delivery of real-time exchange data. Read our latest NCDS case study: Nasdaq Cloud Data Service Provides Real-Time Data with Efficiency and Scale to Fintech Newcomer Unhedged
+Nasdaq Cloud Data Service (NCDS) provides a modern and efficient method of delivery for real-time exchange data and other financial information. Data is made available through a suite of APIs, allowing for effortless integration of data from disparate sources, and a dramatic reduction in time to market for customer-designed applications. The API is highly scalable, and robust enough to support the delivery of real-time exchange data. Read our latest NCDS case study: Nasdaq Cloud Data Service Provides Real-Time Data with Efficiency and Scale to Fintech Newcomer Unhedged
This repository provides an SDK for developing applications to access the NCDS API. While the SDK is open source, connecting to the API does require credentials, which are provided by Nasdaq during an on-boarding process.
@@ -9,7 +9,7 @@ This repository provides an SDK for developing applications to access the NCDS A
### Equities
#### The Nasdaq Stock Market
- [Nasdaq Basic](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NasdaqBasic-Cloud.pdf)
-- [Nasdaq Last Sale+](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NLSPlus-cloud.pdf)
+- [Nasdaq Last Sale+](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NLSPlus-cloud.pdf)
- [Nasdaq TotalView](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/Totalview-ITCH-cloud.pdf)
- [Nasdaq Consolidated Quotes and Trades](https://github.com/Nasdaq/CloudDataService/raw/master/specs/CQT-cloud.pdf)
#### Nasdaq BX
@@ -24,24 +24,24 @@ This repository provides an SDK for developing applications to access the NCDS A
- [Global Index Data Service](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/GIDS_Cloud.pdf)
### Options
#### Nasdaq U.S. Derivatives
-- [Nasdaq Smart Options](http://nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NCDSSmartOptions.pdf)
+- [Nasdaq Smart Options](http://nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NCDSSmartOptions.pdf)
### Mutual Funds
-- [Nasdaq Fund Network](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NFNDS_NCDS.pdf)
+- [Nasdaq Fund Network](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/NFNDS_NCDS.pdf)
### News
-- [Financial News](http://nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/MTNewswires-cloud.pdf)
+- [Financial News](http://nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/MTNewswires-cloud.pdf)
# Table of Contents
- - [Getting Started](#Getting-Started)
- - [Using the SDK](#Using-The-SDK)
- - [Documentation](#Documentation)
- - [Docker](#Docker)
- - [Contributing](#Contributing)
- - [License](#License)
-
+- [Getting Started](#Getting-Started)
+- [Using the SDK](#Using-The-SDK)
+- [Documentation](#Documentation)
+- [Docker](#Docker)
+- [Contributing](#Contributing)
+- [License](#License)
+
## Getting Started
-### Pre-requisites
+### Pre-requisites
- OpenJDK 8
- Maven 3
@@ -49,58 +49,43 @@ This repository provides an SDK for developing applications to access the NCDS A
### Get the SDK
Clone the repository: ```git clone https://github.com/Nasdaq/CloudDataService```
- - Run ```mvn install``` to build the library, test, javadoc and source jars and install to your local Maven repository.
- - Run ```mvn javadoc:javadoc``` to build the documentation.
-
-### Retrieving certificates
-
-Run jar `ncdssdk-client/target/ncdssdk-client.jar` with arguments, which take path and password (minimum 6 characters) for keystore
-
-For example:
-
-```java -jar ncdssdk-client.jar -opt INSTALLCERTS -path /my/trusted/store/ncdsinstallcerts -pass my_password```
+- Run ```mvn install``` to build the library, test, javadoc and source jars and install to your local Maven repository.
+- Run ```mvn javadoc:javadoc``` to build the documentation.
### Stream configuration
- Replace example stream properties in the file **kafka-config.properties** (https://github.com/Nasdaq/CloudDataService/blob/master/ncdssdk-client/src/main/resources/kafka-config.properties) with provided values during on-boarding.
+Replace the example stream properties in the file **kafka-config.properties** (https://github.com/Nasdaq/CloudDataService/blob/master/ncdssdk-client/src/main/resources/kafka-config.properties) with the values provided during on-boarding.
+
+Required Kafka configuration:
- Required kafka configuration
-
```properties
bootstrap.servers={streams_endpoint_url}:9094
```
-
- For optional consumer configurations see: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html
-
- For example:
+
+For optional consumer configurations see: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html
+
+For example:
```properties
request.timeout.ms=20000
retry.backoff.ms=500
max.poll.records=2000
```
-
+
### Client Authentication configuration
- Replace example client authentication properties in the file **clientAuthentication-config.properties** (https://github.com/Nasdaq/CloudDataService/blob/master/ncdssdk-client/src/main/resources/clientAuthentication-config.properties) with valid credentials provided during on-boarding.
-
+Replace example client authentication properties in the file **clientAuthentication-config.properties** (https://github.com/Nasdaq/CloudDataService/blob/master/ncdssdk-client/src/main/resources/clientAuthentication-config.properties) with valid credentials provided during on-boarding.
+
```properties
oauth.token.endpoint.uri=https://{auth_endpoint_url}/auth/realms/pro-realm/protocol/openid-connect/token
oauth.client.id=client
oauth.client.secret=client-secret
oauth.username.claim=preferred_username
-```
- Update the path to your local Keystore
-
-```properties
-javax.net.ssl.trustStore=/my/trusted/store/ncdsinstallcerts/ncdsTrustStore.p12
-javax.net.ssl.trustStorePassword=my_password
-javax.net.ssl.trustStoreType=PKCS12
-```
+```
### Create NCDS Session Client
- Run `mvn clean install` command in ncdssdk-client project (https://github.com/Nasdaq/CloudDataService/tree/master/ncdssdk-client). It will generate the jar file in target file.
- How to run the jar:
+Run the `mvn clean install` command in the ncdssdk-client project (https://github.com/Nasdaq/CloudDataService/tree/master/ncdssdk-client). It will generate the jar file in the target directory.
+How to run the jar:
```
-opt -- Provide the operation you want to perform \n" +
" * TOP - View the top nnn records in the Topic/Stream\n"+
@@ -108,7 +93,6 @@ javax.net.ssl.trustStoreType=PKCS12
" * METRICS - Display the Metrics for the topic\n"+
" * TOPICS - List of streams available on Nasdaq Cloud DataService\n"+
" * GETMSG - Get one example message for the\n"+
- " * INSTALLCERTS - Install certificate to keystore\n"+
" * CONTSTREAM - Retrieve continuous stream \n"+
" * FILTERSTREAM - Retrieve continuous stream filtered by symbols and/or msgtypes \n"+
" * NEWS - Get stream for Pro Global news stream\n"+
@@ -120,26 +104,24 @@ javax.net.ssl.trustStoreType=PKCS12
"-kafkaprops -- Provide Kafka Properties File path --- For using different set of Kafka Properties \n"+
"-n -- Provide number of messages to retrieve --- REQUIRED for TOP \n"+
"-msgName -- Provide name of message based on schema --- REQUIRED for GETMSG \n"+
-"-path -- Provide the path for key store --- REQUIRED for INSTALLCERTS \n"+
-"-pass -- Provide the password for key store --- REQUIRED for INSTALLCERTS \n"+
"-timestamp -- Provide timestamp in milliseconds --- OPTIONAL for TOP, CONTSTREAM and FILTERSTREAM\n"
```
-
- Few examples to use jar:
-
- Get first 100 records for given stream
-
- ```java -jar ncdssdk-client.jar -opt TOP -n 100 -topic GIDS```
-
- Get all available streams
-
- ```java -jar ncdssdk-client.jar -opt TOPICS```
-
+
+A few examples of using the jar:
+
+Get the first 100 records for a given stream
+
+```java -jar ncdssdk-client.jar -opt TOP -n 100 -topic GIDS```
+
+Get all available streams
+
+```java -jar ncdssdk-client.jar -opt TOPICS```
+
## Using the SDK
- ### Getting list of data stream available
- List all available data stream for the user.
+### Getting the list of available data streams
+List all available data streams for the user.
```java
// Example1.java
NCDSClient ncdsClient = new NCDSClient();
@@ -150,7 +132,7 @@ for (String topicEntry : topics) {
}
```
- Example output:
+Example output:
```
List of streams available on Nasdaq Cloud DataService:
GIDS
@@ -158,8 +140,8 @@ NLSUTP
NLSCTA
```
- ### Getting schema for the stream
- This methods returns the schema for the stream in Apache Avro format (https://avro.apache.org/docs/current/spec.html).
+### Getting schema for the stream
+This method returns the schema for the stream in Apache Avro format (https://avro.apache.org/docs/current/spec.html).
```java
// Example2.java
NCDSClient ncdsClient = new NCDSClient();
@@ -167,7 +149,7 @@ String topic = "GIDS";
String schema = ncdsClient.getSchemaForTheTopic(topic);
System.out.println(schema);
```
- Example output:
+Example output:
```
[ {
"type" : "record",
@@ -212,7 +194,7 @@ System.out.println(schema);
```
### Get first 10 messages of the stream
- This returns the first 10 available messages of the stream.
+This returns the first 10 available messages of the stream.
```java
// Example2.java
NCDSClient ncdsClient = new NCDSClient();
@@ -223,7 +205,7 @@ for (ConsumerRecord record : records) {
System.out.println("value:" + record.value().toString());
}
```
- Example output:
+Example output:
```
key:1
value:{"SoupPartition": 0, "SoupSequence": 1, "trackingID": 7238625218217, "msgType": "S", "event": "O"}
@@ -248,7 +230,7 @@ value:{"SoupPartition": 0, "SoupSequence": 10, "trackingID": 11231714853049, "ms
```
### Get first 10 messages of the stream from given timestamp
- This returns the first 10 available messages of the stream from given timestamp in milliseconds since the UNIX epoch.
+This returns the first 10 available messages of the stream from a given timestamp in milliseconds since the UNIX epoch.
```java
// Example3.java
NCDSClient ncdsClient = new NCDSClient();
@@ -260,7 +242,7 @@ for (ConsumerRecord record : records) {
System.out.println("value:" + record.value().toString());
}
```
- Example output:
+Example output:
```
Offset: 105834100
Top 10 Records for the Topic:NLSCTA
@@ -287,13 +269,13 @@ value :{"SoupPartition": 0, "SoupSequence": 9362639, "trackingID": 5084560059456
```
### Get example message from stream
- Print message to the console for given message name.
+Print a message to the console for a given message name.
```java
NCDSClient ncdsClient = new NCDSClient();
String topic="GIDS"
ncdsClient.getSampleMessages(topic, "SeqIndexDirectory");
```
- Example output:
+Example output:
```
{"SoupPartition": 0, "SoupSequence": 193, "msgType": "R", "timeStamp": 224140137, "instrumentID": "NQJP3700LMCAD ", "disseminationFlag": "Y", "fpType": "I", "brand": "NQ", "series": "NQG", "strategy": "SEC", "assetType": "EQ", "marketCapSize": "X", "currency": "CAD", "geography": "JP ", "settlementType": " ", "calculationMethod": "PR ", "state": "A", "indexUsage": "L", "schedule": "ASI", "frequency": "ODCL", "numberOfIssueParticipation": 23, "baseValue": 100000000000000, "baseDate": 20140111, "instrumentName": "NASDAQ Japan Psnl & Hhld Goods Lg Md Cap CAD"}
```
@@ -330,7 +312,7 @@ while (true) {
}
```
- Example output:
+Example output:
```-----------------------------------------------------------------------------------------------
News :ReleaseTime: 2020/04/03 14:40:00
TransmissionID: A2136726
@@ -362,60 +344,56 @@ while (true) {
### Example syntax to run the Client Jar based on this SDK
1. To list of streams available on Nasdaq Cloud DataService
-
- ```java -jar ncdssdk-client.jar -opt TOPICS```
-
+
+```java -jar ncdssdk-client.jar -opt TOPICS```
+
2. To display the schema for the given topic
-
- ```java -jar ncdssdk-client.jar -opt SCHEMA -topic NLSUTP```
-
+
+```java -jar ncdssdk-client.jar -opt SCHEMA -topic NLSUTP```
+
3. To dump top n records from the given topic
-
- ```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP```
-
+
+```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP```
+
4. To use client based specific authorization file instead of using from the resources of client code base
- ```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -authprops clntauth.properties```
-
+```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -authprops clntauth.properties```
+
5. To use the specific kafka properties instead of using the kafka properties from the resources of the client base code
- ```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -kafkaprops kafkaprops.properties```
-
+```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -kafkaprops kafkaprops.properties```
+
6. To use the specific client based authorization file and specific kafka properties file
- ```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -authprops clntauth.properties -kafkaprops kafkaprops.properties```
-
-7. To display a specific message type
+```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -authprops clntauth.properties -kafkaprops kafkaprops.properties```
- ```java -jar ncdssdk-client.jar -opt GETMSG -topic UTPBIN-UF30 -msgName SeqTradeLong```
-
-8. To install the certificates
+7. To display a specific message type
- ```java -jar ncdssdk-client.jar -opt INSTALLCERTS -path /home/ec2-user/testInstallCerts -pass testuser```
+```java -jar ncdssdk-client.jar -opt GETMSG -topic UTPBIN-UF30 -msgName SeqTradeLong```
9. To dump top n records from the given topic from given timestamp in milliseconds since the UNIX epoch
- ```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -timestamp 1590084445610 ```
+```java -jar ncdssdk-client.jar -opt TOP -n 10 -topic NLSUTP -timestamp 1590084445610 ```
10. To get filtered stream by symbols or/and message-types
- ```java -jar ncdssdk-client.jar -opt FILTERSTREAM -topic NLSUTP -symbols AAPL,NDAQ -msgtypes SeqTradeReportMessage```
+```java -jar ncdssdk-client.jar -opt FILTERSTREAM -topic NLSUTP -symbols AAPL,NDAQ -msgtypes SeqTradeReportMessage```
11. To get all messages for given type messagtype
- ```java -jar ncdssdk-client.jar -opt GETALLMSGS -topic NLSUTP -msgName SeqTradeReportMessage```
+```java -jar ncdssdk-client.jar -opt GETALLMSGS -topic NLSUTP -msgName SeqTradeReportMessage```
-
-## Documentation
-
- An addition to the example application, there is extra documentation at the package and class level within the JavaDocs, which are located in project ```https://github.com/Nasdaq/CloudDataService/tree/master/ncds-sdk/docs```
-
- If you make an update, you can run `mvn javadocs:javadocs` to update documents.
+
+## Documentation
+
+In addition to the example application, there is extra documentation at the package and class level within the JavaDocs, which are located in the project at ```https://github.com/Nasdaq/CloudDataService/tree/master/ncds-sdk/docs```
+
+If you make an update, you can run `mvn javadoc:javadoc` to regenerate the documentation.
## Docker
-
- Docker images are already configured to run the SDK. View the instructions (https://github.com/Nasdaq/CloudDataService/blob/master/docker/README.md)
+
+Docker images are already configured to run the SDK. View the instructions (https://github.com/Nasdaq/CloudDataService/blob/master/docker/README.md)
## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
@@ -424,4 +402,4 @@ Please make sure to update tests as appropriate.
## License
-Code and documentation released under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0)
+Code and documentation released under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0)
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 599b390..78f0ef1 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -18,10 +18,6 @@ COPY --from=builder /ncdssdk-client/target/ncdssdk-client.jar /home/kafka/app.ja
WORKDIR /home/kafka
-RUN mkdir truststore
-
-ENV JAVAX_NET_SSL_TRUSTSTORE=truststore/ncdsTrustStore.p12
-
ENTRYPOINT ["bash","docker/run-sdk-app.sh"]
CMD ["-opt", "TOPICS"]
diff --git a/docker/README.md b/docker/README.md
index 2c47a2a..5328598 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -6,21 +6,21 @@ Replace example `bootstrap.servers` property in the file kafka.properties (https
Replace example `oauth.token.endpoint.uri` property in the file auth.properties (https://github.com/Nasdaq/CloudDataService/blob/master/docker/auth.properties) with provided values during on-boarding.
## Building
-Run docker build in project home directory
-
+Run `docker build` in the project home directory
+
```
docker build -f docker/Dockerfile . -t sdk-app --no-cache
```
-
+
## Running Locally Built Images
-Replace client id(`{clinet-id-value}`) and client secret(`{client-secret-value}`) provided during on-boarding from Nasdaq team. Also, provide the password (`{trsustore-pass}`) for java truststore.
+Replace the client id (`{client-id-value}`) and client secret (`{client-secret-value}`) with the values provided during on-boarding by the Nasdaq team.
```
-docker run -e "OAUTH_CLIENT_ID={clinet-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value} -e "JAVAX_NET_SSL_TRUSTSTOREPASSWORD={trsustore-pass}" sdk-app:latest
+docker run -e "OAUTH_CLIENT_ID={clinet-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value} sdk-app:latest
```
-
- User can pass arguments to run the application with specific commands
+
+Users can pass arguments to run the application with specific commands:
```
-opt -- Provide the operation you want to perform \n" +
" * TOP - View the top nnn records in the Topic/Stream\n"+
@@ -34,11 +34,11 @@ docker run -e "OAUTH_CLIENT_ID={clinet-id-value}" -e "OAUTH_CLIENT_SECRET={clien
"-n -- Provide number of messages to retrieve --- REQUIRED for TOP \n"+
"-msgName -- Provide name of message based on schema --- REQUIRED for GETMSG \n"+
```
-
- Example to get `TOP 10` messages from GIDS stream
-
+
+Example: get the `TOP 10` messages from the GIDS stream
+
```
- docker run -e "OAUTH_CLIENT_ID={clinet-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value} -e "JAVAX_NET_SSL_TRUSTSTOREPASSWORD={trsustore-pass}" sdk-app:latest -opt TOP -n 10 -topic GIDS
+ docker run -e "OAUTH_CLIENT_ID={client-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value}" sdk-app:latest -opt TOP -n 10 -topic GIDS
```
## Nasdaq Cloud Data Service - Kafka mirroring with MirrorMaker
diff --git a/docker/auth.properties b/docker/auth.properties
index f84931f..76dfbae 100644
--- a/docker/auth.properties
+++ b/docker/auth.properties
@@ -1,3 +1,2 @@
oauth.token.endpoint.uri=https://{auth_endpoint_url}/auth/realms/pro-realm/protocol/openid-connect/token
-oauth.username.claim=preferred_username
-javax.net.ssl.trustStoreType=PKCS12
+oauth.username.claim=preferred_username
\ No newline at end of file
diff --git a/docker/mirrormaker/README.md b/docker/mirrormaker/README.md
index 214e101..64ccc1c 100644
--- a/docker/mirrormaker/README.md
+++ b/docker/mirrormaker/README.md
@@ -2,18 +2,23 @@
This tool uses a Kafka consumer to consume messages from the source cluster, and re-publishes those messages to the local (target) cluster using an embedded Kafka producer. (https://kafka.apache.org/documentation.html#basic_ops_mirror_maker)
## Running Mirror Maker on docker
-This example shows how to setup standalone Mirror Maker instance application.
+This example shows how to set up a standalone Mirror Maker instance.
#### Consumer Configuration (NCDS cluster)
-- Replace example `bootstrap.servers` property in the file kafka.properties (https://github.com/Nasdaq/CloudDataService/blob/master/docker/mirrormaker/consumer.properties) with provided values during on-boarding.
+- Replace the example `bootstrap.servers` property in the consumer.properties file (https://github.com/Nasdaq/CloudDataService/blob/master/docker/mirrormaker/consumer.properties) with the values provided during on-boarding.
#### Producer Configuration (Target Cluster)
- The producer is the part of Mirror Maker that uses the data read by the and replicates it to the destination cluster.
- Update the producer.properties based target cluster. (https://github.com/Nasdaq/CloudDataService/tree/master/docker/mirrormaker/producer.properties)
- Make sure the bootstrap.server IPs, truststore location if using SSL, and password are correct.
+#### Group Id Suffix
+- Users can customize the group id suffix (argument name: `groupidsuffix`)
+ - If a custom suffix is reused, the consumer will resume where it left off on restart
+ - If no custom suffix is passed, the SDK will create a new group id and start from the earliest message available on the topic.
+
#### Creating docker build
-- Run docker build in the project home directory. (https://github.com/Nasdaq/CloudDataService)
+- Run `docker build` in the project home directory. (https://github.com/Nasdaq/CloudDataService)
```
docker build -f docker/Dockerfile . -t sdk-app --no-cache
@@ -21,24 +26,29 @@ docker build -f docker/Dockerfile . -t sdk-app --no-cache
#### Running mirror maker
- Run mirror maker for given topics list.
-- Replace client id(`{client-id-value}`) and client secret(`{client-secret-value}`) provided during on-boarding from Nasdaq team. Also, provide the password (`{truststore-pass}`) for java truststore.
+- Replace the client id (`{client-id-value}`) and client secret (`{client-secret-value}`) with the values provided during on-boarding by the Nasdaq team.
```
-docker run -e "OAUTH_CLIENT_ID={client-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value} -e "JAVAX_NET_SSL_TRUSTSTOREPASSWORD={truststore-pass}" sdk-app:latest -opt mirrormaker -topics NLSUTP.stream
+docker run -e "OAUTH_CLIENT_ID={client-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value}" sdk-app:latest -opt mirrormaker -topics NLSUTP.stream
+```
+
+Example with a custom group id suffix:
+
+```
+docker run -e "OAUTH_CLIENT_ID={client-id-value}" -e "OAUTH_CLIENT_SECRET={client-secret-value}" sdk-app:latest -opt mirrormaker -topics NLSUTP.stream -groupidsuffix mycustomsuffix
```
## Deploying Kafka Mirror Maker on Strimzi kafka cluster
Strimzi is an open source project that provides container images and operators for running Apache Kafka on Kubernetes.(https://github.com/strimzi/strimzi-kafka-operator)
-The Cluster Operator deploys one or more Kafka Mirror Maker replicas to replicate data between Kafka clusters.
+The Cluster Operator deploys one or more Kafka Mirror Maker replicas to replicate data between Kafka clusters.
-### Prerequisites
+### Prerequisites
- Before deploying Kafka Mirror Maker, the Cluster Operator must be deployed.
### Deploying mirror maker
-- Download kafka bootstrap server certificate from NCDS endpoint and add that to Kubernetes secret.
- Create Kubernetes secret for Oauth Client Secret.
- Update Oauth Client Id in kafka-mirror-maker.yaml.
- Create a Kafka Mirror Maker cluster from the command-line:
- ```kubectl apply -f mirrormaker/template/kafka-mirror-maker.yaml```
+ ```kubectl apply -f mirrormaker/template/kafka-mirror-maker.yaml```
-Provided example script `install_mirror_maker.sh` to deploy the mirror maker in your cluster.
+An example script, `install_mirror_maker.sh`, is provided to deploy the mirror maker in your cluster.
\ No newline at end of file
diff --git a/docker/mirrormaker/consumer.properties b/docker/mirrormaker/consumer.properties
index a4b7670..600e10a 100644
--- a/docker/mirrormaker/consumer.properties
+++ b/docker/mirrormaker/consumer.properties
@@ -1,8 +1,7 @@
bootstrap.servers={streams_endpoint_url}:9094
-ssl.endpoint.identification.algorithm=
group.id=
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
-auto.offset.reset=earliest
\ No newline at end of file
+auto.offset.reset=earliest
diff --git a/docker/mirrormaker/run-mirror-maker.sh b/docker/mirrormaker/run-mirror-maker.sh
index 38dc5b4..2ecb467 100644
--- a/docker/mirrormaker/run-mirror-maker.sh
+++ b/docker/mirrormaker/run-mirror-maker.sh
@@ -4,15 +4,14 @@ export OAUTH_TOKEN_ENDPOINT_URI="https://clouddataservice.auth.nasdaq.com/auth/r
export OAUTH_CLIENT_ID="$OAUTH_CLIENT_ID"
export OAUTH_CLIENT_SECRET=$OAUTH_CLIENT_SECRET
export OAUTH_USERNAME_CLAIM=preferred_username
-export KAFKA_OPTS=" \
- -Djavax.net.ssl.trustStore=/home/kafka/truststore/ncdsTrustStore.p12 \
- -Djavax.net.ssl.trustStorePassword=$JAVAX_NET_SSL_TRUSTSTOREPASSWORD \
- -Djavax.net.ssl.trustStoreType=PKCS12"
-uuid=$(uuidgen)
-groupid="$OAUTH_CLIENT_ID$uuid"
+topics=$1
+group_id_suffix=$2
+
+groupid="$OAUTH_CLIENT_ID$group_id_suffix"
+echo $groupid
#update the client Id in consumer
sed -i "s/group.id=/group.id=$groupid/" /home/kafka/docker/mirrormaker/consumer.properties
-/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /home/kafka/docker/mirrormaker/consumer.properties --producer.config /home/kafka/docker/mirrormaker/producer.properties --num.streams 3 --whitelist $@
\ No newline at end of file
+/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /home/kafka/docker/mirrormaker/consumer.properties --producer.config /home/kafka/docker/mirrormaker/producer.properties --num.streams 3 --whitelist $topics
diff --git a/docker/run-sdk-app.sh b/docker/run-sdk-app.sh
old mode 100644
new mode 100755
index 01e64cf..4941d83
--- a/docker/run-sdk-app.sh
+++ b/docker/run-sdk-app.sh
@@ -1,30 +1,28 @@
#!/usr/bin/env sh
-file="/home/kafka/truststore/ncdsTrustStore.p12"
-
-# Remove the existing truststore if exists
-if [ -f $file ] ; then
- rm $file
-fi
-
OPT=$2
-
#Function to get Topics
function get_topics {
- if [ $3 == "-topics" ]; then
+ if [[ $3 == "-topics" ]]; then
TOPICS=$4
else
TOPICS='.*'
fi
}
-# Install Trust Store
-java -jar app.jar -opt INSTALLCERTS -path /home/kafka/truststore -pass $JAVAX_NET_SSL_TRUSTSTOREPASSWORD &&
+function get_groupids_suffix {
+ if [[ $5 == "-groupidsuffix" ]]; then
+ group_id_suffix=$6
+ else
+ group_id_suffix=$(uuidgen)
+ fi
+}
-if [ $OPT == "mirrormaker" ]; then
+if [[ $OPT == "mirrormaker" ]]; then
get_topics $@
- bash /home/kafka/docker/mirrormaker/run-mirror-maker.sh $TOPICS
+ get_groupids_suffix $@
+ bash /home/kafka/docker/mirrormaker/run-mirror-maker.sh $TOPICS $group_id_suffix
else
# Run the user command
- java -Djavax.net.ssl.trustStore="/home/kafka/truststore/ncdsTrustStore.p12" -Djavax.net.ssl.trustStorePassword=$JAVAX_NET_SSL_TRUSTSTOREPASSWORD -Doauth.client.id=$OAUTH_CLIENT_ID -Doauth.client.secret=$OAUTH_CLIENT_SECRET -jar app.jar -kafkaprops /home/kafka/docker/kafka.properties -authprops /home/kafka/docker/auth.properties $@
-fi
\ No newline at end of file
+ java -Doauth.client.id=$OAUTH_CLIENT_ID -Doauth.client.secret=$OAUTH_CLIENT_SECRET -jar app.jar -kafkaprops /home/kafka/docker/kafka.properties -authprops /home/kafka/docker/auth.properties $@
+fi
diff --git a/ncds-sdk/pom.xml b/ncds-sdk/pom.xml
index 21d2d58..17b1d4d 100644
--- a/ncds-sdk/pom.xml
+++ b/ncds-sdk/pom.xml
@@ -7,8 +7,7 @@
<parent>
    <groupId>com.nasdaq.ncds</groupId>
    <artifactId>ncds</artifactId>
-    <version>0.4.0</version>
-    <relativePath>../pom.xml</relativePath>
+    <version>0.5.0</version>
</parent>

<artifactId>ncds-sdk</artifactId>
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java
index 8875460..b177886 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java
@@ -34,7 +34,7 @@ public class NCDSClient {
*/
public NCDSClient(Properties securityCfg,Properties kafkaCfg) throws Exception {
try {
- if (securityCfg != null && AuthenticationConfigLoader.validateSecurityConfig(securityCfg)) {
+ if (securityCfg != null && AuthenticationConfigLoader.validateSecurityConfig(securityCfg, kafkaCfg)) {
nasdaqKafkaAvroConsumer = new NasdaqKafkaAvroConsumer(securityCfg,kafkaCfg);
}
else if (IsItJunit.isJUnitTest()) {
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java
index ecebaab..7cdd3d8 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java
@@ -14,9 +14,6 @@ public class AuthenticationConfigLoader {
public static String OAUTH_CLIENT_ID ="oauth.client.id";
public static String OAUTH_CLIENT_SECRET="oauth.client.secret";
public static String OAUTH_USERNAME_CLAIM="oauth.username.claim";
- public static String JAVAX_NET_SSL_TRUSTSTORE="javax.net.ssl.trustStore";
- public static String JAVAX_NET_SSL_TRUSTSTOREPASSWORD="javax.net.ssl.trustStorePassword";
- public static String JAVAX_NET_SSL_TRUSTSTORETYPE="javax.net.ssl.trustStoreType";
public static String getClientID(){
String clientID;
@@ -59,7 +56,7 @@ public static String getClientID(Properties cfg){
}
}
- public static boolean validateSecurityConfig(Properties cfg) throws Exception {
+ public static boolean validateSecurityConfig(Properties cfg, Properties kafkaCfg) throws Exception {
addNasdaqSpecificAuthProperties(cfg);
if (cfg.getProperty(OAUTH_TOKEN_ENDPOINT_URI) == null) {
@@ -74,15 +71,6 @@ public static boolean validateSecurityConfig(Properties cfg) throws Exception {
if (cfg.getProperty(OAUTH_USERNAME_CLAIM) == null) {
throw new Exception("Authentication Setting :" + OAUTH_USERNAME_CLAIM + " Missing" );
}
- if (cfg.getProperty(JAVAX_NET_SSL_TRUSTSTORE) == null && System.getenv("JAVAX_NET_SSL_TRUSTSTORE") == null) {
- throw new Exception("Authentication Setting :" + JAVAX_NET_SSL_TRUSTSTORE + " Missing" );
- }
- if (cfg.getProperty(JAVAX_NET_SSL_TRUSTSTOREPASSWORD) == null && System.getenv("JAVAX_NET_SSL_TRUSTSTOREPASSWORD") == null) {
- throw new Exception ("Authentication Setting :" + JAVAX_NET_SSL_TRUSTSTOREPASSWORD + " Missing" );
- }
- if (cfg.getProperty(JAVAX_NET_SSL_TRUSTSTORETYPE) == null) {
- throw new Exception ("Authentication Setting :" + JAVAX_NET_SSL_TRUSTSTORETYPE + " Missing" );
- }
return true;
}
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
deleted file mode 100644
index d74622e..0000000
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package com.nasdaq.ncdsclient.internal.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.util.EntityUtils;
-import org.bouncycastle.asn1.x509.*;
-import org.json.JSONObject;
-import sun.security.x509.URIName;
-import sun.security.x509.X509CertImpl;
-
-import javax.net.ssl.HttpsURLConnection;
-import java.io.*;
-import java.net.URL;
-import java.security.KeyStore;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateFactory;
-import java.security.cert.X509Certificate;
-import java.util.List;
-
-
-public class InstallCertificates {
- private final CloseableHttpClient httpClient;
- private final String streamsCertPath;
- private final String trustStorePath;
- private final String keyStorePassword;
- private String oneTimeUrl;
- private final String authUrl = "http://clouddataservice.auth.nasdaq.com/";
- private final String kafkaCertUrl="https://clouddataservice.nasdaq.com/api/v1/get-certificate";
-
- public InstallCertificates (String keyStorePath, String password) {
- this.httpClient = HttpClients.createDefault();
- this.streamsCertPath = keyStorePath + File.separator + "streams.crt";
- this.trustStorePath = keyStorePath + File.separator +"ncdsTrustStore.p12";
- this.keyStorePassword = password;
- }
-
- public void install() throws Exception {
- installCertsToTrustStore(keyStorePassword);
- }
-
- private void downloadCertificates() throws Exception {
- try {
- URL certificateURL = new URL(getStreamsCertificate());
- FileUtils.copyURLToFile(certificateURL, new File(this.streamsCertPath));
- }catch (Exception e){
- System.out.println("Error Downloading Certificate");
- throw (e);
- }
- }
-
- private String getStreamsCertificate() throws Exception {
- HttpGet request = new HttpGet(kafkaCertUrl);
- request.setHeader(new BasicHeader("Prama", "no-cache"));
- request.setHeader(new BasicHeader("Cache-Control", "no-cache"));
- try (CloseableHttpResponse response = httpClient.execute(request)) {
-
- // Get HttpResponse Status
- if(response.getStatusLine().getStatusCode()!= 200){
- throw (new Exception("Internal Server Error"));
- }
-
- HttpEntity entity = response.getEntity();
-
- if (entity != null) {
-
- String result = EntityUtils.toString(entity);
- JSONObject obj = new JSONObject(result);
- oneTimeUrl = obj.get("one_time_url").toString();
- }
- this.httpClient.close();
- }
- return oneTimeUrl;
- }
-
- private void createKeyStore(String password) throws Exception {
- try {
- KeyStore keystore = KeyStore.getInstance("PKCS12"); //X.509, PKCS12
- File file = new File(trustStorePath);
-
- if (!file.exists()) {
- keystore.load(null, password.toCharArray());
- try (FileOutputStream fos = new FileOutputStream(trustStorePath)) {
- keystore.store(fos, password.toCharArray());
- }
- }
- }catch (Exception e){
- System.out.println("Error Creating Key Store");
- throw (e);
- }
- }
-
- private void installCertsToTrustStore(String password) throws Exception {
- try {
- //Download Kafka Certificates
- downloadCertificates();
-
- //create Keystore if it doesnt exists
- createKeyStore(password);
-
- KeyStore keystore = KeyStore.getInstance("PKCS12"); //X.509, PKCS12
-
- //Loading keystore
- keystore.load(new FileInputStream(trustStorePath), password.toCharArray());
-
- // Opening streams certificate
- CertificateFactory cf = CertificateFactory.getInstance("X.509");
- Certificate cert = cf.generateCertificate(new FileInputStream(streamsCertPath));
-
- //adding streams certificate
- keystore.setCertificateEntry("streams", cert);
-
- // Adding auth certificate
- keystore.setCertificateEntry("auth", getAuthCertificate() );
-
- try (FileOutputStream fos = new FileOutputStream(trustStorePath)) {
- keystore.store(fos, password.toCharArray());
- }
- } catch (Exception e){
- System.out.println("Error installing Certificate");
- throw (e);
- }
-
- }
-
- private Certificate getAuthCertificate() throws Exception {
- URL url = new URL(null, authUrl, new sun.net.www.protocol.https.Handler());
- HttpsURLConnection con = (HttpsURLConnection)url.openConnection();
- con.connect();
- Certificate[] certs = con.getServerCertificates();
- CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
- for(Certificate cert : certs) {
- List<sun.security.x509.AccessDescription> descriptions = ((X509CertImpl) cert).getAuthorityInfoAccessExtension().getAccessDescriptions();
- for (sun.security.x509.AccessDescription ad : descriptions) {
- // check if it's a URL to issuer's certificate
- if (ad.getAccessMethod().toString().equals(X509ObjectIdentifiers.id_ad_caIssuers.toString())) {
- sun.security.x509.GeneralName location = ad.getAccessLocation();
- if (location.getType() == GeneralName.uniformResourceIdentifier) {
- // Get issuer's URL
- String issuerUrl = ((URIName) location.getName()).getURI().toString();
- URL url1 = new URL(issuerUrl);
- X509Certificate issuer = (X509Certificate) certificateFactory.generateCertificate(url1.openStream());
- return issuer;
- }
- }
- }
- }
- return null;
- }
-}
diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java
index 3697e55..525ea2c 100644
--- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java
+++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java
@@ -9,7 +9,11 @@
* Utility to load the kafka configuration parameters.
*/
public class KafkaConfigLoader {
- public static String BOOTSTRAP_SERVERS="bootstrap.servers";
+ public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+ public static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type";
+ public static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
+ public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";
+
public static Properties loadConfig() throws Exception {
Properties cfg = new Properties();
InputStream inputStream;
@@ -35,7 +39,6 @@ private static Properties nasdaqSepecificConfig(Properties p) throws KafkaProper
p.setProperty("sasl.mechanism", "OAUTHBEARER");
p.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;");
p.setProperty("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
- p.setProperty("ssl.endpoint.identification.algorithm","");
}
return p;
}
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractKafkaTestResource.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractKafkaTestResource.java
deleted file mode 100644
index 3428f18..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractKafkaTestResource.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import com.nasdaq.ncdsclient.listeners.BrokerListener;
-import com.nasdaq.ncdsclient.listeners.PlainListener;
-
-import java.util.Properties;
-
-/**
- * Shared code between JUnit4 and JUnit5 shared resources.
- * @param <T> The concrete implementation of this class, to allow for method chaining.
- */
-public abstract class AbstractKafkaTestResource<T extends AbstractKafkaTestResource<T>> {
- /**
- * Our internal Kafka Test Server instance.
- */
- private KafkaCluster kafkaCluster = null;
-
- /**
- * Additional broker properties.
- */
- private final Properties brokerProperties = new Properties();
-
- /**
- * How many brokers to put into the cluster.
- */
- private int numberOfBrokers = 1;
-
- /**
- * Defines which listener has been set to be configured on the brokers.
- */
- private BrokerListener registeredListener = new PlainListener();
-
- /**
- * Default constructor.
- */
- public AbstractKafkaTestResource() {
- this(new Properties());
- }
-
- /**
- * Constructor allowing passing additional broker properties.
- * @param brokerProperties properties for Kafka broker.
- */
- public AbstractKafkaTestResource(final Properties brokerProperties) {
- this.brokerProperties.putAll(brokerProperties);
- }
-
- /**
- * Helper to allow overriding Kafka broker properties. Can only be called prior to the service
- * being started.
- * @param name Kafka broker configuration property name.
- * @param value Value to set for the configuration property.
- * @return SharedKafkaTestResource instance for method chaining.
- * @throws IllegalArgumentException if name argument is null.
- * @throws IllegalStateException if method called after service has started.
- */
- @SuppressWarnings("unchecked")
- public T withBrokerProperty(final String name, final String value) {
- // Validate state.
- validateState(false, "Cannot add properties after service has started.");
-
- // Validate input.
- if (name == null) {
- throw new IllegalArgumentException("Cannot pass null name argument");
- }
-
- // Add or set property.
- if (value == null) {
- brokerProperties.remove(name);
- } else {
- brokerProperties.put(name, value);
- }
- return (T) this;
- }
-
- /**
- * Set how many brokers to include in the test cluster.
- * @param brokerCount The number of brokers.
- * @return SharedKafkaTestResource for method chaining.
- */
- @SuppressWarnings("unchecked")
- public T withBrokers(final int brokerCount) {
- // Validate state.
- validateState(false, "Cannot set brokers after service has started.");
-
- if (brokerCount < 1) {
- throw new IllegalArgumentException("Cannot have 0 or fewer brokers");
- }
- this.numberOfBrokers = brokerCount;
- return (T) this;
- }
-
- /**
- * Register additional listeners on the kafka brokers.
- * @param listener listener instance to register.
- * @return SharedKafkaTestResource for method chaining.
- */
- @SuppressWarnings("unchecked")
- public T registerListener(final BrokerListener listener) {
- if (listener == null) {
- throw new IllegalArgumentException("Listener argument may not be null.");
- }
- registeredListener = listener;
- return (T) this;
- }
-
- /**
- * KafkaTestUtils is a collection of re-usable/common access patterns for interacting with the Kafka cluster.
- * @return Instance of KafkaTestUtils configured to operate on the Kafka cluster.
- */
- public KafkaTestUtils getKafkaTestUtils() {
- // Validate internal state.
- validateState(true, "Cannot access KafkaTestUtils before Kafka service has been started.");
- return new KafkaTestUtils(kafkaCluster);
- }
-
- /**
- * Returns connection string for zookeeper clients.
- * @return Connection string to connect to the Zookeeper instance.
- */
- public String getZookeeperConnectString() {
- validateState(true, "Cannot access Zookeeper before service has been started.");
- return kafkaCluster.getZookeeperConnectString();
- }
-
- /**
- * bootstrap.servers string to configure Kafka consumers or producers to access the Kafka cluster.
- * @return Connect string to use for Kafka clients.
- */
- public String getKafkaConnectString() {
- validateState(true, "Cannot access Kafka before service has been started.");
- return kafkaCluster.getKafkaConnectString();
- }
-
- /**
- * Returns an immutable list of broker hosts for the kafka cluster.
- * @return immutable list of hosts for brokers within the cluster.
- */
- public KafkaBrokers getKafkaBrokers() {
- validateState(true, "Cannot access Kafka before service has been started.");
- return kafkaCluster.getKafkaBrokers();
- }
-
- /**
- * Returns all registered listener.
- * @return The configured listener.
- */
- protected BrokerListener getRegisteredListener() {
- return registeredListener;
- }
-
- protected KafkaCluster getKafkaCluster() {
- return kafkaCluster;
- }
-
- protected void setKafkaCluster(final KafkaCluster kafkaCluster) {
- this.kafkaCluster = kafkaCluster;
- }
-
- protected Properties getBrokerProperties() {
- return brokerProperties;
- }
-
- protected int getNumberOfBrokers() {
- return numberOfBrokers;
- }
-
- /**
- * Helper method for ensure state consistency.
- * @param shouldKafkaExistYet True if KafkaCluster should exist, false if it should not.
- * @param errorMessage Error message to throw if the state is not consistent.
- * @throws IllegalStateException if the kafkaCluster state is not consistent.
- */
- protected void validateState(final boolean shouldKafkaExistYet, final String errorMessage) throws IllegalStateException {
- if (shouldKafkaExistYet && kafkaCluster == null) {
- throw new IllegalStateException(errorMessage);
- } else if (!shouldKafkaExistYet && kafkaCluster != null) {
- throw new IllegalStateException(errorMessage);
- }
- }
-}
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractZookeeperTestResource.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractZookeeperTestResource.java
deleted file mode 100644
index a2c6c3b..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/AbstractZookeeperTestResource.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-public abstract class AbstractZookeeperTestResource {
- /**
- * Internal Zookeeper test server instance.
- */
- private final ZookeeperTestServer zookeeperTestServer = new ZookeeperTestServer();
-
- /**
- * Access to the underlying zookeeper test server instance.
- * @return Shared Zookeeper test server instance.
- * @throws IllegalStateException if before() has not been called yet.
- */
- public ZookeeperTestServer getZookeeperTestServer() throws IllegalStateException {
- return zookeeperTestServer;
- }
-
- /**
- * Returns connection string for zookeeper clients.
- * @return Connection string to connect to the Zookeeper instance.
- * @throws IllegalStateException if before() has not been called yet.
- */
- public String getZookeeperConnectString() throws IllegalStateException {
- return zookeeperTestServer.getConnectString();
- }
-}
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBroker.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBroker.java
deleted file mode 100644
index 26ce4ca..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBroker.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-/**
- * Contains information about a single Kafka broker within a cluster.
- * Provides accessors to get connection information for a specific broker, as well as
- * the ability to individually start/stop a specific Broker.
- */
-public class KafkaBroker {
-
- private final KafkaTestServer kafkaTestServer;
-
- /**
- * Constructor.
- * @param kafkaTestServer Internal KafkaTestServer instance.
- */
- public KafkaBroker(final KafkaTestServer kafkaTestServer) {
- this.kafkaTestServer = kafkaTestServer;
- }
-
- public int getBrokerId() {
- return kafkaTestServer.getBrokerId();
- }
-
- /**
- * bootstrap.servers string to configure Kafka consumers or producers to access the Kafka cluster.
- * @return Connect string to use for Kafka clients.
- */
- public String getConnectString() {
- return kafkaTestServer.getKafkaConnectString();
- }
-
- /**
- * Starts the Kafka broker.
- * @throws Exception on startup errors.
- */
- public void start() throws Exception {
- kafkaTestServer.start();
- }
-
- /**
- * Stop/shutdown Kafka broker.
- * @throws Exception on shutdown errors.
- */
- public void stop() throws Exception {
- kafkaTestServer.stop();
- }
-
- @Override
- public String toString() {
- return "KafkaBroker{"
- + "brokerId=" + getBrokerId()
- + ", connectString='" + getConnectString() + '\''
- + '}';
- }
-}
\ No newline at end of file
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBrokers.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBrokers.java
deleted file mode 100644
index 917a644..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaBrokers.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Wrapper containing immutable list of Kafka Brokers indexable by their brokerId.
- */
-public class KafkaBrokers implements Iterable<KafkaBroker> {
- /**
- * Immutable mapping of brokerId to KafkaBroker instance.
- */
- private Map<Integer, KafkaBroker> brokerMap;
-
- /**
- * Constructor.
- * @param brokers List of KafkaBrokers in a cluster.
- */
- public KafkaBrokers(final Collection<KafkaBroker> brokers) {
- // Build immutable map.
- this.brokerMap = Collections.unmodifiableMap(brokers
- .stream()
- .collect(Collectors.toMap(KafkaBroker::getBrokerId, Function.identity()))
- );
- }
-
- /**
- * Lookup and return Kafka Broker by its id.
- * @param brokerId id of the broker.
- * @return KafkaBroker.
- * @throws IllegalArgumentException if requested an invalid broker id.
- */
- public KafkaBroker getBrokerById(final int brokerId) throws IllegalArgumentException {
- if (!brokerMap.containsKey(brokerId)) {
- throw new IllegalArgumentException("No broker exists with id " + brokerId);
- }
- return brokerMap.get(brokerId);
- }
-
- /**
- * Returns an immutable list of KafkaBroker instances.
- * @return Immutable List of Brokers.
- */
- public List<KafkaBroker> asList() {
- return Collections.unmodifiableList(
- new ArrayList<>(brokerMap.values())
- );
- }
-
- /**
- * Returns a stream of brokers.
- * @return Stream of Brokers.
- */
- public Stream<KafkaBroker> stream() {
- return asList().stream();
- }
-
- /**
- * Returns how many brokers are represented.
- * @return Number of brokers.
- */
- public int size() {
- return brokerMap.size();
- }
-
- @Override
- public Iterator<KafkaBroker> iterator() {
- return asList().iterator();
- }
-
- @Override
- public void forEach(final Consumer<? super KafkaBroker> action) {
- asList().forEach(action);
- }
-
- @Override
- public Spliterator<KafkaBroker> spliterator() {
- return asList().spliterator();
- }
-
- @Override
- public String toString() {
- return "KafkaBrokers{"
- + asList()
- + '}';
- }
-}
\ No newline at end of file
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaCluster.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaCluster.java
deleted file mode 100644
index 022964e..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaCluster.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-/**
- * This interface abstracts away knowing if the underlying 'kafka cluster' is a single server (KafkaTestServer)
- * or a cluster of 1 or more brokers (KafkaTestCluster).
- */
-public interface KafkaCluster extends KafkaProvider, AutoCloseable {
-
- /**
- * Creates and starts ZooKeeper and Kafka server instances.
- * @throws Exception on startup errors.
- */
- void start() throws Exception;
-
- /**
- * Closes the internal servers. Failing to call this at the end of your tests will likely
- * result in leaking instances.
- */
- void close() throws Exception;
-}
\ No newline at end of file
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaProvider.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaProvider.java
deleted file mode 100644
index 986773f..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import java.util.List;
-
-/**
- * Provides a slimmed down view onto KafkaCluster to avoid circular references in code.
- */
-public interface KafkaProvider {
- /**
- * Returns an immutable list of broker hosts for the kafka cluster.
- * @return immutable list of hosts for brokers within the cluster.
- */
- KafkaBrokers getKafkaBrokers();
-
- /**
- * bootstrap.servers string to configure Kafka consumers or producers to access the Kafka cluster.
- * @return Connect string to use for Kafka clients.
- */
- String getKafkaConnectString();
-
- /**
- * Connection details about each of the registered listeners on the kafka broker.
- * @return details about each of the registered listeners on the kafka broker.
- */
- List<ListenerProperties> getListenerProperties();
-
- /**
- * Returns connection string for zookeeper clients.
- * @return Connection string to connect to the Zookeeper instance.
- */
- String getZookeeperConnectString();
-}
\ No newline at end of file
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestCluster.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestCluster.java
deleted file mode 100644
index 7d75af4..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestCluster.java
+++ /dev/null
@@ -1,295 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import com.nasdaq.ncdsclient.listeners.BrokerListener;
-import com.nasdaq.ncdsclient.listeners.PlainListener;
-import org.apache.kafka.common.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Utility for setting up a Cluster of KafkaTestServers.
- */
-public class KafkaTestCluster implements KafkaCluster, KafkaProvider, AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(KafkaTestCluster.class);
-
- /**
- * Clock instance.
- */
- private final Clock clock = Clock.systemUTC();
-
- /**
- * Internal Test Zookeeper service shared by all Kafka brokers.
- */
- private final ZookeeperTestServer zkTestServer = new ZookeeperTestServer();
-
- /**
- * Defines how many brokers will be created and started within the cluster.
- */
- private final int numberOfBrokers;
-
- /**
- * Defines overridden broker properties.
- */
- private final Properties overrideBrokerProperties = new Properties();
-
- /**
- * Collection of listeners that get registered with the broker.
- */
- private final List<BrokerListener> registeredListeners;
-
- /**
- * List containing all of the brokers in the cluster. Each broker is quickly accessible via its 'brokerId' property:
- * subtract 1 from its id and use the result as the index into this list.
- *
- * Example: to get brokerId = 4, retrieve index 3 in this list.
- */
- private final List<KafkaTestServer> brokers = new ArrayList<>();
-
- /**
- * Constructor.
- * @param numberOfBrokers How many brokers you want in your Kafka cluster.
- */
- public KafkaTestCluster(final int numberOfBrokers) {
- this(numberOfBrokers, new Properties(), Collections.emptyList());
- }
-
- /**
- * Constructor.
- * @param numberOfBrokers How many brokers you want in your Kafka cluster.
- * @param overrideBrokerProperties Define Kafka broker properties.
- */
- public KafkaTestCluster(final int numberOfBrokers, final Properties overrideBrokerProperties) {
- this(numberOfBrokers, overrideBrokerProperties, Collections.emptyList());
- }
-
- /**
- * Constructor.
- * @param numberOfBrokers How many brokers you want in your Kafka cluster.
- * @param overrideBrokerProperties Define Kafka broker properties.
- * @param listeners List of listeners to register on each broker.
- */
- public KafkaTestCluster(
- final int numberOfBrokers,
- final Properties overrideBrokerProperties,
- final Collection<BrokerListener> listeners
- ) {
- if (numberOfBrokers <= 0) {
- throw new IllegalArgumentException("numberOfBrokers argument must be 1 or larger.");
- }
- if (overrideBrokerProperties == null) {
- throw new IllegalArgumentException("overrideBrokerProperties argument must not be null.");
- }
-
- final List<BrokerListener> brokerListeners = new ArrayList<>();
- if (listeners == null || listeners.isEmpty()) {
- // If we have no listeners defined, use default plain listener.
- brokerListeners.add(new PlainListener());
- } else {
- brokerListeners.addAll(listeners);
- }
-
- // Save references.
- this.numberOfBrokers = numberOfBrokers;
- this.overrideBrokerProperties.putAll(overrideBrokerProperties);
- this.registeredListeners = Collections.unmodifiableList(brokerListeners);
- }
-
- /**
- * Starts the cluster.
- * @throws Exception on startup errors.
- * @throws TimeoutException When the cluster fails to start within the timeout period.
- */
- public void start() throws Exception, TimeoutException {
- // Ensure zookeeper instance has been started.
- zkTestServer.start();
-
- // If we have no brokers defined yet...
- if (brokers.isEmpty()) {
- // Loop over brokers, starting with brokerId 1.
- for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
- // Create properties for brokers
- final Properties brokerProperties = new Properties();
-
- // Add user defined properties.
- brokerProperties.putAll(overrideBrokerProperties);
-
- // Set broker.id
- brokerProperties.put("broker.id", String.valueOf(brokerId));
-
- // Create new KafkaTestServer and add to our broker list
- brokers.add(
- new KafkaTestServer(brokerProperties, zkTestServer, registeredListeners)
- );
- }
- }
-
- // Loop over each broker and start it
- for (final KafkaTestServer broker : brokers) {
- broker.start();
- }
-
- // Block until the cluster is 'up' or the timeout is exceeded.
- waitUntilClusterReady(10_000L);
- }
-
- /**
- * Returns an immutable list of broker hosts for the kafka cluster.
- * @return immutable list of hosts for brokers within the cluster.
- */
- @Override
- public KafkaBrokers getKafkaBrokers() {
- // If we have no brokers yet, the cluster has not yet started.
- if (brokers.isEmpty()) {
- throw new IllegalStateException("Cannot access brokers before cluster has been started.");
- }
-
- return new KafkaBrokers(
- brokers
- .stream()
- .flatMap((kafkaTestServer) -> (kafkaTestServer.getKafkaBrokers().stream()))
- .collect(toList())
- );
- }
-
- /**
- * Retrieve a broker by its brokerId.
- * @param brokerId the Id of the broker to retrieve.
- * @return KafkaTestServer instance for the given broker Id.
- */
- public KafkaBroker getKafkaBrokerById(final int brokerId) {
- // If we have no brokers yet, the cluster has not yet started.
- if (brokers.isEmpty()) {
- throw new IllegalStateException("Cannot access brokers before cluster has been started.");
- }
-
- // Find the requested broker.
- final Optional<KafkaTestServer> kafkaTestServer = brokers
- .stream()
- .filter((testServer) -> testServer.getBrokerId() == brokerId)
- .findFirst();
-
- // If we found a match
- if (kafkaTestServer.isPresent()) {
- // Return it!
- return new KafkaBroker(kafkaTestServer.get());
- }
- // Otherwise throw an IllegalArgumentException.
- throw new IllegalArgumentException("Broker with id " + brokerId + " does not exist.");
- }
-
- /**
- * bootstrap.servers string to configure Kafka consumers or producers to access the Kafka cluster.
- * @return Connect string to use for Kafka clients.
- */
- public String getKafkaConnectString() {
- // If we have no brokers yet, the cluster has not yet started.
- if (brokers.isEmpty()) {
- throw new IllegalStateException("Cannot access brokers before cluster has been started.");
- }
-
- return brokers
- .stream()
- .map(KafkaTestServer::getKafkaConnectString)
- .collect(Collectors.joining(","));
- }
-
- @Override
- public List<ListenerProperties> getListenerProperties() {
- // Collect all the properties from all brokers in the cluster.
- final List<ListenerProperties> listenerProperties = new ArrayList<>();
- brokers.forEach((broker) -> {
- listenerProperties.addAll(broker.getListenerProperties());
- });
- return Collections.unmodifiableList(listenerProperties);
- }
-
- /**
- * Returns connection string for zookeeper clients.
- * @return Connection string to connect to the Zookeeper instance.
- */
- public String getZookeeperConnectString() {
- return zkTestServer.getConnectString();
- }
-
- /**
- * Shuts the cluster down.
- * @throws Exception on shutdown errors.
- */
- public void stop() throws Exception {
- // Loop over brokers
- for (final KafkaTestServer kafkaBroker : brokers) {
- kafkaBroker.stop();
- }
-
- // Stop zkServer
- zkTestServer.stop();
- }
-
- /**
- * Alias for stop().
- * @throws Exception on shutdown errors.
- */
- @Override
- public void close() throws Exception {
- stop();
- }
-
- /**
- * This method will block up to timeoutMs milliseconds waiting for the cluster to become available and ready.
- * @param timeoutMs How long to block for, in milliseconds.
- * @throws TimeoutException if the timeout period is exceeded.
- */
- private void waitUntilClusterReady(final long timeoutMs) throws TimeoutException {
- // Get AdminClient for cluster.
- final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(this);
-
- // Start looping.
- final long startTime = clock.millis();
- int numberOfBrokersReady = 0;
- do {
- try {
- // Ask for the nodes in the cluster.
- final Collection<Node> nodes = kafkaTestUtils.describeClusterNodes();
-
- // We should know how many nodes there are
- if (nodes.size() >= numberOfBrokers) {
- // Looks like the cluster is ready to go.
- logger.info("Found {} brokers on-line, cluster is ready.", nodes.size());
- return;
- }
-
- // Log whenever the number of on-line brokers changes.
- if (nodes.size() > numberOfBrokersReady) {
- numberOfBrokersReady = nodes.size();
- logger.info(
- "Found {} of {} brokers ready, continuing to wait for cluster to start.",
- numberOfBrokersReady,
- numberOfBrokers
- );
- }
-
- // Small wait to throttle cycling.
- Thread.sleep(100);
- } catch (final InterruptedException exception) {
- // Caught interrupt, break out of loop.
- break;
- }
- }
- while (clock.millis() <= startTime + timeoutMs);
-
- // If we got here, throw timeout exception
- throw new TimeoutException("Cluster failed to come online within " + timeoutMs + " milliseconds.");
- }
-}
\ No newline at end of file
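
Reconstructed from the constructors and helpers above, a two-broker cluster test read roughly like this (a sketch only; the topic name is illustrative):

    package com.nasdaq.ncdsclient.core;

    // Sketch only: the removed KafkaTestCluster exercised with two brokers.
    class KafkaTestClusterUsageSketch {
        static void twoBrokerExample() throws Exception {
            try (final KafkaTestCluster cluster = new KafkaTestCluster(2)) {
                cluster.start();                                // waits up to 10s for both brokers
                final KafkaTestUtils utils = new KafkaTestUtils(cluster);
                utils.createTopic("demo-topic", 4, (short) 2);  // 4 partitions, replicated on both brokers
                utils.produceRecords(10, "demo-topic", 0);      // 10 random records into partition 0
            }
        }
    }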
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestServer.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestServer.java
deleted file mode 100644
index 3bc04f2..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestServer.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import com.nasdaq.ncdsclient.listeners.BrokerListener;
-import com.nasdaq.ncdsclient.listeners.PlainListener;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.apache.curator.test.InstanceSpec;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-/**
- * This will spin up a ZooKeeper and Kafka server for use in integration tests. Simply
- * create an instance of KafkaTestServer, call start(), and you can publish to Kafka
- * topics in an integration test. Be sure to call stop() when the test is complete,
- * or use the AutoCloseable interface.
- */
-public class KafkaTestServer implements KafkaCluster, KafkaProvider, AutoCloseable {
- /**
- * This defines the hostname the kafka instance will listen on by default.
- * To alter the value used, set broker override property 'host.name'
- */
- private static final String DEFAULT_HOSTNAME = "localhost";
-
- /**
- * Internal Test Kafka service.
- */
- private KafkaServerStartable broker;
-
- /**
- * Holds the broker configuration.
- */
- private KafkaConfig brokerConfig;
-
- /**
- * Internal Test Zookeeper service.
- */
- private ZookeeperTestServer zookeeperTestServer;
-
- /**
- * Flag to know if we are managing the zookeeper server.
- */
- private boolean isManagingZookeeper = true;
-
- /**
- * Defines overridden broker properties.
- */
- private final Properties overrideBrokerProperties = new Properties();
-
- /**
- * Definition of listeners defined on the broker.
- */
- private final List<BrokerListener> registeredListeners;
-
- /**
- * Properties defining how to connect to each available/registered listener on the broker.
- */
- private final List<ListenerProperties> listenerProperties = new ArrayList<>();
-
- /**
- * Default constructor, no overridden broker properties.
- */
- public KafkaTestServer() {
- this(new Properties());
- }
-
- /**
- * Alternative constructor allowing override of brokerProperties.
- * @param overrideBrokerProperties Define Kafka broker properties.
- * @throws IllegalArgumentException if overrideBrokerProperties argument is null.
- */
- public KafkaTestServer(final Properties overrideBrokerProperties) throws IllegalArgumentException {
- this(overrideBrokerProperties, new ArrayList<>());
- }
-
- /**
- * Alternative constructor allowing override of brokerProperties.
- * @param overrideBrokerProperties Define Kafka broker properties.
- * @param listeners Listener definitions.
- * @throws IllegalArgumentException if overrideBrokerProperties argument is null.
- */
- public KafkaTestServer(
- final Properties overrideBrokerProperties,
- final Collection<BrokerListener> listeners
- ) throws IllegalArgumentException {
- // Validate argument.
- if (overrideBrokerProperties == null) {
- throw new IllegalArgumentException("Cannot pass null overrideBrokerProperties argument.");
- }
-
- final List<BrokerListener> brokerListeners = new ArrayList<>();
- if (listeners == null || listeners.isEmpty()) {
- brokerListeners.add(new PlainListener());
- } else {
- brokerListeners.addAll(listeners);
- }
-
- // Add passed in properties.
- this.overrideBrokerProperties.putAll(overrideBrokerProperties);
- this.registeredListeners = Collections.unmodifiableList(new ArrayList<>(brokerListeners));
- }
-
- /**
- * Package protected constructor allowing override of ZookeeperTestServer instance.
- * @param overrideBrokerProperties Define Kafka broker properties.
- * @param zookeeperTestServer Zookeeper server instance to use.
- * @param listeners Listener definitions to register on the broker.
- */
- KafkaTestServer(
- final Properties overrideBrokerProperties,
- final ZookeeperTestServer zookeeperTestServer,
- final Collection<BrokerListener> listeners
- ) {
- this(overrideBrokerProperties, listeners);
-
- // If an instance was passed in,
- if (zookeeperTestServer != null) {
- // We are no longer in charge of managing it.
- isManagingZookeeper = false;
- }
- // Save reference.
- this.zookeeperTestServer = zookeeperTestServer;
- }
-
- /**
- * bootstrap.servers string to configure Kafka consumers or producers to access the Kafka cluster.
- * @return Connect string to use for Kafka clients.
- */
- @Override
- public String getKafkaConnectString() {
- validateState(true, "Cannot get connect string prior to service being started.");
-
- // Return all of the connection properties.
- return listenerProperties.stream()
- .map(ListenerProperties::getConnectString)
- .collect(Collectors.joining(","));
- }
-
- @Override
- public List getListenerProperties() {
- return Collections.unmodifiableList(listenerProperties);
- }
-
- /**
- * Returns an immutable list of broker hosts for the kafka cluster.
- * @return immutable list of hosts for brokers within the cluster.
- */
- @Override
- public KafkaBrokers getKafkaBrokers() {
- validateState(true, "Cannot get brokers before service has been started.");
-
- return new KafkaBrokers(
- Collections.singletonList(new KafkaBroker(this))
- );
- }
-
- /**
- * Returns the brokers Id.
- * @return This brokers Id.
- */
- public int getBrokerId() {
- validateState(true, "Cannot get brokerId prior to service being started.");
- return brokerConfig.brokerId();
- }
-
- /**
- * Returns properly formatted zookeeper connection string for zookeeper clients.
- * @return Connect string to use for Zookeeper clients.
- */
- public String getZookeeperConnectString() {
- validateState(true, "Cannot get connect string prior to service being started.");
- return zookeeperTestServer.getConnectString();
- }
-
- /**
- * Creates and starts ZooKeeper and Kafka server instances.
- * @throws Exception on startup errors.
- */
- public void start() throws Exception {
- // If we have no zkTestServer instance
- if (zookeeperTestServer == null) {
- // Create it.
- zookeeperTestServer = new ZookeeperTestServer();
- }
-
- // If we're managing the zookeeper instance
- if (isManagingZookeeper) {
- // Call restart(), which allows the KafkaTestServer instance to be restarted without issues.
- zookeeperTestServer.restart();
- } else {
- // If we aren't managing the Zookeeper instance, call start() to ensure it's been started.
- // Starting an already started instance is a NOOP.
- zookeeperTestServer.start();
- }
-
- // If broker has not yet been created...
- if (broker == null) {
- // Build properties using a baseline from overrideBrokerProperties.
- final Properties brokerProperties = new Properties();
- brokerProperties.putAll(overrideBrokerProperties);
-
- // Put required zookeeper connection properties.
- setPropertyIfNotSet(brokerProperties, "zookeeper.connect", zookeeperTestServer.getConnectString());
-
- // If log.dir is not set.
- if (brokerProperties.getProperty("log.dir") == null) {
- // Create temp path to store logs and set property.
- brokerProperties.setProperty("log.dir", Utils.createTempDirectory().getAbsolutePath());
- }
-
- // Ensure that we're advertising correct hostname appropriately
- setPropertyIfNotSet(brokerProperties, "host.name", getConfiguredHostname());
-
- // Set other defaults if not defined.
- setPropertyIfNotSet(brokerProperties, "auto.create.topics.enable", "true");
- setPropertyIfNotSet(brokerProperties, "zookeeper.session.timeout.ms", "30000");
- setPropertyIfNotSet(brokerProperties, "broker.id", "1");
- setPropertyIfNotSet(brokerProperties, "auto.offset.reset", "latest");
-
- // Lower active threads.
- setPropertyIfNotSet(brokerProperties, "num.io.threads", "2");
- setPropertyIfNotSet(brokerProperties, "num.network.threads", "2");
- setPropertyIfNotSet(brokerProperties, "log.flush.interval.messages", "1");
-
- // Define replication factor for internal topics to 1
- setPropertyIfNotSet(brokerProperties, "offsets.topic.replication.factor", "1");
- setPropertyIfNotSet(brokerProperties, "offset.storage.replication.factor", "1");
- setPropertyIfNotSet(brokerProperties, "transaction.state.log.replication.factor", "1");
- setPropertyIfNotSet(brokerProperties, "transaction.state.log.min.isr", "1");
- setPropertyIfNotSet(brokerProperties, "transaction.state.log.num.partitions", "4");
- setPropertyIfNotSet(brokerProperties, "config.storage.replication.factor", "1");
- setPropertyIfNotSet(brokerProperties, "status.storage.replication.factor", "1");
- setPropertyIfNotSet(brokerProperties, "default.replication.factor", "1");
-
- // Loop over registered listeners and add each
- for (final BrokerListener listener : registeredListeners) {
- // Generate a random, free port to listen on; a fixed port would collide
- // when multiple brokers or listeners are started.
- final int port = InstanceSpec.getRandomPort();
- final String listenerDefinition = listener.getProtocol() + "://" + getConfiguredHostname() + ":" + port;
- listenerProperties.add(
- new ListenerProperties(listener.getProtocol(), listenerDefinition, listener.getClientProperties())
- );
- appendProperty(brokerProperties, "advertised.listeners", listenerDefinition);
- appendProperty(brokerProperties,"listeners", listenerDefinition);
-
- // Apply other options
- brokerProperties.putAll(listener.getBrokerProperties());
- }
-
- // Retain the brokerConfig.
- brokerConfig = new KafkaConfig(brokerProperties);
-
- // Create and start kafka service.
- broker = new KafkaServerStartable(brokerConfig);
- }
- // Start broker.
- broker.startup();
- }
-
- /**
- * Closes the internal servers. Failing to call this at the end of your tests will likely
- * result in leaking instances.
- *
- * Provided alongside close() to stay consistent with start().
- * @throws Exception on shutdown errors.
- */
- public void stop() throws Exception {
- close();
- }
-
- /**
- * Closes the internal servers. Failing to call this at the end of your tests will likely
- * result in leaking instances.
- * @throws Exception on shutdown errors.
- */
- @Override
- public void close() throws Exception {
- if (broker != null) {
- // Shutdown and reset.
- broker.shutdown();
- }
-
- // Conditionally close zookeeper
- if (zookeeperTestServer != null && isManagingZookeeper) {
- // Call stop() on zk server instance. This will not cleanup temp data.
- zookeeperTestServer.stop();
- }
- }
-
- /**
- * Helper method to conditionally set a property if no value is already set.
- * @param properties The properties instance to update.
- * @param key The key to set if not already set.
- * @param defaultValue The value to set if no value is already set for key.
- * @return The value set.
- */
- private Object setPropertyIfNotSet(final Properties properties, final String key, final String defaultValue) {
- // Validate inputs
- if (properties == null) {
- throw new NullPointerException("properties argument cannot be null.");
- }
- if (key == null) {
- throw new NullPointerException("key argument cannot be null.");
- }
-
- // Conditionally set the property if it's not already set.
- properties.setProperty(
- key,
- properties.getProperty(key, defaultValue)
- );
-
- // Return the value that is being used.
- return properties.get(key);
- }
-
- /**
- * If the key already exists on the properties, it will append the value to the existing value, comma delimited.
- * @param properties Properties to update.
- * @param key Key to update.
- * @param appendValue Value to set or append.
- * @return Updated value.
- */
- private Object appendProperty(final Properties properties, final String key, final String appendValue) {
- // Validate inputs
- if (properties == null) {
- throw new NullPointerException("properties argument cannot be null.");
- }
- if (key == null) {
- throw new NullPointerException("key argument cannot be null.");
- }
-
- String originalValue = properties.getProperty(key);
- if (originalValue != null && !originalValue.isEmpty()) {
- originalValue = originalValue + ", ";
- } else {
- originalValue = "";
- }
- properties.setProperty(key, originalValue + appendValue);
-
- return properties.get(key);
- }
-
- /**
- * Returns which hostname/IP address Kafka will bind/listen/advertise with. To change this value
- * use the constructor: KafkaTestServer(final Properties overrideBrokerProperties) and set the property
- * 'host.name' to the appropriate value.
- *
- * @return Which hostname/IP Kafka should bind/listen/advertise using.
- */
- private String getConfiguredHostname() {
- return overrideBrokerProperties.getProperty("host.name", DEFAULT_HOSTNAME);
- }
-
- /**
- * Helper method to ensure state consistency.
- * @param requireServiceStarted True if service should have been started, false if not.
- * @param errorMessage Error message to throw if the state is not consistent.
- * @throws IllegalStateException if the kafkaCluster state is not consistent.
- */
- private void validateState(final boolean requireServiceStarted, final String errorMessage) throws IllegalStateException {
- if (requireServiceStarted && broker == null) {
- throw new IllegalStateException(errorMessage);
- } else if (!requireServiceStarted && broker != null) {
- throw new IllegalStateException(errorMessage);
- }
- }
-}
\ No newline at end of file
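
The override mechanism above was the usual entry point into this server; a minimal sketch, assuming illustrative property values:

    package com.nasdaq.ncdsclient.core;

    import java.util.Properties;

    // Sketch only: overriding broker properties on the removed KafkaTestServer.
    class KafkaTestServerUsageSketch {
        static void overrideExample() throws Exception {
            final Properties overrides = new Properties();
            overrides.setProperty("host.name", "127.0.0.1");            // read by getConfiguredHostname()
            overrides.setProperty("auto.create.topics.enable", "false");
            try (final KafkaTestServer server = new KafkaTestServer(overrides)) {
                server.start();
                // Connect string of the form "PLAINTEXT://127.0.0.1:<port>" for the default listener.
                final String bootstrap = server.getKafkaConnectString();
                // ... point producers and consumers at 'bootstrap' ...
            }
        }
    }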
diff --git a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestUtils.java b/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestUtils.java
deleted file mode 100644
index 342c56a..0000000
--- a/ncds-sdk/src/test/java/com/nasdaq/ncdsclient/core/KafkaTestUtils.java
+++ /dev/null
@@ -1,521 +0,0 @@
-package com.nasdaq.ncdsclient.core;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DescribeClusterResult;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.TopicListing;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.TopicPartitionInfo;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-/**
- * A collection of re-usable patterns for interacting with an embedded Kafka server.
- */
- public class KafkaTestUtils {
- private static final Logger logger = LoggerFactory.getLogger(KafkaTestUtils.class);
-
- // The embedded Kafka server to interact with.
- private final KafkaProvider kafkaProvider;
-
- /**
- * Constructor.
- * @param kafkaProvider The kafka cluster to operate on.
- */
- public KafkaTestUtils(KafkaProvider kafkaProvider) {
- if (kafkaProvider == null) {
- throw new IllegalArgumentException("KafkaCluster argument cannot be null.");
- }
- this.kafkaProvider = kafkaProvider;
- }
-
- /**
- * Produce some records into the defined kafka topic.
- *
- * @param keysAndValues Records you want to produce.
- * @param topicName the topic to produce into.
- * @param partitionId the partition to produce into.
- * @return List of ProducedKafkaRecords.
- */
- public List<ProducedKafkaRecord<byte[], byte[]>> produceRecords(
- final Map<byte[], byte[]> keysAndValues,
- final String topicName,
- final int partitionId
- ) {
- // Defines customized producer properties. Ensure data written to all ISRs.
- final Properties producerProperties = new Properties();
- producerProperties.put("acks", "all");
-
- // This holds the records we produced
- final List<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
-
- // This holds futures returned
- final List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
-
- try (final KafkaProducer<byte[], byte[]> producer = getKafkaProducer(
- ByteArraySerializer.class,
- ByteArraySerializer.class,
- producerProperties
- )) {
- for (final Map.Entry<byte[], byte[]> entry: keysAndValues.entrySet()) {
- // Construct the record to produce.
- final ProducerRecord<byte[], byte[]> record
- = new ProducerRecord<>(topicName, partitionId, entry.getKey(), entry.getValue());
-
- producedRecords.add(record);
-
- // Send it.
- producerFutures.add(producer.send(record));
- }
-
- // Publish to the topic and close.
- producer.flush();
- logger.debug("Produce completed");
- }
-
- // Loop through the futures and build ProducedKafkaRecord objects
- final List<ProducedKafkaRecord<byte[], byte[]>> kafkaRecords = new ArrayList<>();
- try {
- for (int x = 0; x < keysAndValues.size(); x++) {
- final RecordMetadata metadata = producerFutures.get(x).get();
- final ProducerRecord<byte[], byte[]> producerRecord = producedRecords.get(x);
-
- kafkaRecords.add(ProducedKafkaRecord.newInstance(metadata, producerRecord));
- }
- } catch (final InterruptedException | ExecutionException exception) {
- throw new RuntimeException(exception);
- }
-
- return kafkaRecords;
- }
-
- /**
- * Produce randomly generated records into the defined kafka topic.
- *
- * @param numberOfRecords how many records to produce
- * @param topicName the topic to produce into.
- * @param partitionId the partition to produce into.
- * @return List of ProducedKafkaRecords.
- */
- public List<ProducedKafkaRecord<byte[], byte[]>> produceRecords(
- final int numberOfRecords,
- final String topicName,
- final int partitionId
- ) {
- final Map<byte[], byte[]> keysAndValues = new HashMap<>();
-
- // Generate random & unique data
- for (int index = 0; index < numberOfRecords; index++) {
- // Construct a unique key and value; include the index so records generated
- // within the same millisecond do not collide in the map.
- final long timeStamp = Clock.systemUTC().millis();
- final String key = "key" + index + "-" + timeStamp;
- final String value = "value" + index + "-" + timeStamp;
-
- // Add to map
- keysAndValues.put(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
- }
-
- return produceRecords(keysAndValues, topicName, partitionId);
- }
-
- /**
- * This will consume all records from all partitions on the given topic.
- * @param topic Topic to consume from.
- * @return List of ConsumerRecords consumed.
- */
- public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic) {
- return consumeAllRecordsFromTopic(topic, ByteArrayDeserializer.class, ByteArrayDeserializer.class);
- }
-
- /**
- * This will consume all records from only the partitions given.
- * @param topic Topic to consume from.
- * @param partitionIds Collection of PartitionIds to consume.
- * @return List of ConsumerRecords consumed.
- */
- public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, final Collection<Integer> partitionIds) {
- return consumeAllRecordsFromTopic(topic, partitionIds, ByteArrayDeserializer.class, ByteArrayDeserializer.class);
- }
-
- /**
- * This will consume all records from all partitions on the given topic.
- * @param <K> Type of key values.
- * @param <V> Type of message values.
- * @param topic Topic to consume from.
- * @param keyDeserializer How to deserialize the key values.
- * @param valueDeserializer How to deserialize the messages.
- * @return List of ConsumerRecords consumed.
- */
- public <K, V> List<ConsumerRecord<K, V>> consumeAllRecordsFromTopic(
- final String topic,
- final Class<? extends Deserializer<K>> keyDeserializer,
- final Class<? extends Deserializer<V>> valueDeserializer
- ) {
- // Find all partitions on topic.
- final TopicDescription topicDescription = describeTopic(topic);
- final Collection<Integer> partitions = topicDescription
- .partitions()
- .stream()
- .map(TopicPartitionInfo::partition)
- .collect(Collectors.toList());
-
- // Consume messages
- return consumeAllRecordsFromTopic(topic, partitions, keyDeserializer, valueDeserializer);
- }
-
- /**
- * This will consume all records from the partitions passed on the given topic.
- * @param <K> Type of key values.
- * @param <V> Type of message values.
- * @param topic Topic to consume from.
- * @param partitionIds Which partitions to consume from.
- * @param keyDeserializer How to deserialize the key values.
- * @param valueDeserializer How to deserialize the messages.
- * @return List of ConsumerRecords consumed.
- */
- public <K, V> List<ConsumerRecord<K, V>> consumeAllRecordsFromTopic(
- final String topic,
- final Collection<Integer> partitionIds,
- final Class<? extends Deserializer<K>> keyDeserializer,
- final Class<? extends Deserializer<V>> valueDeserializer
- ) {
- // Create topic Partitions
- final List<TopicPartition> topicPartitions = partitionIds
- .stream()
- .map((partitionId) -> new TopicPartition(topic, partitionId))
- .collect(Collectors.toList());
-
- // Holds our results.
- final List<ConsumerRecord<K, V>> allRecords = new ArrayList<>();
-
- // Connect Consumer
- try (final KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumer(keyDeserializer, valueDeserializer, new Properties())) {
-
- // Assign topic partitions & seek to head of them
- kafkaConsumer.assign(topicPartitions);
- kafkaConsumer.seekToBeginning(topicPartitions);
-
- // Pull records from kafka, keep polling until we get nothing back
- ConsumerRecords<K, V> records;
- final int maxEmptyLoops = 2;
- int loopsLeft = maxEmptyLoops;
- do {
- // Grab records from kafka
- records = kafkaConsumer.poll(2000L);
- logger.debug("Found {} records in kafka", records.count());
-
- // Add to our array list
- records.forEach(allRecords::add);
-
- // We want two full poll() calls that return empty results to break the loop.
- if (!records.isEmpty()) {
- // If we found records, reset our loop control variable.
- loopsLeft = maxEmptyLoops;
- } else {
- // Otherwise decrement the loop control variable.
- loopsLeft--;
- }
-
- }
- while (loopsLeft > 0);
- }
-
- // return all records
- return allRecords;
- }
-
- /**
- * Creates a topic in Kafka. If the topic already exists this does nothing.
- * @param topicName the topic name to create.
- * @param partitions the number of partitions to create.
- * @param replicationFactor the number of replicas for the topic.
- */
- public void createTopic(final String topicName, final int partitions, final short replicationFactor) {
- // Create admin client
- try (final AdminClient adminClient = getAdminClient()) {
- // Define topic
- final NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
-
- // Create topic, which is async call.
- final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
-
- // Since the call is async, wait for it to complete.
- createTopicsResult.values().get(topicName).get();
- } catch (InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof TopicExistsException)) {
- throw new RuntimeException(e.getMessage(), e);
- }
- // TopicExistsException - Swallow this exception, just means the topic already exists.
- }
- }
-
- /**
- * Describes a topic in Kafka.
- * @param topicName the topic to describe.
- * @return Description of the topic.
- */
- public TopicDescription describeTopic(final String topicName) {
- // Create admin client
- try (final AdminClient adminClient = getAdminClient()) {
- // Make async call to describe the topic.
- final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
-
- return describeTopicsResult.values().get(topicName).get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * List all names of topics in Kafka.
- * @return Set of topics found in Kafka.
- */
- public Set<String> getTopicNames() {
- try (final AdminClient adminClient = getAdminClient()) {
- return adminClient.listTopics().names().get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Get information about all topics in Kafka.
- * @return List of TopicListing entries for all topics found in Kafka.
- */
- public List<TopicListing> getTopics() {
- try (final AdminClient adminClient = getAdminClient()) {
- return new ArrayList<>(adminClient.listTopics().listings().get());
- } catch (final InterruptedException | ExecutionException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Describe nodes within Kafka cluster.
- * @return Collection of nodes within the Kafka cluster.
- */
- public List<Node> describeClusterNodes() {
- // Create admin client
- try (final AdminClient adminClient = getAdminClient()) {
- final DescribeClusterResult describeClusterResult = adminClient.describeCluster();
- return new ArrayList<>(describeClusterResult.nodes().get());
- } catch (final InterruptedException | ExecutionException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Creates a Kafka AdminClient connected to our test server.
- * @return Kafka AdminClient instance.
- */
- public AdminClient getAdminClient() {
- return KafkaAdminClient.create(buildDefaultClientConfig());
- }
-
- /**
- * Creates a kafka producer that is connected to our test server.
- * @param <K> Type of message key
- * @param <V> Type of message value
- * @param keySerializer Class of serializer to be used for keys.
- * @param valueSerializer Class of serializer to be used for values.
- * @return KafkaProducer configured to produce into Test server.
- */
- public <K, V> KafkaProducer<K, V> getKafkaProducer(
- final Class<? extends Serializer<K>> keySerializer,
- final Class<? extends Serializer<V>> valueSerializer
- ) {
-
- return getKafkaProducer(keySerializer, valueSerializer, new Properties());
- }
-
- /**
- * Creates a kafka producer that is connected to our test server.
- * @param <K> Type of message key
- * @param <V> Type of message value
- * @param keySerializer Class of serializer to be used for keys.
- * @param valueSerializer Class of serializer to be used for values.
- * @param config Additional producer configuration options to be set.
- * @return KafkaProducer configured to produce into Test server.
- */
- public <K, V> KafkaProducer<K, V> getKafkaProducer(
- final Class<? extends Serializer<K>> keySerializer,
- final Class<? extends Serializer<V>> valueSerializer,
- final Properties config
- ) {
-
- // Build config
- final Map<String, Object> kafkaProducerConfig = buildDefaultClientConfig();
- kafkaProducerConfig.put("max.in.flight.requests.per.connection", 1);
- kafkaProducerConfig.put("retries", 5);
- kafkaProducerConfig.put("client.id", getClass().getSimpleName() + " Producer");
- kafkaProducerConfig.put("batch.size", 0);
- kafkaProducerConfig.put("key.serializer", keySerializer);
- kafkaProducerConfig.put("value.serializer", valueSerializer);
-
- // Override config
- if (config != null) {
- for (final Map.Entry