diff --git a/.github/workflows/build-k8s-container.yaml b/.github/workflows/build-k8s-container.yaml new file mode 100644 index 00000000..87977b9a --- /dev/null +++ b/.github/workflows/build-k8s-container.yaml @@ -0,0 +1,70 @@ +--- +name: Build and Push Kubernetes Probe Docker image + +on: + push: + branches: [ 2.x ] + paths: + - '.github/workflows/build-k8s-container.yml' + - 'gratia-output-kapel/**' + - 'common/**' + repository_dispatch: + types: + - dispatch-build + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + if: startsWith(github.repository, 'opensciencegrid/') + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: make date tag + id: mkdatetag + run: echo "dtag=$(date +%Y%m%d-%H%M)" >> $GITHUB_OUTPUT + + - name: make tags + id: mktags + env: + DTAG=${{ steps.mkdatetag.outputs.dtag }} + run: | + reponame="osg-htc/gratia-probe-k8s" + OSGVER=23 + tags=() + for registry in docker.io hub.opensciencegrid.org; do + tags+=("$registry/$reponame:$OSGVER-release") + tags+=("$registry/$reponame:$OSGVER-release-$DTAG) + done + + IFS=, + echo "image_tags=${tags[*]}" >> $GITHUB_OUTPUT + echo "ts_image=hub.opensciencegrid.org/$reponame:$OSGVER-release-$DTAG" + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2.7.0 + + - name: Log in to Docker Hub + uses: docker/login-action@v2.2.0 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Log in to OSG Harbor + uses: docker/login-action@v2.2.0 + with: + registry: hub.opensciencegrid.org + username: ${{ secrets.OSG_HARBOR_ROBOT_USER }} + password: ${{ secrets.OSG_HARBOR_ROBOT_PASSWORD }} + + - name: Build and push Docker images + uses: docker/build-push-action@v4 + with: + context: . + file: osg-pilot-container/Dockerfile + push: true + build-args: | + TIMESTAMP_IMAGE=${{ steps.mktags.outputs.ts_image }} + tags: "${{ steps.mktags.outputs.image_tags }}" diff --git a/kubernetes/Dockerfile b/kubernetes/Dockerfile new file mode 100644 index 00000000..5c272ed8 --- /dev/null +++ b/kubernetes/Dockerfile @@ -0,0 +1,22 @@ +FROM almalinux:9 + +ARG TIMESTAMP_IMAGE=gratia-probe-k8s:release-$(date +%Y%m%d-%H%M) +ENV GRATIA_PROBE_VERSION=$TIMESTAMP_IMAGE +ARG UID=10000 +ARG GID=10000 + +# Install EPEL, the OSG software base repo, and gratia-probe-common +RUN yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm && \ + yum install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el9-release-latest.rpm && \ + yum install -y gratia-probe-common python3-pip + +# Make probe runnable as non-root +RUN chown -R $UID:$GID /var/lock/gratia /var/lib/gratia/data /var/lib/gratia/tmp /var/log/gratia +WORKDIR /gratia +COPY requirements.txt /gratia +RUN pip install -r requirements.txt +COPY ProbeConfig /etc/gratia/kubernetes/ +COPY *.py /gratia/ + +USER $UID:$GID +CMD python3 kubernetes_meter.py diff --git a/kubernetes/ProbeConfig b/kubernetes/ProbeConfig new file mode 100644 index 00000000..841b0676 --- /dev/null +++ b/kubernetes/ProbeConfig @@ -0,0 +1,75 @@ + + + diff --git a/kubernetes/docker-compose.yaml b/kubernetes/docker-compose.yaml new file mode 100644 index 00000000..33d08582 --- /dev/null +++ b/kubernetes/docker-compose.yaml @@ -0,0 +1,10 @@ +services: + gratia-output: + image: hub.opensciencegrid.org/osgpreview/kapel-gratia-output:latest + build: + network: host + context: . + dockerfile: Dockerfile + volumes: + - /tmp/dirq/:/srv/kapel + - ./sample.env:/gratia/.env diff --git a/kubernetes/gratia_k8s_config.py b/kubernetes/gratia_k8s_config.py new file mode 100644 index 00000000..fc4705f2 --- /dev/null +++ b/kubernetes/gratia_k8s_config.py @@ -0,0 +1,29 @@ +# Configuration module for KAPEL + +from environs import Env + +# Read config settings from environment variables (and a named env file in CWD if specified), +# do input validation, and return a config object. Note, if a '.env' file exists in CWD it will be used by default. +class GratiaK8sConfig: + def __init__(self, envFile=None): + env = Env() + # Read a .env file if one is specified, otherwise only environment variables will be used. + env.read_env(envFile, recurse=False, verbose=True) + + # Where to write the APEL message output. + self.output_path = env.path("OUTPUT_PATH", "/srv/kapel") + + # infrastructure info + self.infrastructure_type = env.str("INFRASTRUCTURE_TYPE", "grid") + self.infrastructure_description = env.str("INFRASTRUCTURE_DESCRIPTION", "GRATIA-KUBERNETES") + + # optionally define number of nodes and processors. Should not be necessary to + # set a default of 0 here but see https://github.com/apel/apel/issues/241 + self.nodecount = env.int("NODECOUNT", 0) + self.processors = env.int("PROCESSORS", 0) + + + # Gratia config + self.gratia_config_path = env.str("GRATIA_CONFIG_PATH", "/etc/gratia/kubernetes/") + + self.gratia_probe_version = env.str("GRATIA_PROBE_VERSION", None) diff --git a/kubernetes/kubernetes_meter.py b/kubernetes/kubernetes_meter.py new file mode 100644 index 00000000..1b29bd33 --- /dev/null +++ b/kubernetes/kubernetes_meter.py @@ -0,0 +1,179 @@ +import argparse +import datetime + +from gratia_k8s_config import GratiaK8sConfig +from dirq.QueueSimple import QueueSimple +import os +from datetime import datetime, timezone +import re + +from gratia.common.Gratia import DebugPrint +from gratia.common.debug import DebugPrintTraceback +import gratia.common.GratiaCore as GratiaCore +import gratia.common.GratiaWrapper as GratiaWrapper +import gratia.common.Gratia as Gratia +import gratia.common.config as config + +probe_version = "%%%RPMVERSION%%%" + +probe_name = os.path.basename(os.path.dirname(os.path.abspath(__file__))) + +# log levels +CRITICAL = 0 +ERROR = 1 +WARNING = 2 +INFO = 3 +DEBUG = 4 + +# Gratia record constants +SECONDS = "Was entered in seconds" +LOCAL_USER = "osgvo-container-pilot" +USER = "user" +RESOURCE_TYPE = "Batch" + +BATCH_SIZE = 100 + +class ApelRecordConverter(): + apel_dict: dict + + def __init__(self, apel_record: bytes): + self._parse_apel_str(apel_record.decode()) + + def _parse_apel_str(self, apel_record: str): + self.apel_dict = {} + lines = apel_record.split('\n') + for line in lines: + kv_pair = [v.strip() for v in line.split(':')] + if len(kv_pair) == 2: + self.apel_dict[kv_pair[0]] = kv_pair[1] + + def getint(self, key): + return int(float(self.apel_dict.get(key, 0))) + + def get(self, key): + return self.apel_dict.get(key) + + + def site_probe(self): + site_dns = re.sub(r'[^a-zA-Z0-9-]', '-', self.get('Site')).strip('-') # sanitize site + return "kubernetes:%s.gratia.osg-htc.org" % site_dns + + def to_gratia_record(self): + # TODO the following fields are not currently tracked: + # memory, machine name, grid + r = Gratia.UsageRecord(RESOURCE_TYPE) + r.StartTime( self.getint('StartTime'), SECONDS) + r.MachineName (self.get('MachineName')) + r.LocalJobId( self.get('LocalJobId')) + r.EndTime( self.getint('EndTime'), SECONDS) + r.WallDuration(self.getint('WallDuration'), SECONDS) + r.CpuDuration( self.getint('CpuDuration'), USER, SECONDS) + r.Memory( self.getint('MemoryVirtual'), 'KB', description='RSS') + r.Processors( self.getint('Processors'), metric="max") + r.SiteName( self.get('Site')) + r.ProbeName( self.site_probe()) + r.Grid( self.get('InfrastructureType')) # Best guess + r.LocalUserId( LOCAL_USER) + r.VOName( self.get('VO')) + r.ReportableVOName(self.get('VO')) + return r + + @classmethod + def is_individual_record(cls, data:bytes): + return data.decode().startswith('APEL-individual-job-message:') + + +def send_gratia_records(records: list[ApelRecordConverter]): + # TODO the assumption of uniform site/probe might not be true + site = records[0].get('Site') + probe = records[0].site_probe() + + # GratiaCore.Initialize(gratia_config) + + config.Config.setSiteName(site) + config.Config.setMeterName(probe) + + GratiaCore.Handshake() + + try: + GratiaCore.SearchOutstandingRecord() + except Exception as e: + print(f"Failed to search outstanding records: {e}") + raise + + GratiaCore.Reprocess() + + for record in records: + resp = GratiaCore.Send(record.to_gratia_record()) + print(resp) + + GratiaCore.ProcessCurrentBundle() + + + + +def setup_gratia(config: GratiaK8sConfig): + + print(config.gratia_config_path) + if not config.gratia_config_path or not os.path.exists(config.gratia_config_path): + raise Exception("No valid gratia config path given") + GratiaCore.Config = GratiaCore.ProbeConfiguration(config.gratia_config_path) + + GratiaWrapper.CheckPreconditions() + GratiaWrapper.ExclusiveLock() + + # Register gratia + GratiaCore.RegisterReporter("kubernetes_meter") + GratiaCore.RegisterService("Kubernetes", config.gratia_probe_version) + GratiaCore.setProbeBatchManager("kubernetes") + + GratiaCore.Initialize(config.gratia_config_path) + +def batch_dirq(queue, batch_size): + """ batch the records in a dirq into groups of a fixed size """ + # TODO this is not the most elegant approach + records = [] + for name in queue: + records.append(name) + if len(records) >= batch_size: + yield records + records = [] + # Yield the last entries as well + yield records + +def main(envFile: str): + print(f'Starting Gratia post-processor: {__file__} with envFile {envFile} at {datetime.now(tz=timezone.utc).isoformat()}') + with open(envFile) as envf: + print(f'===== envFile contents: =====\n{envf.read()}') + + cfg = GratiaK8sConfig(envFile) + + setup_gratia(cfg) + + dirq = QueueSimple(str(cfg.output_path)) + for batch in batch_dirq(dirq, BATCH_SIZE): + apel_records = [] + for name in batch: + if not dirq.lock(name): + continue + data = dirq.get(name) + if ApelRecordConverter.is_individual_record(data): + apel_records.append(ApelRecordConverter(data)) + dirq.unlock(name) + if apel_records: + send_gratia_records(apel_records) + # Clear out the chunk of the queue if all the sends succeed + for name in batch: + if not dirq.lock(name): + continue + dirq.remove(name) + print("done") + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Extract Kubernetes job accounting data from Prometheus and prepare it for APEL publishing.") + # This should be the only CLI argument, since all other config should be specified via env. + parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration") + args = parser.parse_args() + main(args.env_file) diff --git a/kubernetes/requirements.txt b/kubernetes/requirements.txt new file mode 100644 index 00000000..6da4c936 --- /dev/null +++ b/kubernetes/requirements.txt @@ -0,0 +1,4 @@ +# Useful for handling configuration +environs==11.0.0 +# Useful for adding messages to outgoing queue +dirq==1.8 diff --git a/kubernetes/sample.env b/kubernetes/sample.env new file mode 100644 index 00000000..e5aca167 --- /dev/null +++ b/kubernetes/sample.env @@ -0,0 +1,7 @@ +NAMESPACE=OSG +SITE_NAME=OSG +SUBMIT_HOST=OSG +BENCHMARK_VALUE=5 +VO_NAME=OSG + +GRATIA_CONFIG_PATH=/etc/gratia/kubernetes/ProbeConfig