diff --git a/tools/aws-pentest-tools/aws_escalate.py b/tools/aws-pentest-tools/aws_escalate.py index 1c52f98..0252041 100644 --- a/tools/aws-pentest-tools/aws_escalate.py +++ b/tools/aws-pentest-tools/aws_escalate.py @@ -3,42 +3,51 @@ import boto3, argparse, os, sys, json, time from botocore.exceptions import ClientError -def main(args): - access_key_id = args.access_key_id - secret_access_key = args.secret_key - session_token = args.session_token - - if args.access_key_id is None or args.secret_key is None: - print('IAM keys not passed in as arguments, enter them below:') - access_key_id = input(' Access Key ID: ') - secret_access_key = input(' Secret Access Key: ') - session_token = input(' Session Token (Leave blank if none): ') - if session_token.strip() == '': - session_token = None +def main(args): # Begin permissions enumeration current_user = None users = [] - client = boto3.client( - 'iam', - aws_access_key_id=access_key_id, - aws_secret_access_key=secret_access_key, - aws_session_token=session_token - ) - if args.all_users is True: + client = boto3.client('iam') + + # Since we may be calling with role credentials, try to get the username of the current user. + # If that username is an exception, then let's just grab all users. + try: + current_user = client.get_user()['User'] + except: # disable + current_user = None + + # Now get all users if we explicitly say to, or if the current permissions come from a non-user + if args.all_users is True or current_user is None: response = client.list_users() for user in response['Users']: - users.append({'UserName': user['UserName'], 'Permissions': {'Allow': {}, 'Deny': {}}}) + users.append({ + 'UserName': user['UserName'], + 'Permissions': { + 'Allow': {}, + 'Deny': {} + } + }) while 'IsTruncated' in response and response['IsTruncated'] is True: - response = client.list_users( - Marker=response['Marker'] - ) + response = client.list_users(Marker=response['Marker']) for user in response['Users']: - users.append({'UserName': user['UserName'], 'Permissions': {'Allow': {}, 'Deny': {}}}) + users.append({ + 'UserName': user['UserName'], + 'Permissions': { + 'Allow': {}, + 'Deny': {} + } + }) elif args.user_name is not None: - users.append({'UserName': args.user_name, 'Permissions': {'Allow': {}, 'Deny': {}}}) - else: - current_user = client.get_user()['User'] + users.append({ + 'UserName': args.user_name, + 'Permissions': { + 'Allow': {}, + 'Deny': {} + } + }) + elif current_user is not None: + # if we get here, then we're a user, current_user = { 'UserName': current_user['UserName'], 'Permissions': { @@ -47,6 +56,9 @@ def main(args): } } users.append(current_user) + else: + raise Exception("Insufficient clarity on which users to scan for permissions issues.") + print('Collecting policies for {} users...'.format(len(users))) for user in users: user['Groups'] = [] @@ -56,15 +68,11 @@ def main(args): ## Get groups that the user is in try: - res = client.list_groups_for_user( - UserName=user['UserName'] - ) + res = client.list_groups_for_user(UserName=user['UserName']) user['Groups'] = res['Groups'] while 'IsTruncated' in res and res['IsTruncated'] is True: res = client.list_groups_for_user( - UserName=user['UserName'], - Marker=groups['Marker'] - ) + UserName=user['UserName'], Marker=res['Marker']) user['Groups'] += res['Groups'] except Exception as e: print('List groups for user failed: {}'.format(e)) @@ -76,14 +84,11 @@ def main(args): ## Get inline group policies try: res = client.list_group_policies( - GroupName=group['GroupName'] - ) + GroupName=group['GroupName']) policies = res['PolicyNames'] while 'IsTruncated' in res and res['IsTruncated'] is True: res = client.list_group_policies( - GroupName=group['GroupName'], - Marker=res['Marker'] - ) + GroupName=group['GroupName'], Marker=res['Marker']) policies += res['PolicyNames'] except Exception as e: print('List group policies failed: {}'.format(e)) @@ -96,8 +101,7 @@ def main(args): try: document = client.get_group_policy( GroupName=group['GroupName'], - PolicyName=policy - )['PolicyDocument'] + PolicyName=policy)['PolicyDocument'] except Exception as e: print('Get group policy failed: {}'.format(e)) user['PermissionsConfirmed'] = False @@ -107,14 +111,11 @@ def main(args): attached_policies = [] try: res = client.list_attached_group_policies( - GroupName=group['GroupName'] - ) + GroupName=group['GroupName']) attached_policies = res['AttachedPolicies'] while 'IsTruncated' in res and res['IsTruncated'] is True: res = client.list_attached_group_policies( - GroupName=group['GroupName'], - Marker=res['Marker'] - ) + GroupName=group['GroupName'], Marker=res['Marker']) attached_policies += res['AttachedPolicies'] group['Policies'] += attached_policies except Exception as e: @@ -127,20 +128,14 @@ def main(args): if 'Policies' not in user: user['Policies'] = [] try: - res = client.list_user_policies( - UserName=user['UserName'] - ) + res = client.list_user_policies(UserName=user['UserName']) policies = res['PolicyNames'] while 'IsTruncated' in res and res['IsTruncated'] is True: res = client.list_user_policies( - UserName=user['UserName'], - Marker=res['Marker'] - ) + UserName=user['UserName'], Marker=res['Marker']) policies += res['PolicyNames'] for policy in policies: - user['Policies'].append({ - 'PolicyName': policy - }) + user['Policies'].append({'PolicyName': policy}) except Exception as e: print('List user policies failed: {}'.format(e)) user['PermissionsConfirmed'] = False @@ -149,8 +144,7 @@ def main(args): try: document = client.get_user_policy( UserName=user['UserName'], - PolicyName=policy - )['PolicyDocument'] + PolicyName=policy)['PolicyDocument'] except Exception as e: print('Get user policy failed: {}'.format(e)) user['PermissionsConfirmed'] = False @@ -159,14 +153,11 @@ def main(args): attached_policies = [] try: res = client.list_attached_user_policies( - UserName=user['UserName'] - ) + UserName=user['UserName']) attached_policies = res['AttachedPolicies'] while 'IsTruncated' in res and res['IsTruncated'] is True: res = client.list_attached_user_policies( - UserName=user['UserName'], - Marker=res['Marker'] - ) + UserName=user['UserName'], Marker=res['Marker']) attached_policies += res['AttachedPolicies'] user['Policies'] += attached_policies except Exception as e: @@ -183,31 +174,16 @@ def main(args): # Begin privesc scanning all_perms = [ - 'iam:AddUserToGroup', - 'iam:AttachGroupPolicy', - 'iam:AttachRolePolicy', - 'iam:AttachUserPolicy', - 'iam:CreateAccessKey', - 'iam:CreatePolicyVersion', - 'iam:CreateLoginProfile', - 'iam:PassRole', - 'iam:PutGroupPolicy', - 'iam:PutRolePolicy', - 'iam:PutUserPolicy', - 'iam:SetDefaultPolicyVersion', - 'iam:UpdateAssumeRolePolicy', - 'iam:UpdateLoginProfile', - 'sts:AssumeRole', - 'ec2:RunInstances', - 'lambda:CreateEventSourceMapping', - 'lambda:CreateFunction', - 'lambda:InvokeFunction', - 'lambda:UpdateFunctionCode', - 'dynamodb:CreateTable', - 'dynamodb:PutItem', - 'glue:CreateDevEndpoint', - 'glue:UpdateDevEndpoint', - 'cloudformation:CreateStack', + 'iam:AddUserToGroup', 'iam:AttachGroupPolicy', 'iam:AttachRolePolicy', + 'iam:AttachUserPolicy', 'iam:CreateAccessKey', + 'iam:CreatePolicyVersion', 'iam:CreateLoginProfile', 'iam:PassRole', + 'iam:PutGroupPolicy', 'iam:PutRolePolicy', 'iam:PutUserPolicy', + 'iam:SetDefaultPolicyVersion', 'iam:UpdateAssumeRolePolicy', + 'iam:UpdateLoginProfile', 'sts:AssumeRole', 'ec2:RunInstances', + 'lambda:CreateEventSourceMapping', 'lambda:CreateFunction', + 'lambda:InvokeFunction', 'lambda:UpdateFunctionCode', + 'dynamodb:CreateTable', 'dynamodb:PutItem', 'glue:CreateDevEndpoint', + 'glue:UpdateDevEndpoint', 'cloudformation:CreateStack', 'datapipeline:CreatePipeline' ] @@ -301,37 +277,44 @@ def main(args): # Preliminary check to see if these permissions have already been enumerated in this session if 'Permissions' in user and 'Allow' in user['Permissions']: # Are they an admin already? - if '*' in user['Permissions']['Allow'] and user['Permissions']['Allow']['*'] == ['*']: - user['CheckedMethods'] = {'admin': {}, 'Confirmed':{}, 'Potential': {}} + if '*' in user['Permissions']['Allow'] and user['Permissions']['Allow']['*'] == [ + '*' + ]: + user['CheckedMethods'] = { + 'admin': {}, + 'Confirmed': {}, + 'Potential': {} + } print(' Already an admin!\n') continue for perm in all_perms: for effect in ['Allow', 'Deny']: if perm in user['Permissions'][effect]: - checked_perms[effect][perm] = user['Permissions'][effect][perm] + checked_perms[effect][perm] = user['Permissions'][ + effect][perm] else: for user_perm in user['Permissions'][effect].keys(): if '*' in user_perm: - pattern = re.compile(user_perm.replace('*', '.*')) + pattern = re.compile( + user_perm.replace('*', '.*')) if pattern.search(perm) is not None: - checked_perms[effect][perm] = user['Permissions'][effect][user_perm] + checked_perms[effect][perm] = user[ + 'Permissions'][effect][user_perm] - checked_methods = { - 'Potential': [], - 'Confirmed': [] - } + checked_methods = {'Potential': [], 'Confirmed': []} # Ditch each escalation method that has been confirmed not to be possible for method in escalation_methods: potential = True confirmed = True for perm in escalation_methods[method]: - if perm not in checked_perms['Allow']: # If this permission isn't Allowed, then this method won't work + if perm not in checked_perms[ + 'Allow']: # If this permission isn't Allowed, then this method won't work potential = confirmed = False break - elif perm in checked_perms['Deny'] and perm in checked_perms['Allow']: # Permission is both Denied and Allowed, leave as potential, not confirmed + elif perm in checked_perms['Deny'] and perm in checked_perms['Allow']: # Permission is both Denied and Allowed, leave as potential, not confirmed confirmed = False - elif perm in checked_perms['Allow'] and perm not in checked_perms['Deny']: # It is Allowed and not Denied + elif perm in checked_perms['Allow'] and perm not in checked_perms['Deny']: # It is Allowed and not Denied if not checked_perms['Allow'][perm] == ['*']: confirmed = False if confirmed is True: @@ -364,17 +347,25 @@ def main(args): file.write(',') file.write('\n') file.close() - print('Privilege escalation check completed. Results stored to ./all_user_privesc_scan_results_{}.csv'.format(now)) + print( + 'Privilege escalation check completed. Results stored to ./all_user_privesc_scan_results_{}.csv'. + format(now)) + # https://stackoverflow.com/a/24893252 def remove_empty_from_dict(d): if type(d) is dict: - return dict((k, remove_empty_from_dict(v)) for k, v in d.items() if v and remove_empty_from_dict(v)) + return dict((k, remove_empty_from_dict(v)) for k, v in d.items() + if v and remove_empty_from_dict(v)) elif type(d) is list: - return [remove_empty_from_dict(v) for v in d if v and remove_empty_from_dict(v)] + return [ + remove_empty_from_dict(v) for v in d + if v and remove_empty_from_dict(v) + ] else: return d + # Pull permissions from each policy document def parse_attached_policies(client, attached_policies, user): for policy in attached_policies: @@ -385,12 +376,11 @@ def parse_attached_policies(client, attached_policies, user): user = parse_document(document, user) return user + # Get the policy document of an attached policy def get_attached_policy(client, policy_arn): try: - policy = client.get_policy( - PolicyArn=policy_arn - )['Policy'] + policy = client.get_policy(PolicyArn=policy_arn)['Policy'] version = policy['DefaultVersionId'] can_get = True except Exception as e: @@ -401,133 +391,218 @@ def get_attached_policy(client, policy_arn): if can_get is True: document = client.get_policy_version( PolicyArn=policy_arn, - VersionId=version - )['PolicyVersion']['Document'] + VersionId=version)['PolicyVersion']['Document'] return document except Exception as e: print('Get policy version failed: {}'.format(e)) return False + # Loop permissions and the resources they apply to def parse_document(document, user): if type(document['Statement']) is dict: document['Statement'] = [document['Statement']] for statement in document['Statement']: if statement['Effect'] == 'Allow': - if 'Action' in statement and type(statement['Action']) is list: # Check if the action is a single action (str) or multiple (list) - statement['Action'] = list(set(statement['Action'])) # Remove duplicates to stop the circular reference JSON error + if 'Action' in statement and type( + statement['Action'] + ) is list: # Check if the action is a single action (str) or multiple (list) + statement['Action'] = list( + set(statement['Action']) + ) # Remove duplicates to stop the circular reference JSON error for action in statement['Action']: if action in user['Permissions']['Allow']: if type(statement['Resource']) is list: - user['Permissions']['Allow'][action] += statement['Resource'] + user['Permissions']['Allow'][action] += statement[ + 'Resource'] else: - user['Permissions']['Allow'][action].append(statement['Resource']) + user['Permissions']['Allow'][action].append( + statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Allow'][action] = statement['Resource'] + user['Permissions']['Allow'][action] = statement[ + 'Resource'] else: - user['Permissions']['Allow'][action] = [statement['Resource']] - user['Permissions']['Allow'][action] = list(set(user['Permissions']['Allow'][action])) # Remove duplicate resources + user['Permissions']['Allow'][action] = [ + statement['Resource'] + ] + user['Permissions']['Allow'][action] = list( + set(user['Permissions']['Allow'][ + action])) # Remove duplicate resources elif 'Action' in statement and type(statement['Action']) is str: if statement['Action'] in user['Permissions']['Allow']: if type(statement['Resource']) is list: - user['Permissions']['Allow'][statement['Action']] += statement['Resource'] + user['Permissions']['Allow'][statement[ + 'Action']] += statement['Resource'] else: - user['Permissions']['Allow'][statement['Action']].append(statement['Resource']) + user['Permissions']['Allow'][statement[ + 'Action']].append(statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Allow'][statement['Action']] = statement['Resource'] + user['Permissions']['Allow'][statement[ + 'Action']] = statement['Resource'] else: - user['Permissions']['Allow'][statement['Action']] = [statement['Resource']] # Make sure that resources are always arrays - user['Permissions']['Allow'][statement['Action']] = list(set(user['Permissions']['Allow'][statement['Action']])) # Remove duplicate resources - if 'NotAction' in statement and type(statement['NotAction']) is list: # NotAction is reverse, so allowing a NotAction is denying that action basically - statement['NotAction'] = list(set(statement['NotAction'])) # Remove duplicates to stop the circular reference JSON error + user['Permissions']['Allow'][statement['Action']] = [ + statement['Resource'] + ] # Make sure that resources are always arrays + user['Permissions']['Allow'][statement['Action']] = list( + set(user['Permissions']['Allow'][statement[ + 'Action']])) # Remove duplicate resources + if 'NotAction' in statement and type( + statement['NotAction'] + ) is list: # NotAction is reverse, so allowing a NotAction is denying that action basically + statement['NotAction'] = list( + set(statement['NotAction']) + ) # Remove duplicates to stop the circular reference JSON error for not_action in statement['NotAction']: if not_action in user['Permissions']['Deny']: if type(statement['Resource']) is list: - user['Permissions']['Deny'][not_action] += statement['Resource'] + user['Permissions']['Deny'][ + not_action] += statement['Resource'] else: - user['Permissions']['Deny'][not_action].append(statement['Resource']) + user['Permissions']['Deny'][not_action].append( + statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Deny'][not_action] = statement['Resource'] + user['Permissions']['Deny'][ + not_action] = statement['Resource'] else: - user['Permissions']['Deny'][not_action] = [statement['Resource']] - user['Permissions']['Deny'][not_action] = list(set(user['Permissions']['Deny'][not_action])) # Remove duplicate resources - elif 'NotAction' in statement and type(statement['NotAction']) is str: + user['Permissions']['Deny'][not_action] = [ + statement['Resource'] + ] + user['Permissions']['Deny'][not_action] = list( + set(user['Permissions']['Deny'][ + not_action])) # Remove duplicate resources + elif 'NotAction' in statement and type( + statement['NotAction']) is str: if statement['NotAction'] in user['Permissions']['Deny']: if type(statement['Resource']) is list: - user['Permissions']['Deny'][statement['NotAction']] += statement['Resource'] + user['Permissions']['Deny'][statement[ + 'NotAction']] += statement['Resource'] else: - user['Permissions']['Deny'][statement['NotAction']].append(statement['Resource']) + user['Permissions']['Deny'][statement[ + 'NotAction']].append(statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Deny'][statement['NotAction']] = statement['Resource'] + user['Permissions']['Deny'][statement[ + 'NotAction']] = statement['Resource'] else: - user['Permissions']['Deny'][statement['NotAction']] = [statement['Resource']] # Make sure that resources are always arrays - user['Permissions']['Deny'][statement['NotAction']] = list(set(user['Permissions']['Deny'][statement['NotAction']])) # Remove duplicate resources + user['Permissions']['Deny'][statement['NotAction']] = [ + statement['Resource'] + ] # Make sure that resources are always arrays + user['Permissions']['Deny'][statement['NotAction']] = list( + set(user['Permissions']['Deny'][statement[ + 'NotAction']])) # Remove duplicate resources if statement['Effect'] == 'Deny': if 'Action' in statement and type(statement['Action']) is list: - statement['Action'] = list(set(statement['Action'])) # Remove duplicates to stop the circular reference JSON error + statement['Action'] = list( + set(statement['Action']) + ) # Remove duplicates to stop the circular reference JSON error for action in statement['Action']: if action in user['Permissions']['Deny']: if type(statement['Resource']) is list: - user['Permissions']['Deny'][action] += statement['Resource'] + user['Permissions']['Deny'][action] += statement[ + 'Resource'] else: - user['Permissions']['Deny'][action].append(statement['Resource']) + user['Permissions']['Deny'][action].append( + statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Deny'][action] = statement['Resource'] + user['Permissions']['Deny'][action] = statement[ + 'Resource'] else: - user['Permissions']['Deny'][action] = [statement['Resource']] - user['Permissions']['Deny'][action] = list(set(user['Permissions']['Deny'][action])) # Remove duplicate resources + user['Permissions']['Deny'][action] = [ + statement['Resource'] + ] + user['Permissions']['Deny'][action] = list( + set(user['Permissions']['Deny'][ + action])) # Remove duplicate resources elif 'Action' in statement and type(statement['Action']) is str: if statement['Action'] in user['Permissions']['Deny']: if type(statement['Resource']) is list: - user['Permissions']['Deny'][statement['Action']] += statement['Resource'] + user['Permissions']['Deny'][statement[ + 'Action']] += statement['Resource'] else: - user['Permissions']['Deny'][statement['Action']].append(statement['Resource']) + user['Permissions']['Deny'][statement[ + 'Action']].append(statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Deny'][statement['Action']] = statement['Resource'] + user['Permissions']['Deny'][statement[ + 'Action']] = statement['Resource'] else: - user['Permissions']['Deny'][statement['Action']] = [statement['Resource']] # Make sure that resources are always arrays - user['Permissions']['Deny'][statement['Action']] = list(set(user['Permissions']['Deny'][statement['Action']])) # Remove duplicate resources - if 'NotAction' in statement and type(statement['NotAction']) is list: # NotAction is reverse, so allowing a NotAction is denying that action basically - statement['NotAction'] = list(set(statement['NotAction'])) # Remove duplicates to stop the circular reference JSON error + user['Permissions']['Deny'][statement['Action']] = [ + statement['Resource'] + ] # Make sure that resources are always arrays + user['Permissions']['Deny'][statement['Action']] = list( + set(user['Permissions']['Deny'][statement[ + 'Action']])) # Remove duplicate resources + if 'NotAction' in statement and type( + statement['NotAction'] + ) is list: # NotAction is reverse, so allowing a NotAction is denying that action basically + statement['NotAction'] = list( + set(statement['NotAction']) + ) # Remove duplicates to stop the circular reference JSON error for not_action in statement['NotAction']: if not_action in user['Permissions']['Allow']: if type(statement['Resource']) is list: - user['Permissions']['Allow'][not_action] += statement['Resource'] + user['Permissions']['Allow'][ + not_action] += statement['Resource'] else: - user['Permissions']['Allow'][not_action].append(statement['Resource']) + user['Permissions']['Allow'][not_action].append( + statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Allow'][not_action] = statement['Resource'] + user['Permissions']['Allow'][ + not_action] = statement['Resource'] else: - user['Permissions']['Allow'][not_action] = [statement['Resource']] - user['Permissions']['Allow'][not_action] = list(set(user['Permissions']['Allow'][not_action])) # Remove duplicate resources - elif 'NotAction' in statement and type(statement['NotAction']) is str: + user['Permissions']['Allow'][not_action] = [ + statement['Resource'] + ] + user['Permissions']['Allow'][not_action] = list( + set(user['Permissions']['Allow'][ + not_action])) # Remove duplicate resources + elif 'NotAction' in statement and type( + statement['NotAction']) is str: if statement['NotAction'] in user['Permissions']['Allow']: if type(statement['Resource']) is list: - user['Permissions']['Allow'][statement['NotAction']] += statement['Resource'] + user['Permissions']['Allow'][statement[ + 'NotAction']] += statement['Resource'] else: - user['Permissions']['Allow'][statement['NotAction']].append(statement['Resource']) + user['Permissions']['Allow'][statement[ + 'NotAction']].append(statement['Resource']) else: if type(statement['Resource']) is list: - user['Permissions']['Allow'][statement['NotAction']] = statement['Resource'] + user['Permissions']['Allow'][statement[ + 'NotAction']] = statement['Resource'] else: - user['Permissions']['Allow'][statement['NotAction']] = [statement['Resource']] # Make sure that resources are always arrays - user['Permissions']['Allow'][statement['NotAction']] = list(set(user['Permissions']['Allow'][statement['NotAction']])) # Remove duplicate resources + user['Permissions'][ + 'Allow'][statement['NotAction']] = [ + statement['Resource'] + ] # Make sure that resources are always arrays + user['Permissions']['Allow'][statement['NotAction']] = list( + set(user['Permissions']['Allow'][statement[ + 'NotAction']])) # Remove duplicate resources return user + if __name__ == '__main__': - parser = argparse.ArgumentParser(description='This script will fetch permissions for a set of users and then scan for permission misconfigurations to see what privilege escalation methods are possible. Available attack paths will be output to a .csv file in the same directory.') - parser.add_argument('--all-users', required=False, default=False, action='store_true', help='Run this module against every user in the account.') - parser.add_argument('--user-name', required=False, default=None, help='A single username of a user to run this module against. By default, the user to which the active AWS keys belong to will be used.') - parser.add_argument('--access-key-id', required=False, default=None, help='The AWS access key ID to use for authentication.') - parser.add_argument('--secret-key', required=False, default=None, help='The AWS secret access key to use for authentication.') - parser.add_argument('--session-token', required=False, default=None, help='The AWS session token to use for authentication, if there is one.') + parser = argparse.ArgumentParser( + description= + 'This script will fetch permissions for a set of users and then scan for permission misconfigurations to see what privilege escalation methods are possible. Available attack paths will be output to a .csv file in the same directory.' + ) + parser.add_argument( + '--all-users', + required=False, + default=False, + action='store_true', + help='Run this module against every user in the account.') + parser.add_argument( + '--user-name', + required=False, + default=None, + help= + 'A single username of a user to run this module against. By default, the user to which the active AWS keys belong to will be used.' + ) args = parser.parse_args() - main(args) \ No newline at end of file + main(args)