-
Notifications
You must be signed in to change notification settings - Fork 85
/
gcpbucketbrute.py
237 lines (203 loc) · 12.7 KB
/
gcpbucketbrute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
import argparse
import time
import multiprocessing
import json
import sys
import textwrap
from functools import partial
from datetime import datetime, timedelta
import requests
import google.oauth2.credentials
from google.cloud import storage
from google.oauth2 import service_account
def outprint(data='', file_path='', normal_print=''):
    """Append ``data`` to a log file and echo it through ``normal_print``.

    Args:
        data: Value to record; it is formatted with ``str.format`` and
            written followed by a newline.
        file_path: Path of the log file, opened in append mode (created if
            it does not exist).
        normal_print: Callable that receives ``data`` unchanged after the
            file has been written.
    """
    log_line = '{}\n'.format(data)
    with open(file_path, 'a+') as log_file:
        log_file.write(log_line)
    normal_print(data)
def generate_bucket_permutations(keyword, permutations_path='./permutations.txt'):
    """Build the list of candidate bucket names for ``keyword``.

    Every non-blank line of the permutations file is combined with the
    keyword using each of six templates (joined by ``-``, ``_`` or nothing,
    in both orders). The bare keyword and its ``.com``/``.net``/``.org``
    variants are added as well.

    Args:
        keyword: Base string to build bucket-name guesses from.
        permutations_path: File holding one permutation word per line.
            Defaults to ``./permutations.txt`` (relative to the current
            working directory).

    Returns:
        A de-duplicated list of candidate names, in generation order, with
        every name shorter than 3 or longer than 63 characters removed.
    """
    permutation_templates = [
        '{keyword}-{permutation}',
        '{permutation}-{keyword}',
        '{keyword}_{permutation}',
        '{permutation}_{keyword}',
        '{keyword}{permutation}',
        '{permutation}{keyword}',
    ]

    with open(permutations_path, 'r') as f:
        # Blank lines would only yield names such as "keyword-" or the bare
        # keyword (which is added below anyway), so they are skipped.
        permutations = [line.rstrip() for line in f if line.strip()]

    candidates = [
        template.format(keyword=keyword, permutation=perm)
        for perm in permutations
        for template in permutation_templates
    ]
    candidates.append(keyword)
    candidates.extend('{}.{}'.format(keyword, tld) for tld in ('com', 'net', 'org'))

    # dict.fromkeys removes duplicates while keeping a stable order.
    # Strip any guesses less than 3 characters or more than 63 characters.
    # A new list is built here: the list must not be mutated while it is
    # being iterated, and a list cannot be indexed by the name itself.
    buckets = [name for name in dict.fromkeys(candidates) if 3 <= len(name) <= 63]

    print('\nGenerated {} bucket permutations.\n'.format(len(buckets)))
    return buckets
def read_wordlist(filename):
    """Read a wordlist file and return its lines without line endings.

    Args:
        filename: Path of the wordlist file to read.

    Returns:
        A list with one string per line of the file.

    Raises:
        SystemExit: With exit code 1, after printing an error message, when
            the file does not exist or cannot be read due to permissions.
    """
    try:
        # The context manager closes the file even if reading fails.
        with open(filename, 'r') as wordlist:
            return wordlist.read().splitlines()
    except FileNotFoundError:
        print('Error: File not found')
        # sys.exit is used because the bare exit() helper is only injected
        # by the site module and is not guaranteed to exist.
        sys.exit(1)
    except PermissionError:
        print('Error: Permission denied')
        sys.exit(1)
def main(args):
    """Run a bucket scan as described by the parsed command-line ``args``.

    Steps, in order:
      1. If ``args.out_file`` is set, rebind the module-level ``print`` so
         all output is also appended to that file.
      2. Pick a storage client (none, service account file, access token, or
         default credentials), prompting on stdin when no option decides it.
      3. Build the bucket list from the keyword, wordlist, ``--check`` or
         ``--check-list`` argument.
      4. Split the list across ``args.subprocesses`` ``Worker`` processes and
         wait for them to finish.
      5. Print the elapsed time unless the wait was interrupted with Ctrl+C.

    Args:
        args: Namespace produced by the script's ``argparse`` parser.
    """
    # Declared up front; the rebinding below must affect the module global so
    # that helper functions in this file log to the output file as well.
    global print

    if args.out_file:
        normal_print = print
        print = partial(outprint, file_path=args.out_file, normal_print=normal_print)

    if args.unauthenticated:
        client = None
    elif args.service_account_credential_file_path:
        credentials = service_account.Credentials.from_service_account_file(args.service_account_credential_file_path)
        client = storage.Client(project=None, credentials=credentials)
    else:
        use_access_token = input('No credential file passed in, enter an access token to authenticate? (y/n) ')
        if use_access_token.rstrip().lower() == 'y':
            access_token = input('Enter an access token to use for authentication: ')
            credentials = google.oauth2.credentials.Credentials(access_token.rstrip())
            client = storage.Client(project=None, credentials=credentials)
        else:
            default = input('No credential file passed in and no access token entered, use the default credentials? (y/n) ')
            if default.rstrip().lower() == 'y':
                client = storage.Client(project=None)
            else:
                print('\nNo authentication method selected. Only performing unauthenticated enumeration.')
                client = None

    subprocesses = []

    if args.keyword:
        buckets = generate_bucket_permutations(args.keyword)
    elif args.wordlist:
        buckets = read_wordlist(args.wordlist)
    elif args.check:
        buckets = args.check
    elif args.check_list:
        # "-" means the bucket names are piped in on stdin.
        with sys.stdin if args.check_list == '-' else open(args.check_list, 'r') as fd:
            buckets = fd.read().splitlines()

    start_time = time.time()

    # Integer arithmetic keeps the slices contiguous and guarantees the last
    # slice ends exactly at len(buckets). The float form
    # int(len / n * (i + 1)) can round below len and drop the final bucket.
    total = len(buckets)
    for i in range(args.subprocesses):
        start = total * i // args.subprocesses
        end = total * (i + 1) // args.subprocesses
        permutation_list = buckets[start:end]
        subproc = Worker(client, print, permutation_list, args.out_file)
        subprocesses.append(subproc)
        subproc.start()

    cancelled = False
    while subprocesses:
        try:
            for subproc in subprocesses:
                subproc.join()
            # Every worker has exited; clearing the list ends the wait loop.
            subprocesses = []
        except KeyboardInterrupt:
            # The loop then joins again: this process only waits for the
            # workers to exit, it does not terminate them itself.
            cancelled = True
            print('Ctrl+C pressed, killing subprocesses...')

    if not cancelled:
        end_time = time.time()
        scanning_duration = timedelta(seconds=(end_time - start_time))
        # Adding the duration to day 1 exposes it as day/hour/minute/second
        # fields (day is 1-based, hence the "- 1" below).
        d = datetime(1, 1, 1) + scanning_duration

        if d.day - 1 > 0:
            print('\nScanned {} potential buckets in {} day(s), {} hour(s), {} minute(s), and {} second(s).'.format(len(buckets), d.day-1, d.hour, d.minute, d.second))
        elif d.hour > 0:
            print('\nScanned {} potential buckets in {} hour(s), {} minute(s), and {} second(s).'.format(len(buckets), d.hour, d.minute, d.second))
        elif d.minute > 0:
            print('\nScanned {} potential buckets in {} minute(s) and {} second(s).'.format(len(buckets), d.minute, d.second))
        else:
            print('\nScanned {} potential buckets in {} second(s).'.format(len(buckets), d.second))

    print('\nGracefully exiting!')

    if args.out_file:
        # Undo the rebinding performed at the top of the function.
        print = normal_print
class Worker(multiprocessing.Process):
    """Subprocess that checks a slice of candidate bucket names.

    For each name it first checks whether the bucket exists and, if so,
    which of the permissions in ``PERMISSIONS`` are granted, both with the
    supplied client (when there is one) and without authentication.
    """

    # Permissions tested against every bucket that exists.
    PERMISSIONS = [
        'storage.buckets.delete',
        'storage.buckets.get',
        'storage.buckets.getIamPolicy',
        'storage.buckets.setIamPolicy',
        'storage.buckets.update',
        'storage.objects.create',
        'storage.objects.delete',
        'storage.objects.get',
        'storage.objects.list',
        'storage.objects.update',
    ]

    # Seconds to wait on each HTTP request, so that a stalled connection
    # cannot block the worker indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, client, print, permutation_list, out_file):
        """Store the scan inputs.

        Args:
            client: Storage client used for authenticated checks, or a falsy
                value to perform only unauthenticated checks.
            print: Callable used for all output produced by this worker.
            permutation_list: Bucket names this worker should check.
            out_file: Output file path passed through from the caller; it is
                stored but not used by the worker itself.
        """
        multiprocessing.Process.__init__(self)
        self.client = client
        self.print = print
        self.permutation_list = permutation_list
        self.out_file = out_file

    def run(self):
        """Check every assigned bucket; stop quietly on Ctrl+C."""
        try:
            for bucket_name in self.permutation_list:
                try:
                    if self.check_existence(bucket_name):
                        self.check_permissions(bucket_name)
                except (requests.exceptions.RequestException, ValueError) as error:
                    # A network failure or an unparsable response for one
                    # bucket must not abort the rest of this worker's list.
                    self.print(' ERROR checking {}: {}'.format(bucket_name, error))
        except KeyboardInterrupt:
            return

    def check_existence(self, bucket_name):
        """Return True unless the bucket metadata URL answers 400 or 404."""
        # Check if bucket exists before trying to TestIamPermissions on it
        response = requests.head(
            'https://www.googleapis.com/storage/v1/b/{}'.format(bucket_name),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.status_code not in (400, 404)

    def check_permissions(self, bucket_name):
        """Report the permissions granted on an existing bucket.

        If authenticated, both an authenticated and an unauthenticated check
        are made because the results could be different. If not
        authenticated, only the unauthenticated check is made. When neither
        check yields a permission the bucket is reported as merely existing.
        """
        authenticated_permissions = []

        # If client exists, use it to make an authenticated check
        if self.client:
            authenticated_permissions = self.client.bucket(bucket_name).test_iam_permissions(
                permissions=self.PERMISSIONS
            )
            if authenticated_permissions:
                self._report_permissions('AUTHENTICATED', bucket_name, authenticated_permissions)

        # A list value is sent as one repeated "permissions=" query
        # parameter per entry.
        response = requests.get(
            'https://www.googleapis.com/storage/v1/b/{}/iam/testPermissions'.format(bucket_name),
            params={'permissions': self.PERMISSIONS},
            timeout=self.REQUEST_TIMEOUT,
        )
        unauthenticated_permissions = response.json().get('permissions')
        if unauthenticated_permissions:
            self._report_permissions('UNAUTHENTICATED', bucket_name, unauthenticated_permissions)

        if not (authenticated_permissions or unauthenticated_permissions):
            self.print(' EXISTS: {}'.format(bucket_name))

    def _report_permissions(self, label, bucket_name, permissions):
        """Print the findings for one bucket under one access mode.

        Args:
            label: 'AUTHENTICATED' or 'UNAUTHENTICATED'; used in the output.
            bucket_name: Name of the bucket the permissions belong to.
            permissions: List of permission strings granted on the bucket.
        """
        self.print('\n {} ACCESS ALLOWED: {}'.format(label, bucket_name))
        if 'storage.buckets.setIamPolicy' in permissions:
            self.print(' - VULNERABLE TO PRIVILEGE ESCALATION (storage.buckets.setIamPolicy)')
        if 'storage.objects.list' in permissions:
            self.print(' - {} LISTABLE (storage.objects.list)'.format(label))
        if 'storage.objects.get' in permissions:
            self.print(' - {} READABLE (storage.objects.get)'.format(label))
        write_permissions = ('storage.objects.create', 'storage.objects.delete', 'storage.objects.update')
        if any(permission in permissions for permission in write_permissions):
            self.print(' - {} WRITABLE (storage.objects.create, storage.objects.delete, and/or storage.objects.update)'.format(label))
        self.print(' - ALL PERMISSIONS:')
        self.print(textwrap.indent('{}\n'.format(json.dumps(permissions, indent=4)), ' '))
if __name__ == '__main__':
    # Command-line entry point: declare the options, parse them, run the scan.
    parser = argparse.ArgumentParser(
        description='This script will generate a list of permutations from ./permutations.txt using the keyword passed into the -k/--keyword argument. Then it will attempt to enumerate Google Storage buckets with those names without any authentication. If a bucket is found to be listable, it will be reported (buckets that allow access to "allUsers"). If a bucket is found but it is not listable, it will use the default "gcloud" CLI credentials to try and list the bucket. If the bucket is listable with credentials it will be reported (buckets that allow access to "allAuthenticatedUsers"), otherwise it will reported as existing, but unlistable.'
    )

    # Exactly one source of bucket names must be chosen: explicit names, a
    # file of names, a keyword to permute, or a wordlist.
    source_group = parser.add_mutually_exclusive_group(required=True)
    source_group.add_argument(
        '--check',
        required=False,
        action="append",
        help='Check a single bucket name instead of bruteforcing names based on a keyword. May be repeated to check multiple buckets.',
    )
    source_group.add_argument(
        '--check-list',
        required=False,
        default=None,
        help='Check a list of buckets in the given file, one per line.',
    )
    source_group.add_argument(
        '-k', '--keyword',
        required=False,
        help='The base keyword to use when guessing bucket names. This could be a simple string like "Google" or a URL like "google.com" or anything else. This string is used to generate permutations to search for.',
    )
    source_group.add_argument(
        '-w', '--wordlist',
        required=False,
        default=None,
        help='The path to a wordlist file',
    )

    # Options that apply regardless of where the bucket names come from.
    parser.add_argument(
        '-s', '--subprocesses',
        required=False,
        default=5,
        type=int,
        help='The amount of subprocesses to delegate work to for enumeration. Default: 5. This is essentially how many threads you want to run the script with, but it is using subprocesses instead of threads.',
    )
    parser.add_argument(
        '-f', '--service-account-credential-file-path',
        required=False,
        default=None,
        help='The path to the JSON file that contains the private key for a GCP service account. By default, you will be prompted for a user access token, then if you decline to enter one it will prompt you to default to the default system credentials. More information here: https://google-auth.readthedocs.io/en/latest/user-guide.html#service-account-private-key-files and here: https://google-auth.readthedocs.io/en/latest/user-guide.html#user-credentials',
    )
    parser.add_argument(
        '-u', '--unauthenticated',
        required=False,
        default=False,
        action='store_true',
        help='Force an unauthenticated scan (you will not be prompted for credentials)',
    )
    parser.add_argument(
        '-o', '--out-file',
        required=False,
        default=None,
        help='The path to a log file to write the scan results to. The file will be created if it does not exist and will append to it if it already exists. By default output will only print to the screen.',
    )

    args = parser.parse_args()
    main(args)