-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
176 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# Copyright (C) 2024, Hadron Industries, Inc. | ||
# Carthage is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU Lesser General Public License version 3 | ||
# as published by the Free Software Foundation. It is distributed | ||
# WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the file | ||
# LICENSE for details. | ||
|
||
import asyncio | ||
from carthage import * | ||
import carthage.pki | ||
from . import Vault | ||
|
||
__all__ = [] | ||
|
||
async def run_in_executor(func, *args): | ||
return await asyncio.get_event_loop().run_in_executor(None, func, *args) | ||
|
||
@inject_autokwargs( | ||
vault=Vault) | ||
class VaultPkiManager(carthage.pki.PkiManager): | ||
|
||
''' | ||
A PKI manager mounted corresponding to a vault pki secrets engine. | ||
''' | ||
path:str = 'pki' #: Path at which secrets engine is mounted | ||
role:str = None #:Role to issue against. | ||
|
||
def __init__(self, *, path=None, role=None, **kwargs): | ||
if path is not None: | ||
self.path = Path | ||
if role is None and self.role is None: | ||
raise TypeError('role must be set in the constructor or subclass') | ||
if role: | ||
self.role = role | ||
super().__init__(**kwargs) | ||
|
||
async def issue_credentials(self, hostname:str, tag:str): | ||
def cb(): | ||
return self.vault.client.write( | ||
f'{self.path}/issue/{self.role}', | ||
common_name=hostname | ||
) | ||
result = await run_in_executor(cb) | ||
key = result['data']['private_key'] | ||
cert = result['data']['certificate'] | ||
return key, cert | ||
|
||
async def trust_store(self): | ||
def cb(): | ||
res = self.vault.client.read(f'{self.path}/ca/pem') | ||
return res.text | ||
ca_pem = await run_in_executor(cb) | ||
return await self.ainjector( | ||
carthage.pki.SimpleTrustStore, | ||
'vault_'+self.path, | ||
dict(ca=ca_pem)) | ||
|
||
async def certificates(self): | ||
def cb_list(): | ||
res = self.vault.client.list(f'{self.path}/certs') | ||
return res['data']['keys'] | ||
def cb_cert(key): | ||
res = self.vault.client.read(f'{self.path}/cert/{key}') | ||
return res['data']['certificate'] | ||
|
||
for key in await run_in_executor(cb_list): | ||
yield await run_in_executor(cb_cert, key) | ||
|
||
__all__ += ['VaultPkiManager'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# Copyright (C) 2024, Hadron Industries, Inc. | ||
# Carthage is free software; you can redistribute it and/or modify | ||
# it under the terms of the GNU Lesser General Public License version 3 | ||
# as published by the Free Software Foundation. It is distributed | ||
# WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the file | ||
# LICENSE for details. | ||
|
||
import asyncio | ||
import pytest | ||
import sys | ||
import time | ||
from carthage.pytest import * | ||
from carthage import * | ||
from carthage.vault import * | ||
|
||
|
||
|
||
|
||
@pytest.fixture(scope='module') | ||
def vault(): | ||
''' | ||
Start a vault in dev mode | ||
''' | ||
try: | ||
from sh import vault as vault_cmd | ||
except Exception: | ||
pytest.skip('vault not installed') | ||
vault_proc = vault_cmd('server', '-dev', _bg=True, _bg_exc=False, _out=sys.stdout, | ||
_err_to_out=True) | ||
time.sleep(1) | ||
yield | ||
vault_proc.kill() | ||
|
||
@pytest.fixture() | ||
def ainjector(ainjector, vault): | ||
ainjector.add_provider(ConfigLayout) | ||
cl = ainjector.get_instance(ConfigLayout) | ||
cl.vault.address = 'http://127.0.0.1:8200/' | ||
ainjector.add_provider(Vault) | ||
vault = ainjector.get_instance(Vault) | ||
vault.apply_config( | ||
{ | ||
'secrets':dict( | ||
pki={'type':'pki'}), | ||
'pki/root/generate/internal': dict( | ||
ttl='120h', | ||
common_name='ROOT'), | ||
'pki/roles/role': dict( | ||
allow_any_name=True, | ||
ttl='30h' | ||
), | ||
}) | ||
class pki(VaultPkiManager): | ||
role = 'role' | ||
ainjector.add_provider(pki) | ||
yield ainjector | ||
|
||
@async_test | ||
async def test_pki_issue(ainjector): | ||
pki = await ainjector.get_instance_async(VaultPkiManager) | ||
await pki.issue_credentials('evil.com', 'tag') | ||
|
||
|
||
@async_test | ||
async def test_pki_certs(ainjector): | ||
pki = await ainjector.get_instance_async(VaultPkiManager) | ||
key_1, cert_1 = await pki.issue_credentials('internet.com', 'tag') | ||
key_2,cert_2 = await pki.issue_credentials('dns.net', 'tag') | ||
certs = [c async for c in pki.certificates()] | ||
assert cert_1 in certs | ||
assert cert_2 in certs | ||
|
||
@async_test | ||
async def test_trust_store(ainjector): | ||
pki = await ainjector.get_instance_async(VaultPkiManager) | ||
trust_store = await pki.trust_store() | ||
certs = [c async for c in trust_store.trusted_certificates()] | ||
await trust_store.ca_file() | ||
|