
Commit 7e89b4f

Merge pull request #287 from uds5501/feat/acl-integration-gopay
Feature : ACL integration features
2 parents 0ac5e04 + dad1197 commit 7e89b4f

15 files changed: +514 −141 lines

CHANGELOG.md (+3)

@@ -3,6 +3,9 @@
 All notable changes to this project will be documented in this file. This change log follows the conventions
 of [keepachangelog.com](http://keepachangelog.com/).
 
+## 4.12.0
+- Adds support for ACL auth for kafka streams.
+
 ## 4.11.1
 - Fix retry-count returning nil if empty. Returns 0 by default now.

Makefile (+85 −25)

@@ -1,40 +1,100 @@
+KAFKA_TOPICS = topic another-test-topic
+KAFKA_BROKERS = kafka1:9095 kafka2:9096 kafka3:9097
+ADMIN_CONFIG = /etc/kafka/secrets/config-admin.properties
+KAFKA_CONTAINER = ziggurat_kafka1_1
+
 .PHONY: all
-all: test
 
-topic="topic"
-another_test_topic="another-test-topic"
+# Main target to setup the entire cluster
+setup-cluster: down up wait-for-kafka create-scram-credentials create-topics setup-acls
 
-setup:
-	docker-compose down
-	lein deps
-	docker-compose up -d
-	sleep 10
-	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
-	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
+# Bring down all containers and clean volumes
+down:
+	@echo "Bringing down all containers..."
+	docker-compose -f docker-compose-cluster.yml down -v
 
-test: setup
-	TESTING_TYPE=local lein test
-	docker-compose down
+# Start all containers
+up:
+	@echo "Starting all containers..."
+	docker-compose -f docker-compose-cluster.yml up -d
 
-setup-cluster:
-	rm -rf /tmp/ziggurat_kafka_cluster_data
-	docker-compose -f docker-compose-cluster.yml -p ziggurat down
-	lein deps
-	docker-compose -f docker-compose-cluster.yml -p ziggurat up -d
-	sleep 30
-	# Sleeping for 30s to allow the cluster to come up
-	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
-	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
+# Wait for Kafka to be ready
+wait-for-kafka:
+	@echo "Waiting for Kafka to be ready..."
+	@sleep 30
+
+# Restart everything
+restart: down up wait-for-kafka
+
+# Create SCRAM credentials for admin user
+create-scram-credentials:
+	@echo "Creating SCRAM credentials for admin user..."
+	@docker exec $(KAFKA_CONTAINER) kafka-configs \
+		--alter \
+		--zookeeper zookeeper:2181 \
+		--add-config 'SCRAM-SHA-256=[password=admin]' \
+		--entity-type users \
+		--entity-name admin
+
+# Create all required topics
+create-topics:
+	@for topic in $(KAFKA_TOPICS); do \
+		echo "Creating topic: $$topic"; \
+		docker exec $(KAFKA_CONTAINER) kafka-topics \
+			--create \
+			--zookeeper zookeeper:2181 \
+			--if-not-exists \
+			--topic $$topic \
+			--partitions 3 \
+			--replication-factor 3; \
+	done
+
+# Setup ACLs for admin user on all brokers
+setup-acls:
+	@for broker in $(KAFKA_BROKERS); do \
+		case $$broker in \
+			kafka1:9095) \
+				container="ziggurat_kafka1_1" ;; \
+			kafka2:9096) \
+				container="ziggurat_kafka2_1" ;; \
+			kafka3:9097) \
+				container="ziggurat_kafka3_1" ;; \
+		esac; \
+		for topic in $(KAFKA_TOPICS); do \
+			echo "Setting up ACLs for topic: $$topic on broker: $$broker using container: $$container"; \
+			docker exec $$container kafka-acls \
+				--bootstrap-server $$broker \
+				--command-config $(ADMIN_CONFIG) \
+				--add \
+				--allow-principal User:admin \
+				--operation All \
+				--topic $$topic; \
+		done \
+	done
+
+# Clean up topics (can be used during development)
+clean-topics:
+	@for topic in $(KAFKA_TOPICS); do \
+		echo "Deleting topic: $$topic"; \
+		docker exec $(KAFKA_CONTAINER) kafka-topics --bootstrap-server kafka1:9095 \
+			--delete \
+			--topic $$topic; \
+	done
+
+# Show logs
+logs:
+	docker-compose -f docker-compose-cluster.yml logs -f
 
 test-cluster: setup-cluster
 	TESTING_TYPE=cluster lein test
 	docker-compose -f docker-compose-cluster.yml down
 	rm -rf /tmp/ziggurat_kafka_cluster_data
 
-coverage: setup
+coverage: setup-cluster
 	lein code-coverage
-	docker-compose down
+	docker-compose -f docker-compose-cluster.yml down
+
 
 proto:
 	protoc -I=resources --java_out=test/ resources/proto/example.proto
-	protoc -I=resources --java_out=test/ resources/proto/person.proto
+	protoc -I=resources --java_out=test/ resources/proto/person.proto
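
Note: the create-scram-credentials and setup-acls targets above drive kafka-configs and kafka-acls through docker exec. For readers more comfortable with the Java AdminClient, the sketch below is a rough, hypothetical equivalent; the broker address, user, passwords, and topic names mirror the Makefile variables and the config-admin.properties added in this commit. It assumes the admin principal can already authenticate — in this compose setup the SCRAM credentials are bootstrapped through ZooKeeper first, which is why the Makefile talks to ZooKeeper rather than to a broker.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AclSetupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same client settings as config-admin.properties in this commit.
        props.put("bootstrap.servers", "localhost:9095");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"admin\" password=\"admin\";");

        try (Admin admin = Admin.create(props)) {
            // Rough equivalent of the create-scram-credentials target (kafka-configs --alter).
            admin.alterUserScramCredentials(List.of(
                    new UserScramCredentialUpsertion("admin",
                            new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096),
                            "admin"))).all().get();

            // Rough equivalent of the setup-acls target: User:admin gets all operations on each topic.
            for (String topic : List.of("topic", "another-test-topic")) {
                AclBinding binding = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
                        new AccessControlEntry("User:admin", "*",
                                AclOperation.ALL, AclPermissionType.ALLOW));
                admin.createAcls(List.of(binding)).all().get();
            }
        }
    }
}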

config-admin.properties (+5)

@@ -0,0 +1,5 @@
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-256
+sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+    username="admin" \
+    password="admin";
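
config-admin.properties is the file the setup-acls target passes to kafka-acls via --command-config. Any JVM Kafka client — a plain consumer, a producer, or a Kafka Streams app — authenticates against the SASL_PLAINTEXT listeners with the same three settings. A minimal, hypothetical consumer sketch (the bootstrap address and group id are illustrative, not part of this commit):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SaslConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9095");  // kafka1's SASL_PLAINTEXT listener
        props.put("group.id", "acl-demo");                  // illustrative group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // The three settings from config-admin.properties, expressed as client properties.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"admin\" password=\"admin\";");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic"));  // one of the topics created by the Makefile
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
        }
    }
}

Since User:admin is listed in KAFKA_SUPER_USERS in the compose file, no separate consumer-group ACL is needed for this to work.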

docker-compose-cluster.yml (+57 −14)

@@ -8,16 +8,18 @@ services:
     container_name: 'ziggurat_rabbitmq'
 
   zookeeper:
-    image: zookeeper:3.4.9
+    image: confluentinc/cp-zookeeper:5.5.0
     hostname: zookeeper
     ports:
       - "2181:2181"
     environment:
-      ZOO_MY_ID: 1
-      ZOO_PORT: 2181
-      ZOO_SERVERS: server.1=zookeeper:2888:3888
-      ZOO_TICK_TIME: 2000
+      ZOOKEEPER_CLIENT_PORT: 2181
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
+        -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
+        -Dzookeeper.allowSaslFailedClients=true
+        -Dzookeeper.requireClientAuthScheme=sasl"
     volumes:
+      - ./zookeeper_server_jaas.conf:/etc/kafka/zookeeper_server_jaas.conf
       - /tmp/ziggurat_kafka_cluster_data/zookeeper/data:/data
       - /tmp/ziggurat_kafka_cluster_data/zookeeper/datalog:/datalog
 
@@ -28,17 +30,32 @@ services:
       - SYS_ADMIN
     hostname: kafka1
     ports:
-      - "9091:9091"
+      - "9094:9094"
+      - "9095:9095"
     environment:
-      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9095
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
       KAFKA_BROKER_ID: 1
       KAFKA_DEFAULT_REPLICATION_FACTOR: 3
       KAFKA_NUM_PARTITIONS: 3
+      KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
+      KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username=\"client\" \
+        password=\"client-secret\";"
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+      KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
+      KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf -Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client"
+      KAFKA_ZOOKEEPER_SET_ACL: "true"
+      KAFKA_ZOOKEEPER_SASL_ENABLED: "true"
     volumes:
       - /tmp/ziggurat_kafka_cluster_data/kafka1/data:/var/lib/kafka/data
+      - ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
+      - ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
     depends_on:
       - zookeeper
 
@@ -50,16 +67,29 @@ services:
     hostname: kafka2
     ports:
       - "9092:9092"
+      - "9096:9096"
     environment:
-      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9096
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
       KAFKA_BROKER_ID: 2
       KAFKA_DEFAULT_REPLICATION_FACTOR: 3
       KAFKA_NUM_PARTITIONS: 3
+      KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
+      KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username=\"client\" \
+        password=\"client-secret\";"
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+      KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
+      KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
     volumes:
       - /tmp/ziggurat_kafka_cluster_data/kafka2/data:/var/lib/kafka/data
+      - ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
+      - ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
     depends_on:
       - zookeeper
 
@@ -71,15 +101,28 @@ services:
     hostname: kafka3
     ports:
       - "9093:9093"
+      - "9097:9097"
     environment:
-      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9097
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
       KAFKA_BROKER_ID: 3
       KAFKA_DEFAULT_REPLICATION_FACTOR: 3
       KAFKA_NUM_PARTITIONS: 3
+      KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
+      KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
+        username=\"client\" \
+        password=\"client-secret\";"
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+      KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
+      KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
     volumes:
       - /tmp/ziggurat_kafka_cluster_data/kafka3/data:/var/lib/kafka/data
+      - ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
+      - ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
     depends_on:
       - zookeeper
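
With the compose file above, each broker now exposes an extra SASL_PLAINTEXT listener on the host (9095, 9096, 9097) alongside the existing PLAINTEXT ones. A small, hypothetical smoke test from the host — listing topics through each SASL listener with the admin credentials from this commit — might look like:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;

public class ClusterSmokeTest {
    public static void main(String[] args) throws Exception {
        // One SASL_PLAINTEXT listener per broker, as published by the compose file above.
        for (String bootstrap : List.of("localhost:9095", "localhost:9096", "localhost:9097")) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "SCRAM-SHA-256");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.scram.ScramLoginModule required "
                            + "username=\"admin\" password=\"admin\";");
            try (Admin admin = Admin.create(props)) {
                System.out.println(bootstrap + " -> " + admin.listTopics().names().get());
            }
        }
    }
}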

kafka_server_jaas.conf (+17)

@@ -0,0 +1,17 @@
+KafkaServer {
+  org.apache.kafka.common.security.scram.ScramLoginModule required
+  username="admin"
+  password="admin";
+};
+
+Client {
+  org.apache.zookeeper.server.auth.DigestLoginModule required
+  username="admin"
+  password="admin";
+};
+
+KafkaClient {
+  org.apache.kafka.common.security.scram.ScramLoginModule required
+  username="client"
+  password="client-secret";
+};
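
The three sections cover different hops: KafkaServer is the broker's own SCRAM identity for its SASL listener, Client is what the broker presents to ZooKeeper (DigestLoginModule), and KafkaClient is what tools run inside the container pick up. The file is loaded via -Djava.security.auth.login.config, which is exactly what KAFKA_OPTS does in the compose file. A client JVM can do the same programmatically instead of setting sasl.jaas.config inline; the hypothetical sketch below shows the idea (note the KafkaClient entry here uses client/client-secret, so SCRAM credentials for that user would also have to exist on the brokers for this login to succeed):

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;

public class JaasFileClientSketch {
    public static void main(String[] args) throws Exception {
        // Same effect as -Djava.security.auth.login.config on the JVM command line;
        // the KafkaClient section of kafka_server_jaas.conf then supplies the login module.
        System.setProperty("java.security.auth.login.config",
                "/etc/kafka/secrets/kafka_server_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9095");  // illustrative
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // No sasl.jaas.config here: the credentials come from the JAAS file instead.

        try (Admin admin = Admin.create(props)) {
            System.out.println("cluster id: " + admin.describeCluster().clusterId().get());
        }
    }
}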

project.clj (+5 −6)

@@ -2,7 +2,7 @@
 (cemerick.pomegranate.aether/register-wagon-factory!
  "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
 
-(defproject tech.gojek/ziggurat "4.11.1"
+(defproject tech.gojek/ziggurat "4.12.0"
   :description "A stream processing framework to build stateless applications on kafka"
   :url "https://github.com/gojektech/ziggurat"
   :license {:name "Apache License, Version 2.0"
@@ -15,7 +15,7 @@
   [com.cemerick/url "0.1.1"]
   [com.datadoghq/java-dogstatsd-client "2.4"]
   [com.fasterxml.jackson.core/jackson-databind "2.9.9"]
-  [com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure]]
+  [com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure org.slf4j/slf4j-api]]
   [com.taoensso/nippy "3.1.1"]
   [io.dropwizard.metrics5/metrics-core "5.0.0" :scope "compile"]
   [medley "1.3.0" :exclusions [org.clojure/clojure]]
@@ -41,10 +41,7 @@
   [com.newrelic.agent.java/newrelic-api "6.5.0"]
   [yleisradio/new-reliquary "1.1.0" :exclusions [org.clojure/clojure]]
   [metosin/ring-swagger "0.26.2"
-   :exclusions [cheshire
-                com.fasterxml.jackson.core/jackson-core
-                com.fasterxml.jackson.dataformat/jackson-dataformat-smile
-                com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
+   :exclusions [org.mozilla/rhino com.fasterxml.jackson.dataformat/jackson-dataformat-smile com.fasterxml.jackson.dataformat/jackson-dataformat-cbor cheshire com.google.code.findbugs/jsr305 com.fasterxml.jackson.core/jackson-core]]
   [metosin/ring-swagger-ui "3.46.0"]
   [cambium/cambium.core "1.1.0"]
   [cambium/cambium.codec-cheshire "1.0.0"]
@@ -72,8 +69,10 @@
   :dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
                  [junit/junit "4.13.2"]
                  [org.hamcrest/hamcrest-core "2.2"]
+                 [org.apache.kafka/kafka_2.12 "2.8.0"]
                  [org.apache.kafka/kafka-streams "2.8.2" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
                  [org.apache.kafka/kafka-clients "2.8.2" :classifier "test"]
+                 [org.apache.kafka/kafka-streams-test-utils "2.8.2" :classifier "test"]
                  [org.clojure/test.check "1.1.0"]]
   :plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
   :cloverage {:exclude-call ['cambium.core/info

resources/config.test.edn (+1)

@@ -96,4 +96,5 @@
    :new-relic {:report-errors false}
    :prometheus {:port 8002
                 :enabled false}
+   :ssl {:enabled false}
    :log-format "text"}}
