Skip to content

Commit

Permalink
feat: cleanup and move functionality around
Browse files Browse the repository at this point in the history
  • Loading branch information
johnson2427 committed May 1, 2024
1 parent 9ff7a91 commit 207089b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 44 deletions.
31 changes: 7 additions & 24 deletions ape_aws/accounts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

from typing import Iterator, List, Optional
from typing import Iterator, Optional

from eth_account.messages import _hash_eip191_message, encode_defunct
from eth_account._utils.legacy_transactions import serializable_unsigned_transaction_from_dict
Expand All @@ -11,29 +11,18 @@
from ape.types import AddressType, MessageSignature, SignableMessage, TransactionSignature
from ape.utils import cached_property

from .utils import AliasResponse, _convert_der_to_rsv
from .utils import _convert_der_to_rsv
from .client import kms_client


class AwsAccountContainer(AccountContainerAPI):

@cached_property
def raw_aliases(self) -> List[AliasResponse]:
paginator = kms_client.client.get_paginator('list_aliases')
pages = paginator.paginate()
return [
AliasResponse(**page)
for alias_data in pages
for page in alias_data['Aliases']
if "alias/aws" not in page["AliasName"]
]

@property
def aliases(self) -> Iterator[str]:
return map(lambda x: x.alias, self.raw_aliases)
return map(lambda x: x.alias, kms_client.raw_aliases)

def __len__(self) -> int:
return len(self.raw_aliases)
return len(kms_client.raw_aliases)

@property
def accounts(self) -> Iterator[AccountAPI]:
Expand All @@ -43,7 +32,7 @@ def accounts(self) -> Iterator[AccountAPI]:
key_id=x.key_id,
key_arn=x.arn,
),
self.raw_aliases
kms_client.raw_aliases
)


Expand All @@ -54,7 +43,7 @@ class KmsAccount(AccountAPI):

@cached_property
def public_key(self):
return kms_client.client.get_public_key(KeyId=self.key_id)["PublicKey"]
return kms_client.get_public_key(self.key_id)

@cached_property
def address(self) -> AddressType:
Expand All @@ -63,13 +52,7 @@ def address(self) -> AddressType:
)

def _sign_raw_hash(self, msghash: HexBytes) -> Optional[bytes]:
response = kms_client.client.sign(
KeyId=self.key_id,
Message=msghash,
MessageType='DIGEST',
SigningAlgorithm='ECDSA_SHA_256',
)
return response.get('Signature')
return kms_client.sign(self.key_id, msghash)

def sign_raw_msghash(self, msghash: HexBytes) -> Optional[MessageSignature]:
if len(msghash) != 32:
Expand Down
67 changes: 61 additions & 6 deletions ape_aws/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from typing import List

import boto3

from pydantic import BaseModel
from ape.utils import cached_property

from .utils import AliasResponse


class Client(BaseModel):
client_name: str
_client: boto3.client = None
class Client:

def __init__(self, client_name: str):
self.client_name = client_name
self._client = None

@property
def client(self):
Expand All @@ -14,5 +20,54 @@ def client(self):
return self._client


kms_client = Client(client_name='kms')
iam_client = Client(client_name='iam')
class KmsClient(Client):

def __init__(self):
super().__init__(client_name='kms')

@cached_property
def raw_aliases(self) -> List[AliasResponse]:
paginator = self.client.get_paginator('list_aliases')
pages = paginator.paginate()
return [
AliasResponse(**page)
for alias_data in pages
for page in alias_data['Aliases']
if "alias/aws" not in page["AliasName"]
]

def get_public_key(self, key_id: str):
return self.client.get_public_key(KeyId=key_id)["PublicKey"]

def sign(self, key_id, msghash):
response = self.client.sign(
KeyId=key_id,
Message=msghash,
MessageType='DIGEST',
SigningAlgorithm='ECDSA_SHA_256',
)
return response.get('Signature')


class IamClient(Client):

def __init__(self):
super().__init__(client_name='iam')

def list_users(self):
result = iam_client.client.list_users()
return result.get('Users')

def list_admins(self):
admins = []
for user in self.list_users():
user_name = user['UserName']
user_policies = self.client.list_attached_user_policies(UserName=user_name)
for policy in user_policies['AttachedPolicies']:
if policy['PolicyName'] == 'AdministratorAccess':
admins.append(user_name)
return admins


kms_client = KmsClient()
iam_client = IamClient()
22 changes: 8 additions & 14 deletions ape_aws/iam/_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import click

from ape.cli import ape_cli_context

from ape_aws.client import iam_client


Expand All @@ -9,20 +11,12 @@ def iam():


@iam.command()
def list_admins():
response = iam_client.client.list_users()
admins = []
for user in response['Users']:
user_name = user['UserName']
user_policies = iam_client.client.list_attached_user_policies(UserName=user_name)
for policy in user_policies['AttachedPolicies']:
if policy['PolicyName'] == 'AdministratorAccess':
admins.append(user_name)

click.echo(f'Administrators: {admins}')
@ape_cli_context()
def list_admins(cli_ctx):
cli_ctx.logger.success(f'Administrators: {iam_client.list_admins()}')


@iam.command()
def list_users():
response = iam_client.client.list_users()
click.echo(f'Users: {response.get("Users")}')
@ape_cli_context()
def list_users(cli_ctx):
cli_ctx.logger.success(f'Users: {iam_client.list_users()}')

0 comments on commit 207089b

Please sign in to comment.