forked from oysnet/curl-oauth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcurl-oauth
executable file
·277 lines (235 loc) · 9.6 KB
/
curl-oauth
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#! /bin/sh
""":"
exec python $0 ${1+"$@"}
"""
import oauth2 as oauth
import os, ConfigParser
import argparse
import urlparse
import json
import httplib2
import sys
try:
from pygments import highlight
from pygments.lexers import JavascriptLexer
from pygments.formatters import TerminalFormatter
pygment_support = True
except:
pygment_support = False
try:
import certifi
except ImportError:
certifi = None
settings_file = os.path.expanduser('~/.curl-oauth')
config = ConfigParser.RawConfigParser()
if os.path.isfile(settings_file):
config.read(settings_file)
parser = argparse.ArgumentParser(description='A curl like tool with oauth support')
#oauth
parser.add_argument('--domain', dest='domain',
help='config domain')
parser.add_argument('--list-domains', dest='list_domains', action='store_true',
help='list available domains')
"""
parser.add_argument('--delete-domain', dest='delete_domain',
help='delete domain')
parser.add_argument('--set-default-domains', dest='set_default_domain',
help='set default domain')
"""
parser.add_argument('--consumer-key', '-ck', dest='consumer_key',
help='consumer key')
parser.add_argument('--consumer-secret', '-cs', dest='consumer_secret',
help='consumer secret')
parser.add_argument('--token-key', '-tk', dest='token_key',
help='token key')
parser.add_argument('--token-secret', '-ts', dest='token_secret',
help='token secret')
parser.add_argument('--request-token', action='store_true', dest='request_token',
help='create new token')
parser.add_argument('--request-token-url', dest='request_token_url',
help='request token url')
parser.add_argument('--request-token-callback', dest='request_token_callback',
help='request token callback')
parser.add_argument('--request-token-authorize-url', dest='request_token_authorize_url',
help='request token authorize url')
parser.add_argument('--request-token-access-token-url', dest='request_token_access_token_url',
help='request token access verify url')
parser.add_argument('--json', dest='json', action='store_true',
help='add content type and prettify output (install pigments for colors)')
#curl like
parser.add_argument('-X', '--request', dest='http_method', default='GET',
help='Specify request command to use')
parser.add_argument('-H', '--header', dest='headers', action='append',
help='Custom header to pass to server')
parser.add_argument('-d', '--data', dest='data',
help='HTTP POST data')
parser.add_argument('-T', '--upload-file', dest='upload_file',
help='Transfer a file in the body of the http request. Defaults to PUT.')
parser.add_argument('-I', '--head', dest='head', action='store_true',
help='Show document info only')
parser.add_argument('-i', '--include', dest='include', action='store_true',
help='Include protocol headers in the output')
parser.add_argument('urls', nargs='*')
args = parser.parse_args()
def request_token(consumer_key, consumer_secret, request_token_url, request_token_callback, request_token_authorize_url, request_token_access_token_url, domain):
print "requesting token to %s for %s" % (request_token_url, consumer_key)
consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)
if certifi:
client.ca_certs = certifi.where()
try:
resp, content = client.request(request_token_url)
except httplib2.SSLHandshakeError:
print 'Please run "pip install certifi", and this script will automatically use the certificates that certifi provides.'
exit(1)
#print resp, content
if resp['status'] == '401':
print "Unauthorized access, check your consumer data"
exit(1)
if resp['status'] != '201' and resp['status'] != '200':
print "Error receiving request token"
exit(1)
request_token = dict(urlparse.parse_qsl(content))
print ""
print "Go to the following link in your browser:"
url = "%s?oauth_token=%s" % (request_token_authorize_url, request_token['oauth_token'])
if request_token_callback:
url += "&oauth_callback=%s" % request_token_callback
print url
accepted = 'n'
while accepted.lower() == 'n':
accepted = raw_input('Have you authorized me? (y/n) ')
oauth_verifier = raw_input('What is the PIN? ')
token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)
if certifi:
client.ca_certs = certifi.where()
resp, content = client.request(request_token_access_token_url)
access_token = dict(urlparse.parse_qsl(content))
config.add_section(domain)
if not config.has_section('defaults'):
config.add_section('defaults')
config.set('defaults', 'domain', domain)
config.set(domain, 'consumer_key', consumer_key)
config.set(domain, 'consumer_secret', consumer_secret)
config.set(domain, 'token_key', access_token['oauth_token'])
config.set(domain, 'token_secret', access_token['oauth_token_secret'])
config.write(open(settings_file, "wb"))
def list_domains():
if len(config.sections()) < 2:
print 'No domains have been defined';
exit(0)
if config.has_section('defaults') and config.has_option('defaults', 'domain'):
default_domain = config.get('defaults', 'domain')
else:
print "Error parsing config file";
exit(1)
for section in config.sections():
if section == 'defaults':
continue
print '%s%s' % ('* ' if section == default_domain else '',section)
def no_config_available(args):
if args.consumer_key == None or args.consumer_secret == None:
print "No token for this domain available, set consumer_key, consumer_secret and initialize token"
exit(1)
if not args.request_token and (args.token_key == None or args.token_secret == None):
print "Set token key and secret or use --request_token"
exit(1)
if args.request_token and args.request_token_url != None and args.request_token_authorize_url != None and args.request_token_access_token_url != None:
access_token = request_token(args.consumer_key, args.consumer_secret, args.request_token_url, args.request_token_callback, args.request_token_authorize_url, args.request_token_access_token_url, args.domain)
exit(1)
else:
print "Set request_token_url, request_token_authorize_url and request_token_access_token_url "
exit(1)
if args.list_domains:
list_domains()
exit(0)
if args.domain == None:
if config.has_section('defaults') and config.has_option('defaults', 'domain'):
domain = config.get('defaults', 'domain')
else:
print "Set domain for token key"
exit(1)
else:
domain = args.domain
if not config.has_section(domain):
no_config_available(args)
exit(1)
consumer = oauth.Consumer(key=config.get(domain, 'consumer_key'), secret=config.get(domain, 'consumer_secret'))
token = oauth.Token(key=config.get(domain, 'token_key'), secret=config.get(domain, 'token_secret'))
client = oauth.Client(consumer, token)
if certifi:
client.ca_certs = certifi.where()
headers = {}
body = None
if args.json:
headers['Content-Type']='application/json'
if args.headers is not None:
for h in args.headers:
try:
headers[h.split(': ')[0].lower()] = h.split(': ')[1]
except:
print "Wrong header format %s" % h
exit(1)
if args.data is not None:
headers['Content-type'] = 'application/x-www-form-urlencoded'
body = args.data
if args.upload_file:
if args.http_method == 'GET':
args.http_method = 'PUT'
if args.upload_file == '-':
input = sys.stdin
else:
input = open(args.upload_file)
body = input.read()
if args.head:
args.http_method == 'HEAD'
if args.http_method == 'HEAD':
# work around https://github.com/simplegeo/python-oauth2/issues/58
oauth_http_method = 'GET'
else:
oauth_http_method=args.http_method
for url in args.urls:
try:
if body is not None:
resp, content = client.request(url, oauth_http_method, headers=headers, body=body)
else:
resp, content = client.request(url, oauth_http_method, headers=headers)
if args.http_method == 'HEAD' or args.include:
if resp.version == 11:
http_version = '1.1'
elif resp.version == 10:
http_version = '1.0'
else:
http_version = 'unknown (%s)' % resp.version
print 'HTTP/%s %s %s' % (http_version, resp.status, resp.reason.upper())
for k in resp:
if k in ['status']:
continue
print '%s: %s' % ('-'.join((ck.capitalize() for ck in k.split('-'))), resp[k])
if args.http_method != 'HEAD':
if args.json:
try:
content = json.dumps(json.loads(content), indent=2)
except Exception, e:
print content
print 'Json error %s' % e
exit(1)
if pygment_support:
try:
print highlight(content, JavascriptLexer(),TerminalFormatter())
exit(0)
except Exception,e:
print e
exit(1)
else:
print content
exit(0)
else:
print content
except httplib2.SSLHandshakeError:
print e
print 'Please run "pip install certifi", and this script will automatically use the certificates that certifi provides.'
except Exception, e:
print e