diff --git a/Makefile b/Makefile index 1be335e3..70117c7e 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,10 @@ ci-test-unit: ci-test-all: pytest -s -v --log-format="%(asctime)s %(levelname)s %(message)s" --log-level DEBUG --cov aiokafka --cov-report xml --color=yes --docker-image $(DOCKER_IMAGE) $(FLAGS) tests +.PHONY: manual-test +manual-test: + docker compose up --build --exit-code-from aiokafka --attach aiokafka + coverage.xml: .coverage coverage xml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..2a36067d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,64 @@ +# Network and IPs are hard coded otherwise a broker might be confused on restarting from a different IP + +services: + kafka1: + build: + context: ./docker/kafka + command: "start-broker.sh" + environment: + BROKER_ID: "1" + KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M" + CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.11 + stop_grace_period: 30s + healthcheck: + test: [ "CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 || exit 1" ] + retries: 30 + interval: 1s + + kafka2: + extends: + service: kafka1 + environment: + BROKER_ID: "2" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.12 + + kafka3: + extends: + service: kafka1 + environment: + BROKER_ID: "3" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.13 + + + aiokafka: + build: + context: . + dockerfile: docker/aiokafka/Dockerfile + command: [ "python", "-u", "-m", "tests.manual.topic_management" ] + environment: + BOOTSTRAP_SERVERS: "kafka3:9092,kafka2:9092,kafka1:9092" + networks: + aiokafka-test-network: + ipv4_address: 172.16.23.100 + depends_on: + kafka1: + condition: service_healthy + kafka2: + condition: service_healthy + kafka3: + condition: service_healthy + +networks: + aiokafka-test-network: + ipam: + driver: default + config: + - subnet: 172.16.23.0/24 + ip_range: 172.28.23.0/24 diff --git a/docker/aiokafka/Dockerfile b/docker/aiokafka/Dockerfile new file mode 100644 index 00000000..e7071d2c --- /dev/null +++ b/docker/aiokafka/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12 + +WORKDIR /opt/project + +COPY setup.py pyproject.toml requirements-* /opt/project/ + +RUN pip install -r requirements-ci.txt + +COPY aiokafka /opt/project/aiokafka +COPY tests /opt/project/tests diff --git a/docker/kafka/Dockerfile b/docker/kafka/Dockerfile new file mode 100644 index 00000000..f38be719 --- /dev/null +++ b/docker/kafka/Dockerfile @@ -0,0 +1,21 @@ +FROM ubuntu:22.04 + +RUN apt-get update && \ + apt-get install -y --no-install-recommends default-jre wget && \ + rm -rf /var/lib/apt/lists/* + +ENV PATH="/opt/kafka/bin:${PATH}" +WORKDIR /opt/kafka + +# API like CreateTopics are redirected automatically to the controller starting to 2.8.0 +# https://issues.apache.org/jira/browse/KAFKA-10181 +# To reproduce the error, we must use a previous version of kafka +ARG SCALA_VERSION=2.13 +ARG KAFKA_VERSION=3.3.0 + +RUN wget -q -O kafka.tgz "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \ + && tar xfvz kafka.tgz --strip 1 \ + && rm -rf kafka.tgz site-docs + +COPY start-broker.sh /opt/kafka/bin/ +COPY base-server.properties /opt/kafka/config \ No newline at end of file diff --git a/docker/kafka/base-server.properties b/docker/kafka/base-server.properties new file mode 100644 index 00000000..8a617425 --- /dev/null +++ b/docker/kafka/base-server.properties @@ -0,0 +1,13 @@ +num.network.threads=3 +num.io.threads=8 +socket.send.buffer.bytes=102400 +socket.receive.buffer.bytes=102400 +socket.request.max.bytes=104857600 +log.dirs=/tmp/kafka-logs +num.partitions=1 +num.recovery.threads.per.data.dir=1 +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 +log.retention.hours=168 +log.retention.check.interval.ms=300000 diff --git a/docker/kafka/start-broker.sh b/docker/kafka/start-broker.sh new file mode 100755 index 00000000..7ba06ba5 --- /dev/null +++ b/docker/kafka/start-broker.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + + +echo " +broker.id=${BROKER_ID:-0} +process.roles=broker,controller +listeners=PLAINTEXT://:9092,CONTROLLER://:9093 +advertised.listeners=PLAINTEXT://$(hostname -i):9092 +listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT +controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS} +controller.listener.names=CONTROLLER +offsets.topic.replication.factor=${OFFSETS_REPLICATIONS:-1} +" > config/runtime.properties + +cat config/base-server.properties config/runtime.properties > config/server.properties + +if [ ! -e "/tmp/kafka-logs/meta.properties" ]; then + kafka-storage.sh format --config config/server.properties --cluster-id "YPKJRKEhT06jEqGlBQar5A" +fi + +exec kafka-server-start.sh config/server.properties diff --git a/tests/manual/__init__.py b/tests/manual/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/manual/topic_management.py b/tests/manual/topic_management.py new file mode 100644 index 00000000..801342e2 --- /dev/null +++ b/tests/manual/topic_management.py @@ -0,0 +1,27 @@ +import asyncio +import os + +from aiokafka.admin import AIOKafkaAdminClient, NewTopic + + +async def main() -> None: + client = AIOKafkaAdminClient(bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"]) + await client.start() + try: + for i in range(20): + topic = f"test-{i}" + print("Creating topic:", topic) + await client.create_topics( + [NewTopic(name=topic, num_partitions=3, replication_factor=2)] + ) + await asyncio.sleep(1) + print("Deleting topic:", topic) + await client.delete_topics([topic]) + await asyncio.sleep(1) + finally: + await client.close() + + +if __name__ == "__main__": + # Start the asyncio loop by running the main function + asyncio.run(main())