Skip to content

Commit

Permalink
Merge pull request #7 from aperim/5-correct-user-and-group-id
Browse files Browse the repository at this point in the history
5 correct user and group
  • Loading branch information
troykelly authored Oct 6, 2024
2 parents c4ce72f + 978ce7e commit 19fd559
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 87 deletions.
81 changes: 54 additions & 27 deletions entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
# Troy Kelly <[email protected]>
#
# Code History:
# - 2024-10-06: Initial creation.
# - 2023-10-06: Initial creation.
# - 2023-10-06: Modified to honor PUID=1 and PGID=1 to run as root.

set -e
set -u
Expand All @@ -30,39 +31,65 @@ main() {
PUID="${PUID:-1000}"
PGID="${PGID:-1000}"

# Get the current UID and GID of the 'cleanup' user and group.
CURRENT_UID=$(id -u cleanup)
CURRENT_GID=$(id -g cleanup)
# If PUID and PGID are 1, we assume the intent is to run as root.
if [ "$PUID" -eq 1 ] && [ "$PGID" -eq 1 ]; then
echo "PUID and PGID are set to 1. Running as root."

# If the current UID does not match PUID, update it.
if [ "$CURRENT_UID" -ne "$PUID" ]; then
echo "Updating UID of 'cleanup' from $CURRENT_UID to $PUID"
usermod -u "$PUID" cleanup
fi
# Change to the application directory.
cd /app

# If the current GID does not match PGID, update it.
if [ "$CURRENT_GID" -ne "$PGID" ]; then
echo "Updating GID of 'cleanup' from $CURRENT_GID to $PGID"
groupmod -g "$PGID" cleanup
fi
# If no arguments are provided or the first argument starts with '-', default to running acme_cleanup.py.
if [ "$#" -eq 0 ] || [ "$(printf '%s' "$1" | cut -c1)" = "-" ]; then
# Ensure required environment variables are set.
if [ -z "${TRAEFIK_DASHBOARD_URL:-}" ]; then
echo "Error: TRAEFIK_DASHBOARD_URL environment variable is not set."
exit 1
fi

# If no arguments are provided or the first argument starts with '-', default to running acme_cleanup.py.
if [ "$#" -eq 0 ] || [ "$(printf '%s' "$1" | cut -c1)" = "-" ]; then
# Ensure required environment variables are set.
if [ -z "${TRAEFIK_DASHBOARD_URL:-}" ]; then
echo "Error: TRAEFIK_DASHBOARD_URL environment variable is not set."
exit 1
# Prepend the Python command and script name.
set -- /usr/bin/env python3 /app/acme_cleanup.py "$@"
fi

# Prepend the Python command and script name.
set -- /usr/bin/env python3 /app/acme_cleanup.py "$@"
fi
# Execute the command as root.
exec "$@"
else
# Get the current UID and GID of the 'cleanup' user and group.
CURRENT_UID=$(id -u cleanup)
CURRENT_GID=$(id -g cleanup)

# If the current UID does not match PUID, update it.
if [ "$CURRENT_UID" -ne "$PUID" ]; then
echo "Updating UID of 'cleanup' from $CURRENT_UID to $PUID"
usermod -u "$PUID" cleanup
fi

# If the current GID does not match PGID, update it.
if [ "$CURRENT_GID" -ne "$PGID" ]; then
echo "Updating GID of 'cleanup' from $CURRENT_GID to $PGID"
groupmod -g "$PGID" cleanup
fi

# Ensure 'cleanup' owns its home directory and application directory.
chown -R cleanup:cleanup /home/cleanup /app

# If no arguments are provided or the first argument starts with '-', default to running acme_cleanup.py.
if [ "$#" -eq 0 ] || [ "$(printf '%s' "$1" | cut -c1)" = "-" ]; then
# Ensure required environment variables are set.
if [ -z "${TRAEFIK_DASHBOARD_URL:-}" ]; then
echo "Error: TRAEFIK_DASHBOARD_URL environment variable is not set."
exit 1
fi

# Change to the application directory.
cd /app
# Prepend the Python command and script name.
set -- /usr/bin/env python3 /app/acme_cleanup.py "$@"
fi

# Change to the application directory.
cd /app

# Execute the command as the 'cleanup' user.
exec su-exec cleanup "$@"
# Execute the command as the 'cleanup' user.
exec su-exec cleanup "$@"
fi
}

# Invoke the main function with all script arguments.
Expand Down
62 changes: 28 additions & 34 deletions src/acme_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
Troy Kelly <[email protected]>
Code History:
- 2024-10-06: Initial creation.
- 2024-10-06: Fixed certificate decoding issue.
- 2024-10-06: Resolved DeprecationWarning for datetime.utcnow().
- 2024-10-06: Added markdown report generation functionality.
- 2024-10-06: Implemented Traefik certificate in-use checking.
- 2024-10-06: Improved pagination handling and in-use domain matching.
- 2024-10-06: Added unsatisfied certificates table to report.
- 2024-10-06: Included router name and service in unsatisfied domains report.
- 2024-10-06: Resolved JSON serialization error by avoiding modification of acme_data.
- 2023-10-06: Initial creation.
- 2023-10-06: Fixed certificate decoding issue.
- 2023-10-06: Resolved DeprecationWarning for datetime.utcnow().
- 2023-10-06: Added markdown report generation functionality.
- 2023-10-06: Implemented Traefik certificate in-use checking.
- 2023-10-06: Improved pagination handling and in-use domain matching.
- 2023-10-06: Added unsatisfied certificates table to report.
- 2023-10-06: Included router name and service in unsatisfied domains report.
- 2023-10-06: Resolved JSON serialization error by avoiding modification of acme_data.
- 2023-10-06: Updated in-use domain matching to consider entire domain sets.
"""

Expand All @@ -32,7 +33,7 @@
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Set, Optional
from typing import Any, Dict, List, Set

import requests
from OpenSSL import crypto
Expand Down Expand Up @@ -78,9 +79,9 @@ def __init__(self, args: argparse.Namespace) -> None:
self.acme_data: Dict[str, Any] = {}
self.analyzed_certs: List[Dict[str, Any]] = []
self.certs_to_remove: List[Dict[str, Any]] = []
self.in_use_domains: Dict[str, List[Dict[str, str]]] = {}
self.in_use_domain_sets: List[Set[str]] = []
self.certificate_domains: Set[str] = set()
self.unsatisfied_domains: Dict[str, List[Dict[str, str]]] = {}
self.unsatisfied_domains: Set[str] = set()

@staticmethod
def str_to_bool(value: str) -> bool:
Expand Down Expand Up @@ -122,15 +123,16 @@ def check_dashboard_access(self) -> None:
sys.exit(1)

def load_in_use_domains(self) -> None:
"""Load in-use domains from Traefik API."""
logging.info('Fetching in-use domains from Traefik API')
"""Load in-use domain sets from Traefik API."""
logging.info('Fetching in-use domain sets from Traefik API')
traefik_api = TraefikAPI(
base_url=self.dashboard_url,
username=self.dashboard_username,
password=self.dashboard_password
)
self.in_use_domains = traefik_api.get_tls_domains()
logging.info(f'Found {len(self.in_use_domains)} in-use domains')
self.in_use_domain_sets = traefik_api.get_tls_domain_sets()
logging.info(
f'Found {len(self.in_use_domain_sets)} in-use domain sets')

def load_acme_file(self) -> None:
"""Load and parse the acme.json file."""
Expand All @@ -149,7 +151,6 @@ def analyse_certificates(self) -> None:
"""Analyse certificates in the acme.json file."""
logging.info('Analysing certificates in acme.json')
now = datetime.now(timezone.utc)
in_use_domains_set = set(self.in_use_domains.keys())
for resolver_name, resolver in self.acme_data.items():
certificates = resolver.get('Certificates', [])
for cert_entry in certificates:
Expand Down Expand Up @@ -192,11 +193,11 @@ def analyse_certificates(self) -> None:
analysis['status'] = 'expired'
else:
analysis['status'] = 'valid'
# Determine if certificate is in use
domains_in_use = all(
domain in in_use_domains_set for domain in all_domains
# Determine if certificate is in use by exact domain set matching
cert_domains_set = set(all_domains)
analysis['in_use'] = any(
cert_domains_set == in_use_set for in_use_set in self.in_use_domain_sets
)
analysis['in_use'] = domains_in_use
except (crypto.Error, ValueError, UnicodeDecodeError) as e:
logging.error(
f'Invalid certificate for domain {main_domain}: {e}'
Expand All @@ -205,10 +206,9 @@ def analyse_certificates(self) -> None:
analysis['in_use'] = False
self.analyzed_certs.append(analysis)
# Identify unsatisfied domains
in_use_domains_set = set().union(*self.in_use_domain_sets)
unsatisfied_domains_set = in_use_domains_set - self.certificate_domains
self.unsatisfied_domains = {
domain: self.in_use_domains[domain] for domain in unsatisfied_domains_set
}
self.unsatisfied_domains = unsatisfied_domains_set
logging.info(
f'Found {len(self.unsatisfied_domains)} unsatisfied domains')

Expand Down Expand Up @@ -314,16 +314,10 @@ def generate_report(self) -> None:
report_lines.append(
'The following domains are used in Traefik routers with TLS but do not have corresponding certificates in acme.json:\n'
)
report_lines.append('| Domain | Routers | Services |')
report_lines.append('|--------|---------|----------|')
for domain, routers_info in self.unsatisfied_domains.items():
router_names = ', '.join(
{info['name'] for info in routers_info})
services = ', '.join({info['service']
for info in routers_info})
report_lines.append(
f'| {domain} | {router_names} | {services} |'
)
report_lines.append('| Domain |')
report_lines.append('|--------|')
for domain in sorted(self.unsatisfied_domains):
report_lines.append(f'| {domain} |')
report_lines.append('\n')
# Group certificates by resolver
resolvers: Dict[str, List[Dict[str, Any]]] = {}
Expand Down
53 changes: 27 additions & 26 deletions src/traefik_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
Troy Kelly <[email protected]>
Code History:
- 2024-10-06: Initial creation.
- 2024-10-06: Added pagination handling and improved domain extraction.
- 2024-10-06: Updated pagination to handle Traefik's non-standard implementation.
- 2024-10-06: Modified get_tls_domains to map domains to router info.
- 2023-10-06: Initial creation.
- 2023-10-06: Added pagination handling and improved domain extraction.
- 2023-10-06: Updated pagination to handle Traefik's non-standard implementation.
- 2023-10-06: Modified get_tls_domains to map domains to router info.
- 2023-10-06: Added get_tls_domain_sets method.
"""

import logging
import re
from typing import Any, Dict, List
from typing import Any, Dict, List, Set

import requests

Expand All @@ -44,37 +45,35 @@ def get_routers(self) -> List[Dict[str, Any]]:
try:
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
routers.extend(response.json())
x_next_page = response.headers.get('X-Next-Page')
# Check if there is another page and it is greater than the current page
if x_next_page and int(x_next_page) > page:
page = int(x_next_page)
else:
page_data = response.json()
# Check if data is empty
if not page_data:
break
routers.extend(page_data)
# Traefik's pagination may not provide Next-Page header
if len(page_data) < per_page:
break
page += 1
except requests.RequestException as e:
logging.error(f'Error fetching routers from Traefik API: {e}')
break
return routers

def get_tls_domains(self) -> Dict[str, List[Dict[str, str]]]:
"""Get domains from routers that have TLS configured.
def get_tls_domain_sets(self) -> List[Set[str]]:
"""Get sets of domains from routers that have TLS configured.
Returns:
A dictionary mapping domains to a list of router info dictionaries, each containing 'name' and 'service'.
A list of sets, each containing the domains used in a router's rule.
"""
routers = self.get_routers()
domains_map: Dict[str, List[Dict[str, str]]] = {}
domain_sets = []
for router in routers:
if 'tls' in router:
rule = router.get('rule', '')
router_name = router.get('name', 'Unknown')
service = router.get('service', 'Unknown')
# Parse rule to extract domains
extracted_domains = self.extract_domains_from_rule(rule)
for domain in extracted_domains:
domain_info = {'name': router_name, 'service': service}
domains_map.setdefault(domain, []).append(domain_info)
return domains_map
domains = self.extract_domains_from_rule(rule)
if domains:
domain_sets.append(set(domains))
return domain_sets

@staticmethod
def extract_domains_from_rule(rule: str) -> List[str]:
Expand All @@ -89,9 +88,11 @@ def extract_domains_from_rule(rule: str) -> List[str]:
domains = []
# Remove any negations and path prefixes/suffixes
rule = re.sub(r'!\s*PathPrefix\([^\)]*\)', '', rule)
# Find all Host(`domain`) patterns
host_matches = re.findall(r'Host\((`[^`]+`(?:,\s*`[^`]+`)*)\)', rule)
for match in host_matches:
# Find all Host(`domain`), HostSNI(`domain`), and HostRegexp(`domain`) patterns
host_pattern = re.compile(
r'(Host|HostSNI|HostRegexp)\((`[^`]+`(?:,\s*`[^`]+`)*)\)')
matches = host_pattern.findall(rule)
for _, match in matches:
# Extract multiple hosts if present
hosts = re.findall(r'`([^`]+)`', match)
domains.extend(hosts)
Expand Down

0 comments on commit 19fd559

Please sign in to comment.