Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka initial implementation #1416

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions jenkins_pipelines/scripts/kafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
FROM apache/kafka:3.7.1

USER root

RUN \
apk \
update \
&& \
apk \
add \
git=2.43.5-r0 \
python3=3.11.10-r0 \
python3-dev=3.11.10-r0 \
py3-pip=23.3.1-r0 \
gcc=13.2.1_git20231014-r0 \
g++=13.2.1_git20231014-r0 \
librdkafka-dev=2.3.0-r1

USER appuser

COPY --chown=appuser "consumer.py" "producer.py" "/home/appuser/"

WORKDIR "/home/appuser"

RUN \
python3 \
-m \
venv \
"/home/appuser/venv" \
&& \
ktsamis marked this conversation as resolved.
Show resolved Hide resolved
. \
"/home/appuser/venv/bin/activate" \
&& \
pip \
ktsamis marked this conversation as resolved.
Show resolved Hide resolved
install \
confluent_kafka==2.3.0 \
GitPython==3.1.43 \
requests==2.32.3 \
&& \
git \
clone \
"https://github.com/SUSE/susemanager-ci"

ENTRYPOINT ["/bin/bash", "-c", \
"/etc/kafka/docker/run & \
/opt/kafka/bin/kafka-topics.sh \
--create \
--if-not-exists \
--topic sle_mu_43 \
--bootstrap-server localhost:9092 \
&& \
. \
/home/appuser/venv/bin/activate \
&& \
python3 \
consumer.py" \
]

62 changes: 62 additions & 0 deletions jenkins_pipelines/scripts/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Kafka automation concept

Messaging system to speed up business processes via services API.
ktsamis marked this conversation as resolved.
Show resolved Hide resolved

## Requirements

### Host

Tested on the SLE 15 SP6 host deployed in a fully trusted environment with the following packages installed:
* `docker` package from [Virtualization repository](https://download.opensuse.org/repositories/Virtualization:/containers/15.6/).

### Variables

The following environment variables need to be exported on the container's host:
* [`JENKINS_API_TOKEN`](https://ci.suse.de/user/manager/configure).

### Networking

The following websites needs to be resolvable within the docker container network:
* `https://smelt.suse.de`
* `https://hooks.slack.com`
* `https://ci.suse.de`
* `https://github.com`

## Usage

### Building and Running

Being in the `susemanager-ci/jenkins_pipelines/scripts/kafka` catalog, build `kafka` container:

```bash
docker build . --tag "kafka"
```

With exported `JENKINS_API_TOKEN`, run `kafka` container:

```bash
docker run --env JENKINS_API_TOKEN=${JENKINS_API_TOKEN} --network "host" kafka
```

### Topics Available

* `sle_mu_43`:
1. Pulls the latest [MU requests](https://smelt.suse.de/overview/) to be accepted and generates json based on the latest [susemanager-ci](https://github.com/SUSE/susemanager-ci/tree/master) scripts.
2. Start a [new manager-4.3-qe-sle-update pipeline](https://ci.suse.de/view/Manager/view/Manager-4.3/job/manager-4.3-qe-sle-update/) and monitors the status running.
3. Send message to the dedicated Slack channel [andy-test](https://app.slack.com/client/T02863RC2AC/C033KJKDF9V) informing about the status.

⚠️ _Producing script should be integrated to the https://smelt.suse.de site, at the moment it is sending requests from container_.

### Debugging

Alongside kafka logging, built-in logger should capture API requests to external services with the corresponding return codes and return messages:

```bash
docker logs "kafka"
```

## Additional resources

* [SLE MU pipeline automation concept](https://github.com/SUSE/spacewalk/issues/24966).
* [SLE Maintenance updates document](https://confluence.suse.com/display/SUSEMANAGER/QE+SLE+Maintenance+Updates).

205 changes: 205 additions & 0 deletions jenkins_pipelines/scripts/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@

import functools
import json
import logging
import os
import subprocess
import time
import dataclasses

import confluent_kafka
import git
import requests

import producer


logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s [KAFKA CONSUMER]: %(message)s'
)


class APIClients:
authorization_parameters: tuple = ('manager', os.getenv('JENKINS_API_TOKEN'))

@staticmethod
def log_http_requests(request):
@functools.wraps(request)
def wrapper(self, *args, **kwargs):
response = request(self, *args, **kwargs)
logging.info(f"{request.__name__.upper()} {args[0]}, STATUS: {response.status_code}")
if response.status_code not in (200, 201):
logging.error(f"{response.content}")
return response
return wrapper

@log_http_requests
def get(self, endpoint: str):
return requests.get(endpoint, auth=self.authorization_parameters, verify=False, timeout=10)

@log_http_requests
def post(self, endpoint: str, params=None, data=None):
if endpoint.startswith('https://hooks.slack.com'):
return requests.post(endpoint, headers={'Content-Type': 'application/json'}, data=json.dumps(data), timeout=10)
return requests.post(endpoint, auth=self.authorization_parameters, params=params, verify=False, timeout=10)


@dataclasses.dataclass
class KafkaConsumer:
consumer = confluent_kafka.Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'jenkins_pipelines',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 86400000
})
kafka_topic = 'sle_mu_43'
api_clients = APIClients()

def __post_init__(self) -> None:
self.consumer.subscribe([self.kafka_topic])

@staticmethod
def pull_latest_susemanager_ci():
susemanager_ci_repository = git.Repo('/home/appuser/susemanager-ci')
try:
susemanager_ci_repository.remotes.origin.pull()
except git.exc.GitError as stderr:
logging.warning(f"Error during git pull on susemanager-ci repository: {stderr}")

@staticmethod
def generate_custom_repositories(incidents: dict):
incident_numbers = ','.join(
str(incident['incident']['incident_id'])
for incident in incidents['data']
)
subprocess.run(
NamelessOne91 marked this conversation as resolved.
Show resolved Hide resolved
[
"python3",
"susemanager-ci/jenkins_pipelines/scripts/json_generator/maintenance_json_generator.py",
"-i", incident_numbers
],
check=True
)

def run_jenkins_pipeline(self) -> int | None:
instances_involved = ['server', 'proxy', 'sle15sp4_client', 'sle15sp4_minion']
with open('custom_repositories.json', 'r', encoding='utf-8') as custom_repositories:
custom_repositories_formatted = json.dumps(
{
key: value for key, value in json.load(custom_repositories).items()
if key in instances_involved
},
indent=4
)
build_parameters = {
'cucumber_gitrepo': 'https://github.com/SUSE/spacewalk.git',
'cucumber_ref': 'Manager-4.3',
'tf_file': 'susemanager-ci/terracumber_config/tf_files/SUSEManager-4.3-SLE-update.tf',
'sumaform_gitrepo': 'https://github.com/uyuni-project/sumaform.git',
'sumaform_ref': 'master',
'sumaform_backend': 'libvirt',
'terraform_bin': '/usr/bin/terraform',
'terraform_bin_plugins': '/usr/bin',
'terraform_parallelism': '',
'terracumber_gitrepo': 'https://github.com/uyuni-project/terracumber.git',
'terracumber_ref': 'master',
'minions_to_run': 'sle15sp4_minion',
'use_previous_terraform_state': 'false',
'must_deploy': 'true',
'must_run_core': 'true',
'must_sync': 'true',
'enable_proxy_stages': 'true',
'enable_client_stages': 'true',
'must_add_MU_repositories': 'true',
'must_add_non_MU_repositories': 'true',
'must_add_keys': 'true',
'must_create_bootstrap_repos': 'true',
'must_boot_node': 'true',
'must_run_tests': 'true',
'must_run_containerization_tests': 'false',
'confirm_before_continue': 'false',
'custom_repositories': custom_repositories_formatted
}

request = self.api_clients.post(
'https://ci.suse.de/job/manager-4.3-qe-sle-update/buildWithParameters',
params=build_parameters
)

if request.status_code == 201:
time.sleep(10) # to avoid "In the quiet period. Expires in <10 sec"
request = self.api_clients.get(f"{request.headers['Location']}/api/json")
response = request.json()
try:
build_number = response['executable']['number']
os.rename('custom_repositories.json', f'custom_repositories_{build_number}.json')
return build_number
except KeyError:
logging.error(f"Build number {build_number} was not found in the currently running pipelines, latest output: {response['why']}. Please check if someone else is working on the pipeline.")
return None
if request.status_code == 431:
logging.error(f'Request is too big, perhaps too many RRs to be accepted generated big JSON, run pipeline manually using generated custom_repositories.json in {os.getcwd()}')
return None

def pipeline_status(self):
NamelessOne91 marked this conversation as resolved.
Show resolved Hide resolved
return not self.api_clients.get(f'https://ci.suse.de/job/manager-4.3-qe-sle-update/api/json').json()['color'] == 'disabled'

def build_status(self, build_number: int) -> str:
request = self.api_clients.get(f'https://ci.suse.de/job/manager-4.3-qe-sle-update/{build_number}/api/json')
response = request.json()
if response['inProgress']:
return 'INPROGRESS'
return response['result']

def send_message_slack(self, incidents: dict, build_number: int, status: str) -> None:
mu_requests = [
f"https://build.suse.de/request/show/{incident['request_id']}"
for incident in incidents['data']
]
message = {
'message': f'SLE MU pipeline https://ci.suse.de/job/manager-4.3-qe-sle-update/{build_number} has status: {status} with the following requests: {mu_requests}'
}
self.api_clients.post(
'https://hooks.slack.com/triggers/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',
data=message
)

def listen(self) -> None:
build_number: None | int = None
try:
while True:
time.sleep(300)
if self.pipeline_status() or build_number:
if build_number:
status = self.build_status(build_number)
logging.info(f'Pipeline build {build_number} STATUS: {status}')
if status != 'INPROGRESS':
self.send_message_slack(incidents, build_number, status)
build_number = None
else:
message = self.consumer.poll(timeout=1.0)
if message is None:
producer.produce()
else:
try:
incidents = json.loads(message.value().decode('utf-8'))
except json.decoder.JSONDecodeError:
logging.error(f'Could not decode kafka message: {message.value()}')
raise
if incidents["recordsFiltered"] > 0:
self.pull_latest_susemanager_ci()
self.generate_custom_repositories(incidents)
build_number = self.run_jenkins_pipeline()
self.consumer.commit(message)
else:
logging.info('Pipeline disabled')
except:
self.consumer.close()
raise


if __name__ == '__main__':
KafkaConsumer().listen()

25 changes: 25 additions & 0 deletions jenkins_pipelines/scripts/kafka/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

ktsamis marked this conversation as resolved.
Show resolved Hide resolved
import confluent_kafka
import requests


def produce():
kafka_broker = 'localhost:9092'
kafka_topic = 'sle_mu_43'
smelt_testing_site = 'https://smelt.suse.de/api/v1/overview/testing/?format=datatables&draw=7&columns%5B0%5D%5Bdata%5D=category&columns%5B0%5D%5Bname%5D=category.id&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=true&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B1%5D%5Bdata%5D=request_id&columns%5B1%5D%5Bname%5D=&columns%5B1%5D%5Bsearchable%5D=false&columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B2%5D%5Bdata%5D=request_id&columns%5B2%5D%5Bname%5D=request_id%2C%20incident.incident_id&columns%5B2%5D%5Bsearchable%5D=true&columns%5B2%5D%5Borderable%5D=true&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B3%5D%5Bdata%5D=comments_exists&columns%5B3%5D%5Bname%5D=&columns%5B3%5D%5Bsearchable%5D=false&columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B4%5D%5Bdata%5D=created&columns%5B4%5D%5Bname%5D=&columns%5B4%5D%5Bsearchable%5D=true&columns%5B4%5D%5Borderable%5D=true&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B5%5D%5Bdata%5D=due_date&columns%5B5%5D%5Bname%5D=due_date&columns%5B5%5D%5Bsearchable%5D=false&columns%5B5%5D%5Borderable%5D=true&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B6%5D%5Bdata%5D=incident.priority&columns%5B6%5D%5Bname%5D=&columns%5B6%5D%5Bsearchable%5D=false&columns%5B6%5D%5Borderable%5D=true&columns%5B6%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B6%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B7%5D%5Bdata%5D=unfinished_reviews&columns%5B7%5D%5Bname%5D=&columns%5B7%5D%5Bsearchable%5D=true&columns%5B7%5D%5Borderable%5D=false&columns%5B7%5D%5Bsearch%5D%5Bvalue%5D=qam-manager&columns%5B7%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B8%5D%5Bdata%5D=packages&columns%5B8%5D%5Bname%5D=packages&columns%5B8%5D%5Bsearchable%5D=true&columns%5B8%5D%5Borderable%5D=false&columns%5B8%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B8%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B9%5D%5Bdata%5D=incident.references&columns%5B9%5D%5Bname%5D=incident.patchinfo.references.name&columns%5B9%5D%5Bsearchable%5D=true&columns%5B9%5D%5Borderable%5D=false&columns%5B9%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B9%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B10%5D%5Bdata%5D=channellist&columns%5B10%5D%5Bname%5D=channels.name&columns%5B10%5D%5Bsearchable%5D=true&columns%5B10%5D%5Borderable%5D=false&columns%5B10%5D%5Bsearch%5D%5Bvalue%5D=sp4&columns%5B10%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B11%5D%5Bdata%5D=created_by.username&columns%5B11%5D%5Bname%5D=created_by.username&columns%5B11%5D%5Bsearchable%5D=true&columns%5B11%5D%5Borderable%5D=true&columns%5B11%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B11%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B12%5D%5Bdata%5D=url&columns%5B12%5D%5Bname%5D=&columns%5B12%5D%5Bsearchable%5D=false&columns%5B12%5D%5Borderable%5D=false&columns%5B12%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B12%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B13%5D%5Bdata%5D=kind&columns%5B13%5D%5Bname%5D=&columns%5B13%5D%5Bsearchable%5D=false&columns%5B13%5D%5Borderable%5D=false&columns%5B13%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B13%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B14%5D%5Bdata%5D=rating&columns%5B14%5D%5Bname%5D=&columns%5B14%5D%5Bsearchable%5D=false&columns%5B14%5D%5Borderable%5D=false&columns%5B14%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B14%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B15%5D%5Bdata%5D=qa_comments_exist&columns%5B15%5D%5Bname%5D=&columns%5B15%5D%5Bsearchable%5D=false&columns%5B15%5D%5Borderable%5D=false&columns%5B15%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B15%5D%5Bsearch%5D%5Bregex%5D=false&order%5B0%5D%5Bcolumn%5D=6&order%5B0%5D%5Bdir%5D=desc&start=0&length=250&search%5Bvalue%5D=&search%5Bregex%5D=false&_=1726339618939'
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'en-US,en;q=0.6',
'Connection': 'keep-alive',
'Referer': 'https://smelt.suse.de/overview/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Sec-GPC': '1',
'X-Requested-With': 'XMLHttpRequest'
}
maintenance_incidents_on_qa = requests.get(smelt_testing_site, headers=headers, verify=False, timeout=10).content
producer = confluent_kafka.Producer({'bootstrap.servers': kafka_broker})
producer.produce(kafka_topic, maintenance_incidents_on_qa)
producer.flush()