-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandle_rate_limit_demo.py
executable file
·51 lines (42 loc) · 1.42 KB
/
handle_rate_limit_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
import argparse
from os import environ
import requests
from requests.adapters import HTTPAdapter, Retry
from urllib.parse import urlencode
ZUKO_SESSIONS_API_URL = 'https://egress.api.zuko.io/sessions'
try:
ZUKO_API_KEY = environ['ZUKO_API_KEY']
except KeyError:
exit('The ZUKO_API_KEY environment variable must be set with your Zuko API key')
parser = argparse.ArgumentParser()
parser.add_argument('--form-uuid', '-f', required=True)
parser.add_argument('--start-time', '-s', required=True)
parser.add_argument('--end-time', '-e', required=True)
args = parser.parse_args()
def fetch_sessions(params):
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=Retry(total=3, backoff_factor=3.0, status_forcelist=[429])))
resp = session.get(
ZUKO_SESSIONS_API_URL + "?" + urlencode(params),
headers={
'X-Api-Key': ZUKO_API_KEY
}
)
resp.raise_for_status()
next_page_id = resp.json().get('next_page_id')
sessions = resp.json()['sessions']
return [next_page_id, sessions]
params = {
'form_uuid': args.form_uuid,
'time[from]': args.start_time,
'time[to]': args.end_time,
}
sessions = []
next_page_id = True
while next_page_id:
next_page_id, batch = fetch_sessions(params)
sessions += batch
params['next_page_id'] = next_page_id
print('Fetched {} session(s) for form {} between {} and {}'
.format(len(sessions), args.form_uuid, args.start_time, args.end_time))