Skip to content

Commit

Permalink
chore: update charm libraries (#518)
Browse files Browse the repository at this point in the history
Fix utest

Co-authored-by: Github Actions <[email protected]>
  • Loading branch information
observability-noctua-bot and Github Actions authored Sep 23, 2023
1 parent ca30942 commit 656fdbe
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 24 deletions.
27 changes: 23 additions & 4 deletions lib/charms/observability_libs/v0/cert_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
This library requires a peer relation to be declared in the requirer's metadata. Peer relation data
is used for "persistent storage" of the private key and certs.
"""
import ipaddress
import json
import socket
from itertools import filterfalse
from typing import List, Optional, Union

try:
Expand Down Expand Up @@ -62,7 +64,16 @@

LIBID = "b5cd5cd580f3428fa5f59a8876dcbe6a"
LIBAPI = 0
LIBPATCH = 7
LIBPATCH = 8


def is_ip_address(value: str) -> bool:
"""Return True if the input value is a valid IPv4 address; False otherwise."""
try:
ipaddress.IPv4Address(value)
return True
except ipaddress.AddressValueError:
return False


class CertChanged(EventBase):
Expand All @@ -88,7 +99,7 @@ def __init__(
peer_relation_name: str,
certificates_relation_name: str = "certificates",
cert_subject: Optional[str] = None,
extra_sans_dns: Optional[List[str]] = None,
extra_sans_dns: Optional[List[str]] = None, # TODO: in v1, rename arg to `sans`
):
"""CertHandler is used to wrap TLS Certificates management operations for charms.
Expand All @@ -111,7 +122,9 @@ def __init__(
self.cert_subject = charm.unit.name.replace("/", "-") if not cert_subject else cert_subject

# Use fqdn only if no SANs were given, and drop empty/duplicate SANs
self.sans_dns = list(set(filter(None, (extra_sans_dns or [socket.getfqdn()]))))
sans = list(set(filter(None, (extra_sans_dns or [socket.getfqdn()]))))
self.sans_ip = list(filter(is_ip_address, sans))
self.sans_dns = list(filterfalse(is_ip_address, sans))

self.peer_relation_name = peer_relation_name
self.certificates_relation_name = certificates_relation_name
Expand Down Expand Up @@ -229,6 +242,7 @@ def _generate_csr(
private_key=private_key.encode(),
subject=self.cert_subject,
sans_dns=self.sans_dns,
sans_ip=self.sans_ip,
)

if renew and self._csr:
Expand All @@ -237,7 +251,12 @@ def _generate_csr(
new_certificate_signing_request=csr,
)
else:
logger.info("Creating CSR for %s with DNS %s", self.cert_subject, self.sans_dns)
logger.info(
"Creating CSR for %s with DNS %s and IPs %s",
self.cert_subject,
self.sans_dns,
self.sans_ip,
)
self.certificates.request_certificate_creation(certificate_signing_request=csr)

# Note: CSR is being replaced with a new one, so until we get the new cert, we'd have
Expand Down
6 changes: 3 additions & 3 deletions lib/charms/tempo_k8s/v0/charm_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def tracer(self) -> opentelemetry.trace.Tracer:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
LIBPATCH = 3

PYDEPS = ["opentelemetry-exporter-otlp-proto-grpc==1.17.0"]

Expand Down Expand Up @@ -313,7 +313,7 @@ def trace_charm(
method calls on instances of this class.
Usage:
>>> from charms.tempo_k8s.v0.charm_instrumentation import trace_charm
>>> from charms.tempo_k8s.v0.charm_tracing import trace_charm
>>> from charms.tempo_k8s.v0.tracing import TracingEndpointProvider
>>> from ops import CharmBase
>>>
Expand Down Expand Up @@ -370,7 +370,7 @@ def _autoinstrument(
Usage:
>>> from charms.tempo_k8s.v0.charm_instrumentation import _autoinstrument
>>> from charms.tempo_k8s.v0.charm_tracing import _autoinstrument
>>> from ops.main import main
>>> _autoinstrument(
>>> MyCharm,
Expand Down
83 changes: 67 additions & 16 deletions lib/charms/tls_certificates_interface/v2/tls_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven
)
from ops.framework import EventBase, EventSource, Handle, Object
from ops.jujuversion import JujuVersion
from ops.model import SecretNotFoundError
from ops.model import Relation, SecretNotFoundError

# The unique Charmhub library identifier, never change it
LIBID = "afd8c2bccf834997afce12c2706d2ede"
Expand All @@ -308,7 +308,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 12
LIBPATCH = 16

PYDEPS = ["cryptography", "jsonschema"]

Expand Down Expand Up @@ -640,6 +640,17 @@ def generate_ca(
private_key_object.public_key() # type: ignore[arg-type]
)
subject_identifier = key_identifier = subject_identifier_object.public_bytes()
key_usage = x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
key_cert_sign=True,
key_agreement=False,
content_commitment=False,
data_encipherment=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
Expand All @@ -657,6 +668,7 @@ def generate_ca(
),
critical=False,
)
.add_extension(key_usage, critical=True)
.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
Expand Down Expand Up @@ -689,7 +701,8 @@ def generate_certificate(
"""
csr_object = x509.load_pem_x509_csr(csr)
subject = csr_object.subject
issuer = x509.load_pem_x509_certificate(ca).issuer
ca_pem = x509.load_pem_x509_certificate(ca)
issuer = ca_pem.issuer
private_key = serialization.load_pem_private_key(ca_key, password=ca_key_password)

certificate_builder = (
Expand All @@ -700,6 +713,20 @@ def generate_certificate(
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=validity))
.add_extension(
x509.AuthorityKeyIdentifier(
key_identifier=ca_pem.extensions.get_extension_for_class(
x509.SubjectKeyIdentifier
).value.key_identifier,
authority_cert_issuer=None,
authority_cert_serial_number=None,
),
critical=False,
)
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(csr_object.public_key()), critical=False
)
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False)
)

extensions_list = csr_object.extensions
Expand Down Expand Up @@ -730,6 +757,7 @@ def generate_certificate(
extension.value,
critical=extension.critical,
)

certificate_builder._version = x509.Version.v3
cert = certificate_builder.sign(private_key, hashes.SHA256()) # type: ignore[arg-type]
return cert.public_bytes(serialization.Encoding.PEM)
Expand Down Expand Up @@ -827,7 +855,7 @@ def generate_csr(
sans_oid (list): List of registered ID SANs
sans_dns (list): List of DNS subject alternative names (similar to the arg: sans)
sans_ip (list): List of IP subject alternative names
additional_critical_extensions (list): List if critical additional extension objects.
additional_critical_extensions (list): List of critical additional extension objects.
Object must be a x509 ExtensionType.
Returns:
Expand Down Expand Up @@ -897,6 +925,22 @@ def __init__(self, charm: CharmBase, relationship_name: str):
self.charm = charm
self.relationship_name = relationship_name

def _load_app_relation_data(self, relation: Relation) -> dict:
"""Loads relation data from the application relation data bag.
Json loads all data.
Args:
relation_object: Relation data from the application databag
Returns:
dict: Relation data in dict format.
"""
# If unit is not leader, it does not try to reach relation data.
if not self.model.unit.is_leader():
return {}
return _load_relation_data(relation.data[self.charm.app])

def _add_certificate(
self,
relation_id: int,
Expand Down Expand Up @@ -931,7 +975,7 @@ def _add_certificate(
"ca": ca,
"chain": chain,
}
provider_relation_data = _load_relation_data(relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(relation)
provider_certificates = provider_relation_data.get("certificates", [])
certificates = copy.deepcopy(provider_certificates)
if new_certificate in certificates:
Expand Down Expand Up @@ -964,7 +1008,7 @@ def _remove_certificate(
raise RuntimeError(
f"Relation {self.relationship_name} with relation id {relation_id} does not exist"
)
provider_relation_data = _load_relation_data(relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(relation)
provider_certificates = provider_relation_data.get("certificates", [])
certificates = copy.deepcopy(provider_certificates)
for certificate_dict in certificates:
Expand Down Expand Up @@ -999,7 +1043,7 @@ def revoke_all_certificates(self) -> None:
This method is meant to be used when the Root CA has changed.
"""
for relation in self.model.relations[self.relationship_name]:
provider_relation_data = _load_relation_data(relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(relation)
provider_certificates = copy.deepcopy(provider_relation_data.get("certificates", []))
for certificate in provider_certificates:
certificate["revoked"] = True
Expand Down Expand Up @@ -1081,7 +1125,7 @@ def get_issued_certificates(
else self.model.relations.get(self.relationship_name, [])
)
for relation in relations:
provider_relation_data = _load_relation_data(relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(relation)
provider_certificates = provider_relation_data.get("certificates", [])

certificates[relation.app.name] = [] # type: ignore[union-attr]
Expand Down Expand Up @@ -1111,9 +1155,13 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
Returns:
None
"""
assert event.unit is not None
if event.unit is None:
logger.error("Relation_changed event does not have a unit.")
return
if not self.model.unit.is_leader():
return
requirer_relation_data = _load_relation_data(event.relation.data[event.unit])
provider_relation_data = _load_relation_data(event.relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(event.relation)
if not self._relation_data_is_valid(requirer_relation_data):
logger.debug("Relation data did not pass JSON Schema validation")
return
Expand Down Expand Up @@ -1152,7 +1200,7 @@ def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None
)
if not certificates_relation:
raise RuntimeError(f"Relation {self.relationship_name} does not exist")
provider_relation_data = _load_relation_data(certificates_relation.data[self.charm.app])
provider_relation_data = self._load_app_relation_data(certificates_relation)
list_of_csrs: List[str] = []
for unit in certificates_relation.units:
requirer_relation_data = _load_relation_data(certificates_relation.data[unit])
Expand Down Expand Up @@ -1184,16 +1232,19 @@ def get_requirer_csrs_with_no_certs(
that don't have a certificate issued.
"""
all_unit_csr_mappings = copy.deepcopy(self.get_requirer_csrs(relation_id=relation_id))
filtered_all_unit_csr_mappings: List[Dict[str, Union[int, str, List[Dict[str, str]]]]] = []
for unit_csr_mapping in all_unit_csr_mappings:
csrs_without_certs = []
for csr in unit_csr_mapping["unit_csrs"]: # type: ignore[union-attr]
if self.certificate_issued_for_csr(
if not self.certificate_issued_for_csr(
app_name=unit_csr_mapping["application_name"], # type: ignore[arg-type]
csr=csr["certificate_signing_request"], # type: ignore[index]
):
unit_csr_mapping["unit_csrs"].remove(csr) # type: ignore[union-attr, arg-type]
if len(unit_csr_mapping["unit_csrs"]) == 0: # type: ignore[arg-type]
all_unit_csr_mappings.remove(unit_csr_mapping)
return all_unit_csr_mappings
csrs_without_certs.append(csr)
if csrs_without_certs:
unit_csr_mapping["unit_csrs"] = csrs_without_certs # type: ignore[assignment]
filtered_all_unit_csr_mappings.append(unit_csr_mapping)
return filtered_all_unit_csr_mappings

def get_requirer_csrs(
self, relation_id: Optional[int] = None
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def test_returns_errors_on_bad_rule_file(self):
}
)
self.assertEqual(valid, False)
self.assertIn("error validating:", errs)
self.assertIn("error validating", errs)

@unittest.mock.patch("platform.machine", lambda: "x86_64")
def test_successfully_validates_good_alert_rules(self):
Expand Down

0 comments on commit 656fdbe

Please sign in to comment.