-
Notifications
You must be signed in to change notification settings - Fork 4
/
adn.py
170 lines (123 loc) · 5.72 KB
/
adn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Python Library that wraps the alpha ADN API.
Aims to abstract away all the API endpoints.
I love talking to people. If you have any questions or comments: [email protected] or @nava
"""
# Package metadata: maintainer contact and the library's version string.
__author__ = "Fernando Nava <[email protected]>"
__version__ = "0.1"
import json
import os
import re

try:
    from urllib.parse import urlencode
except ImportError:  # Python 2
    from urllib import urlencode

import requests

from adn_endpoints import api_table, base_url
class Adn:
    """Client for the alpha App.net (ADN) HTTP API.

    Besides the explicit methods below, ``__init__`` attaches one callable
    attribute per key of ``api_table``. Calling such an attribute with keyword
    arguments fills the ``{{name}}`` placeholders of the endpoint URL and
    sends the request (see ``_constructFunc``).

    Several methods report failures by returning a string that starts with
    ``"ERROR:"`` instead of raising; callers have to check for that.
    """

    def __init__(self, client_id=None, client_secret=None, redirect_uri=None,
                 access_token=None,
                 scope=('stream', 'email', 'write_post', 'follow', 'messages', 'export')):
        """Store credentials and prepare the HTTP session.

        Args:
            client_id: OAuth client id of the application.
            client_secret: OAuth client secret of the application.
            redirect_uri: Redirect URI used in the OAuth flow.
            access_token: Existing bearer token, if one was already obtained.
            scope: Iterable of requested scope names; entries that are not in
                ``accepted_scope`` are dropped silently.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.access_token = access_token
        self.oauth_anchor = 'https://alpha.app.net/%s'
        self.api_anchor = 'https://alpha.app.net/stream/0/%s'
        self.accepted_scope = ['stream', 'email', 'write_post', 'follow', 'messages', 'export']
        self.scope = ' '.join(name for name in scope if name in self.accepted_scope)
        # Holds the "ERROR: ..." string when client_id/redirect_uri are missing.
        self.auth_url = self.getAuthUrl()
        # OAuth endpoints sit under the site root, not under the stream/0 API
        # prefix, so they are built from oauth_anchor.
        self.request_token_url = self.oauth_anchor % 'oauth/access_token'
        self.client = requests.session()
        # Details of the most recent response; filled in by _request.
        self._last_call = None

        def make_endpoint(key):
            # Bind `key` per call so each lambda keeps its own endpoint name.
            return lambda **kwargs: self._constructFunc(key, **kwargs)

        for key in api_table:
            setattr(self, key, make_endpoint(key))

    def _constructFunc(self, api_call, **kwargs):
        """Call the ``api_table`` endpoint named ``api_call``.

        ``{{name}}`` placeholders in the endpoint URL are replaced with the
        matching keyword argument (a missing one becomes the text ``None``).
        All keyword arguments are also forwarded as request parameters.

        Returns:
            Whatever ``_request`` returns for the resulting URL.
        """
        endpoint = api_table[api_call]
        url = re.sub(
            r'\{\{(?P<m>[a-zA-Z_]+)\}\}',
            lambda match: "%s" % kwargs.get(match.group('m')),
            base_url + endpoint['url']
        )
        return self._request(url, method=endpoint['method'], params=kwargs,
                             api_call=api_call)

    def _request(self, url, method='GET', params=None, files=None, api_call=None, headers=None, *args, **kwargs):
        """Send one HTTP request through the session and decode the JSON body.

        Args:
            url: Absolute URL to request.
            method: ``get``, ``post`` or ``delete`` (case-insensitive).
            params: Query parameters for get/delete, request body for post.
            files: Files to upload; only used for post.
            api_call: Name recorded in ``_last_call`` for later inspection.
            headers: Extra request headers; the mapping is copied, not mutated.

        Returns:
            The parsed JSON response, or the string
            ``"ERROR: NOT CORRECT METHOD"`` for an unsupported method.

        Raises:
            ValueError: If the response body is not valid JSON.
        """
        method = method.lower()
        if method not in ('get', 'post', 'delete'):
            return "ERROR: NOT CORRECT METHOD"

        params = params or {}
        # Copy so the caller's dict never picks up our Authorization header.
        headers = dict(headers or {})
        if self.access_token:
            # Without a token, send no header rather than "Bearer None".
            headers['Authorization'] = 'Bearer %s' % self.access_token

        if method in ('get', 'delete'):
            kwargs['params'] = params
        else:
            kwargs.update({
                'files': files,
                'data': params
            })

        func = getattr(self.client, method)
        response = func(url, headers=headers, *args, **kwargs)
        content = response.content.decode('utf-8')

        # Stash details of the last call for debugging.
        self._last_call = {
            'api_call': api_call,
            'api_error': None,
            'cookies': response.cookies,
            # Not every requests version exposes `error` on the response.
            'error': getattr(response, 'error', None),
            'headers': response.headers,
            'status_code': response.status_code,
            'url': response.url,
            'content': content,
        }

        return json.loads(content)

    def api_request(self, url, *args, **kwargs):
        """Request a path relative to ``api_anchor``.

        Args:
            url: Endpoint path; a leading ``/`` is accepted and stripped.
            *args, **kwargs: Forwarded to ``_request``. ``method`` selects the
                HTTP verb and ``headers`` supplies extra headers.

        Returns:
            Whatever ``_request`` returns.
        """
        # api_anchor already ends with "/", so drop a leading one to avoid "//".
        url = self.api_anchor % (url.lstrip('/'),)
        headers = dict(kwargs.get('headers') or {})
        method = kwargs.get('method', 'get')

        # If we are posting we are probably going to be posting JSON.
        # _request lowercases the verb, so compare case-insensitively here too.
        if method.upper() == "POST":
            headers.update({'Content-type': 'application/json'})

        kwargs['headers'] = headers
        return self._request(url, *args, **kwargs)

    def getAuthUrl(self):
        """Build the URL the user must visit to authorize the application.

        Returns:
            The authenticate URL with URL-encoded query parameters, or an
            ``"ERROR: ..."`` string when ``client_id`` or ``redirect_uri`` is
            not set.
        """
        if not (self.client_id and self.redirect_uri):
            return "ERROR: Need client_id and redirect_uri to generate Authenticate Url"

        # Encode the values so a redirect_uri containing "&" or "?" cannot
        # corrupt the query string.
        query = urlencode([
            ('client_id', self.client_id),
            ('response_type', 'code'),
            ('redirect_uri', self.redirect_uri),
            ('scope', self.scope),
        ])
        return self.oauth_anchor % ('oauth/authenticate?' + query)

    def getAccessToken(self, code):
        """Exchange an authorization ``code`` for an access token.

        On success the token is stored on ``self.access_token`` and returned.
        If the exchange fails, a previously stored token is returned when one
        exists; otherwise an ``"ERROR: ..."`` string is returned.
        """
        post_data = {'client_id': self.client_id,
                     'client_secret': self.client_secret,
                     'grant_type': 'authorization_code',
                     'redirect_uri': self.redirect_uri,
                     'code': code}
        response = requests.post(self.request_token_url, data=post_data)

        if response.ok:
            token_info = json.loads(response.text)
            self.access_token = token_info['access_token']
            return self.access_token
        if self.access_token:
            return self.access_token
        return "ERROR: Attempt to get AccessToken Failed - Try Again"

    def getClientToken(self):
        """Fetch an application token with the client-credentials grant.

        Returns:
            The ``access_token`` value of the response, or ``None`` when
            ``client_id`` or ``client_secret`` is not set.
        """
        if not (self.client_id and self.client_secret):
            return None

        post_data = {'client_id': self.client_id,
                     'client_secret': self.client_secret,
                     'grant_type': 'client_credentials'}
        response = requests.post(self.request_token_url, data=post_data)
        token = json.loads(response.text)
        return token['access_token']

    def streams(self, *args, **kwargs):
        """Request the ``streams`` endpoint; arguments go to ``api_request``."""
        return self.api_request('/streams', *args, **kwargs)

    def filters(self, *args, **kwargs):
        """Request the ``filters`` endpoint; arguments go to ``api_request``."""
        return self.api_request('/filters', *args, **kwargs)
def bug(code):
    """Exchange an authorization ``code`` for a token using environment config.

    Reads ``CLIENT_ID``, ``CLIENT_SECRET`` and ``REDIRECT_URL`` from the
    process environment (missing variables are sent as ``None``) and posts
    them to the OAuth access-token endpoint.

    Returns:
        The raw response object from ``requests.post``; the body is not parsed.
    """
    env = os.environ
    post_data = {'client_id': env.get('CLIENT_ID'),
                 'client_secret': env.get('CLIENT_SECRET'),
                 'grant_type': 'authorization_code',
                 'redirect_uri': env.get('REDIRECT_URL'),
                 'code': code}
    url = 'https://alpha.app.net/oauth/access_token'
    return requests.post(url, data=post_data)