-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmfclient.py
1746 lines (1531 loc) · 71.5 KB
/
mfclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
This module is a Python 3.x (standard library only) implementation of a Mediaflux client
Author: Sean Fleming
"""
import os
import re
import sys
import ssl
import math
import time
import zlib
import shlex
import random
import string
import getpass
import logging
import datetime
import platform
import posixpath
import threading
import http.client
import configparser
import xml.etree.ElementTree as ET
import urllib.request, urllib.error, urllib.parse
from pathlib import PurePath
#------------------------------------------------------------
class mf_client():
"""
Base Mediaflux authentication and communication client
All unexpected failures are handled by raising exceptions
"""
def __init__(self, protocol="http", port="80", server="localhost", domain="system", encrypted_data=True):
"""
Create a Mediaflux server connection instance. Raises an exception on failure.
Args:
protocol: a STRING which should be either "http" or "https"
port: a STRING which is usually "80" or "443"
server: a STRING giving the FQDN of the server
domain: a STRING giving the authentication domain to use when authenticating
Returns:
A reachable mediaflux server object that has not been tested for its authentication status
Raises:
Error if server appears to be unreachable
"""
# configure interfaces
self.type = "mflux"
self.protocol = protocol
self.server = server
self.port = int(port)
self.domain = domain
self.timeout = 120
self.status = "not connected"
# NB: there can be some subtle bugs in python library handling if these are "" vs None
self.session = ""
self.token = ""
self.logging = logging.getLogger('mfclient')
# download/upload buffers
self.get_buffer = 8192
self.put_buffer = 8192
# XML pretty print hack
self.indent = 0
self.enable_polling = True
# POST URL
self.post_url = "%s://%s/__mflux_svc__" % (protocol, server)
# can override to test fast http data transfers (with https logins)
if protocol == 'https':
self.encrypted_data = encrypted_data
else:
self.encrypted_data = False
# data URLs ... a bit hacky (hard coded ports) but the speed improvement is huge
if self.encrypted_data:
self.data_get = "https://%s/mflux/content.mfjp" % server
self.data_put = "%s:%s" % (server, 443)
else:
self.data_get = "http://%s/mflux/content.mfjp" % server
self.data_put = "%s:%s" % (server, 80)
# more info
self.logging.debug("POST=%s" % self.post_url)
self.logging.debug("GET=%s" % self.data_get)
self.logging.debug("PUT=%s" % self.data_put)
self.logging.debug("OpenSSL=%s", ssl.OPENSSL_VERSION)
# --- NEW
@classmethod
def from_endpoint(cls, endpoint):
"""
Create mfclient using an endpoint description
"""
if 'url' in endpoint:
url = urllib.parse.urlparse(endpoint['url'])
endpoint['port'] = url.port
endpoint['server'] = url.hostname
endpoint['protocol'] = url.scheme
if 'encrypt' in endpoint:
encrypt = endpoint['encrypt']
else:
encrypt = True
client = cls(protocol=endpoint['protocol'], server=endpoint['server'], port=endpoint['port'], encrypted_data=encrypt)
client.status = "not connected"
if 'domain' in endpoint:
client.domain = endpoint['domain']
if 'session' in endpoint:
client.session = endpoint['session']
if 'token' in endpoint:
client.token = endpoint['token']
return client
#------------------------------------------------------------
def endpoint(self):
"""
Return configuration as endpoint description
"""
endpoint = { 'type':self.type, 'protocol':self.protocol, 'server':self.server, 'port':self.port, 'domain':self.domain }
endpoint['encrypt'] = self.encrypted_data
endpoint['session'] = self.session
endpoint['token'] = self.token
return endpoint
#------------------------------------------------------------
def connect(self):
"""
Acquire connection status via session or token
"""
# NEW - added /aterm path to connection test
# without this it will be attempting to connect to the web server - which may not be configured and doesn't need to be for API access
url = "%s://%s:%d/aterm" % (self.protocol, self.server, self.port)
self.logging.info("url=[%s]" % url)
# reachability check
try:
code = urllib.request.urlopen(url, timeout=5).getcode()
self.logging.info("connection code=%r" % code)
except Exception as e:
# TODO - more precise messaging, eg using str(e) content
# TODO - eg timeout status
self.status = "not connected"
self.logging.info(str(e))
return False
# fast data channel check
if self.protocol == 'https' and self.encrypted_data is True:
try:
# updated connection path test
response = urllib.request.urlopen("http://%s:80/aterm" % self.server, timeout=5)
if response.code == 200:
self.logging.info("Setting data channel to http")
self.encrypted_data = False
# override data channel only
self.data_get = "http://%s/mflux/content.mfjp" % self.server
self.data_put = "%s:%s" % (self.server, 80)
except Exception as e:
self.logging.debug(str(e))
# convert session into a connection description
try:
# NEW - better baseline check in terms of permissions
# NB: don't use actor[name] as this might be an internal mediaflux ID
reply = self.aterm_run("actor.self.describe")
elem = reply.find(".//actor")
if elem is not None:
# self.status = "authenticated to: %s" % url
# NEW - check for expired/destroyed token
if 'destroyed' in elem.attrib:
if elem.attrib['destroyed'] == 'true':
raise Exception("Delegate destroyed")
self.status = "authenticated"
return True
except Exception as e:
message = str(e)
self.logging.info(message)
if "maintenance mode" in message:
self.status = "maintenance"
return False
self.status = "login required"
return False
#------------------------------------------------------------
def login(self, user=None, password=None, domain=None, token=None):
"""
Authenticate to the server and record the session on success
Input:
user, password: STRINGS specifying user login details
token: STRING specifying a delegate credential
Raises:
An error if authentication fails
"""
# security check
if self.protocol != "https":
self.logging.debug("Permitting unencrypted login; I hope you know what you're doing.")
# NEW - priority order: use preset domain, otherwise API argument, finally prompt
if self.domain is not None:
domain = self.domain
else:
if domain is None:
domain = input("Domain: ")
# command prompt entry
if user is None and token is None:
logging.info("Authentication domain [%s]" % domain)
user = input("Username: ")
password = getpass.getpass("Password: ")
# create a session - failed aterm_run calls should raise an exception that gets handed back up
# priority order: user/password followed by token
reply = None
if user is not None:
reply = self.aterm_run("system.logon :domain %s :user %s :password %s" % (domain, user, password))
else:
if token is not None:
if len(token) > 0:
logging.info("Secure token login.")
reply = self.aterm_run("system.logon :token %s" % token)
self.token = token
# attempt to extract a session
try:
elem = reply.find(".//session")
self.session = elem.text
self.logging.info("Established session: %s" % self.session)
# refresh connection info
self.connect()
except Exception as e:
self.logging.error(str(e))
raise Exception("Invalid login call")
#------------------------------------------------------------
def polling(self, polling_state=True):
"""
Set the current polling state, intended for terminating threads
"""
self.logging.info("Set polling: %r" % polling_state)
with threading.Lock():
self.enable_polling = polling_state
#------------------------------------------------------------
def logout(self):
    """
    Ask the server to end the current session, then forget it locally.
    (NB: a delegate token can auto-create a new session if available.)
    If the logoff call raises, the local session and status are left untouched.
    """
    self.aterm_run("system.logoff")
    self.session, self.status = "", "login required"
#------------------------------------------------------------
def delegate(self, line):
"""
Create a secure token for use in authenticating
"""
delegate_default = 7
delegate_min = 1
delegate_max = 365
# destroy
if line.startswith('off'):
self.logging.debug("Destroying secure tokens...")
self.aterm_run("secure.identity.token.all.destroy")
self.token = ""
return True
# expiry date setup
try:
dt = max(min(float(line), delegate_max), delegate_min)
except:
dt = delegate_default
d = datetime.datetime.now() + datetime.timedelta(days=dt)
expiry = d.strftime("%d-%b-%Y %H:%M:%S")
try:
# query current authenticated identity
result = self.aterm_run("actor.self.describe")
elem = result.find(".//actor")
actor = elem.attrib['name']
i = actor.find(":")
domain = actor[0:i]
user = actor[i+1:]
self.logging.debug("Attempting to delegate for: domain=%s, user=%s, until=%r" % (domain, user, expiry))
# attempt to delegate as current identity
result = self.aterm_run('secure.identity.token.create :to "%s" :role -type user "%s" :role -type domain "%s" :min-token-length 16 :wallet true' % (expiry, actor, domain))
elem = result.find(".//token")
self.token = elem.text
return True
except Exception as e:
self.logging.error(str(e))
return False
#------------------------------------------------------------
def whoami(self):
"""
Display information about the authenticated identity
"""
xml_reply = self.aterm_run("actor.self.describe")
result = []
# main identity
for elem in xml_reply.iter('actor'):
user_name = elem.attrib['name']
user_type = elem.attrib['type']
if 'identity' in user_type:
# MFLUX BUG - can run a describe ... but if specify an id - even for a token I own - it generates a permission error
# workaround - run and search for the right token to get it's validity
xml_expiry = self.aterm_run("secure.identity.token.describe")
expiry = "Never"
for elem_id in xml_expiry.findall(".//identity"):
elem_actor = elem_id.find(".//actor")
if elem_actor is not None:
if user_name in elem_actor.text:
elem_valid = elem_id.find(".//validity/to")
expiry = elem_valid.text
result.append("user = delegate (expires %s)" % expiry)
else:
result.append("%s = %s" % (user_type, user_name))
# associated roles
for elem in xml_reply.iter('role'):
result.append(" role = %s" % elem.text)
return result
#------------------------------------------------------------
@staticmethod
def _xml_succint_error(xml):
"""
Primitive for extracting more concise error messages from Java stack traces
"""
max_size = 600
# pattern 1 - remove context
match = re.search(r"Syntax error.*Context", xml, re.DOTALL)
if match:
message = match.group(0)[:-7]
return message[:max_size]
# pattern 2 - other
match = re.search(r"failed:.*", xml)
if match:
message = match.group(0)[7:]
return message[:max_size]
# give up
return xml[:max_size]
#------------------------------------------------------------
def _post(self, xml_bytes, out_filepath=None):
"""
Primitive for sending an XML message to the Mediaflux server
"""
# NB: timeout exception if server is unreachable
elem=None
try:
request = urllib.request.Request(self.post_url, data=xml_bytes, headers={'Content-Type': 'text/xml', 'charset': 'utf-8'})
response = urllib.request.urlopen(request, timeout=self.timeout)
xml = response.read()
tree = ET.fromstring(xml.decode())
elem = tree.find(".//reply/error")
# process connection error
except Exception as e:
self.logging.debug(str(e))
raise Exception(str(e))
# process server response error
if elem is not None:
elem = tree.find(".//message")
error_message = self._xml_succint_error(elem.text)
self.logging.debug("raise: [%s]" % error_message)
raise Exception(error_message)
return tree
#------------------------------------------------------------
def _post_multipart_buffered(self, xml, filepath, cb_progress=None):
"""
Primitive for doing buffered upload on a single file. Used by the put() method
Sends a multipart POST to the server; consisting of the initial XML, followed by a streamed, buffered read of the file contents
"""
# mediaflux seems to have random periods of unresponsiveness - particularly around final ACK of transfer
# retries don't seem to work at all, but increasing the timeout seems to help cover the problem
upload_timeout = 1800
# setup
pid = os.getpid()
boundary = ''.join(random.choice(string.digits + string.ascii_letters) for i in range(30))
filename = os.path.basename(filepath)
# default
mimetype = 'application/octet-stream'
# multipart - request xml and file
lines = []
lines.extend(('--%s' % boundary, 'Content-Disposition: form-data; name="request"', '', str(xml),))
# specifying nb-data-attachments is the key for getting the data direct to the store
lines.extend(('--%s' % boundary, 'Content-Disposition: form-data; name="nb-data-attachments"', '', "1",))
# file
lines.extend(('--%s' % boundary, 'Content-Disposition: form-data; name="filename"; filename="%s"' % filename, 'Content-Type: %s' % mimetype, '', ''))
body = '\r\n'.join(lines)
# NB - should include everything AFTER the first /r/n after the headers
total_size = len(body) + os.path.getsize(filepath) + len(boundary) + 8
# different connection object for HTTPS vs HTTP
if self.encrypted_data is True:
self.logging.debug("Using https for data: [%s]" % self.data_put)
conn = http.client.HTTPSConnection(self.data_put, timeout=upload_timeout)
else:
self.logging.debug("Using http for data: [%s]" % self.data_put)
conn = http.client.HTTPConnection(self.data_put, timeout=upload_timeout)
# kickoff
self.logging.debug("[pid=%d] File send starting: %s" % (pid, filepath))
conn.putrequest('POST', '/__mflux_svc__')
# headers
conn.putheader('Connection', 'keep-alive')
conn.putheader('Cache-Control', 'no-cache')
conn.putheader('Content-Length', str(total_size))
conn.putheader('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
conn.putheader('Content-Transfer-Encoding', 'binary')
conn.endheaders()
# start sending the file
conn.send(body.encode())
with open(filepath, 'rb') as infile:
# TODO - we *could* allow ctrl-C interruption here via enable_polling state, but could create a mess on the server
while True:
# trap disk IO issues
try:
chunk = infile.read(self.put_buffer)
except Exception as e:
raise Exception("File read error: %s" % str(e))
# exit condition
if not chunk:
break
# trap network IO issues
try:
conn.send(chunk)
if cb_progress is not None:
cb_progress(len(chunk))
except Exception as e:
raise Exception("Network send error: %s" % str(e))
# terminating line (len(boundary) + 8)
chunk = "\r\n--%s--\r\n" % boundary
conn.send(chunk.encode())
self.logging.debug("[pid=%d] File send completed, waiting for server..." % pid)
# get ACK from server (asset ID) else error (raise exception)
resp = conn.getresponse()
reply = resp.read()
conn.close()
tree = ET.fromstring(reply)
message = "response did not contain an asset ID."
for elem in tree.iter():
if elem.tag == 'id':
return int(elem.text)
if elem.tag == 'message':
message = elem.text
raise Exception(message)
#------------------------------------------------------------
@staticmethod
def _xml_sanitise(text):
"""
Helper method to sanitise text for the server XML parsing routines
"""
if isinstance(text, str):
text = text.replace('&', "&")
text = text.replace('<', "<")
text = text.replace('>', ">")
text = text.replace('"', """)
return text
#------------------------------------------------------------
@staticmethod
def _xml_cloak(text):
"""
Helper method for hiding sensitive text in XML posts so they can be displayed
"""
text1 = re.sub(r'session=[^>]*', 'session="..."', text)
text2 = re.sub(r'<password>.*?</password>', '<password>xxxxxxx</password>', text1)
text3 = re.sub(r'<token>.*?</token>', '<token>xxxxxxx</token>', text2)
text4 = re.sub(r'<service name="secure.wallet.set">.*?</service>', '<service name="secure.wallet.set">xxxxxxx</service>', text3)
return text4
#------------------------------------------------------------
# TODO - convert to background always true -> will need to fix :out first (see below) and a whole lot of other things
def aterm_run(self, input_line, background=False, post=True, description=None, show_progress=False):
"""
Method for parsing aterm's compressed XML syntax and sending to the Mediaflux server
Args:
service_call: raw input text that is assumed to be in aterm syntax
post: if False will just return the argument part of the serialized XML, if True will post and return reply
Returns:
A STRING containing the server reply (if post is TRUE, if false - just the XML for test comparisons)
"""
# TODO - I suspect parser should do this (if appropriate) and pass background=True to the module implementations
# intercept (before lexer!) and remove ampersand at end of line -> background job
if input_line[-1:] == '&':
background = True
input_line = input_line[:-1]
self.logging.info("running in background: [%s]" % input_line)
# use posix=True as it's the closest to how aterm processes input strings
# encoding the line (which is a str) creates an object with no read() method
# this input now has no read() method I guess ...
# lexer = shlex.shlex(input_line.encode('utf-8'), posix=True)
# dropping the encode gets rid of the previous error
lexer = shlex.shlex(input_line, posix=True)
# DS-421 fixes lexer dropping XML text payload starting with #, thinking it's a comment
lexer.commenters=""
lexer.whitespace_split = True
xml_root = ET.Element(None)
xml_node = xml_root
child = None
stack = []
data_out_min = 0
data_out_name = None
flag_password = False
# first token is the service call, the rest are child arguments
service_call = lexer.get_token()
token = lexer.get_token()
# better handling of deletions to the XML
xml_unwanted = None
try:
while token is not None:
# print("token=[%s], child=[%r], flag_pwd=%r" % (token, child, flag_password))
if token[0] == ':':
child = ET.SubElement(xml_node, '%s' % token[1:])
# if element contains : (eg csiro:seismic) then we need to inject the xmlns stuff
if ":" in token[1:]:
item_list = token[1:].split(":")
self.logging.debug("XML associate namespace [%s] with element [%s]" % (item_list[0], token[1:]))
child.set("xmlns:%s" % item_list[0], item_list[0])
# NEW - flag that we expect the next token to be password data
if token == ':password':
flag_password = True
# these are special XML attribute/nesting characters
elif token[0] == '<':
stack.append(xml_node)
xml_node = child
elif token[0] == '>':
xml_node = stack.pop()
elif token[0] == '-':
try:
# -number => it's a text value
number = float(token)
child.text = token
except:
# -other => it's an XML attribute/property
key = token[1:]
value = lexer.get_token()
if value is not None:
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
child.set(key, value)
else:
# someone put in something silly, I think...
raise Exception ("Malformed input command")
else:
# if not a new element, or a special characters, should be text for the child element
if child is not None:
# FIXME - some issues here with data strings with multiple spaces (ie we are doing a whitespace split & only adding one back)
if child.text is not None:
child.text += " " + token
else:
if token.startswith('"') and token.endswith('"'):
child.text = token[1:-1]
else:
child.text = token
# special case - out element - needs to be removed (replaced with outputs-via and an outputs-expected attribute)
if child.tag.lower() == "out":
data_out_name = child.text
data_out_min = 1
# schedule for deletion but don't delete yet due to potentially multiple passthroughs
xml_unwanted = child
# NEW - special case handling for password element text
if flag_password is True:
# HACK - in order to bypass lexer tokenisation (which destroys any multiple white spaces) and avoid special characters - assume :password is last entry
n = input_line.find(':password ')
child.text = input_line[n+10:]
# since we're assuming the password was the very last element - we're done
break
# next token
token = lexer.get_token()
except Exception as e:
self.logging.error(str(e))
raise SyntaxError
# do any deletions to the tree after processing
if xml_unwanted is not None:
xml_node.remove(xml_unwanted)
# build the request XML tree
xml = ET.Element("request")
child = ET.SubElement(xml, "service")
# NEW - xmltree append() doesn't like it if xml_root contains *multiple* elements ... so it injects a <None> parent ...
# NEW - xmltree extend() works as intended ... but it's not available in python 2.6
# special case for "system.login" as it does not work when wrapped with "service.execute" - which requires a valid session
if service_call == "system.logon":
# the case where the call can't be wrapped in a service.execute
child.set("name", service_call)
args = ET.SubElement(child, "args")
for item in xml_root.findall("*"):
args.append(item)
elif service_call == 'service.execute':
# the case where the call is already wrapped in a service.execute
child.set("name", service_call)
child.set("session", self.session)
args = ET.SubElement(child, "args")
for item in xml_root.findall("*"):
args.append(item)
else:
# wrap the service call in a service.execute to allow background execution, if desired
child.set("name", "service.execute")
child.set("session", self.session)
args = ET.SubElement(child, "args")
if background is True:
bg = ET.SubElement(args, "background")
bg.text = "True"
if description is not None:
desc = ET.SubElement(args, "description")
desc.text = description
call = ET.SubElement(args, "service")
call.set("name", service_call)
for item in xml_root.findall("*"):
call.append(item)
# return data via the output URL
if data_out_min > 0:
call.set("outputs", "%s" % data_out_min)
output = ET.SubElement(args, "outputs-via")
output.text = "session"
# convert XML to string for posting ...
xml_text = ET.tostring(xml)
# PYTHON3 - bytes v strings
xml_hidden = self._xml_cloak(xml_text.decode()).encode()
self.logging.debug("XML out: %r" % xml_hidden)
# testing hook
if post is not True:
return xml_text
# send the service call and see what happens ...
message = "This shouldn't happen"
# while True:
# CURRENT - only 2 tries - 1st ... possibly second if session has expired (and we can regen with token) ... after that - done
post_count = 0
post_retry = True
while post_retry is True:
self.logging.debug("loop: post_count=%d, post_retry=%r" % (post_count, post_retry))
post_count += 1
post_retry = False
try:
# NEW - INFO on timing for mflux service calls
start_time = time.time()
# main POST to server
reply = self._post(xml_text)
if background is True:
elem = reply.find(".//id")
job = elem.text
done = False
while done is False:
self.logging.debug("background task [%s] poll..." % job)
# CURRENT - an issue with calling self in some edge cases?
# TODO - switch to plain _post ... ?
state = "unknown"
description = "unknown"
xml_poll = None
try:
xml_poll = self.aterm_run("service.background.describe :id %s" % job)
# self.xml_print(xml_poll)
# try and build a consistent user report using wildly different mediaflux reports
text = "task id=%s, " % job
elem = xml_poll.find(".//task/description")
if elem is not None:
description = elem.text
text += "%s, " % description
elem = xml_poll.find(".//task/state")
if elem is not None:
state = elem.text
text += "%s " % state
# set exit flag
if "complete" in state:
xml_poll = self.aterm_run("service.background.results.get :id %s" % job)
done = True
elif "fail" in state:
done = True
# TODO
# text += "elapsed=%s " % exec_time
except Exception as e:
text = "task id=%s, polling error " % job
self.logging.error(str(e))
done = True
# show progress report, if requested
if show_progress is True:
# sys.stdout.write("\r"+text)
sys.stdout.write("\r"+text)
sys.stdout.flush()
# if done is True:
# print(" ")
# if not done, sleep to prevent overloading server with requests
if done is False:
time.sleep(5)
# successful
# NB: mediaflux seems to not return any output if run in background (eg asset.get :id xxxx &)
# this seems like a bug?
# self.xml_print(xml_poll)
elapsed = time.time() - start_time
self.logging.debug("completed: %s, elapsed: %r" % (service_call, elapsed))
return xml_poll
else:
# CASE 2 - not run in background
if data_out_name is not None:
# output field specified (eg download file)
# FIXME - can only cope with 1 output
self.logging.debug("output filename [%s]" % data_out_name)
elem_output = reply.find(".//outputs")
if elem_output is not None:
elem_id = elem_output.find(".//id")
output_id = elem_id.text
url = self.data_get + "?_skey=%s&id=%s" % (self.session, output_id)
url = url.replace("content", "output")
response = urllib.request.urlopen(url)
with open(data_out_name, 'wb') as output:
while True:
# trap network IO issues
try:
data = response.read(self.get_buffer)
except Exception as e:
raise Exception("Network read error: %s" % str(e))
# exit condition
if not data:
break
# trap disk IO issues
try:
output.write(data)
except Exception as e:
raise Exception("File write error: %s" % str(e))
else:
self.logging.debug("missing output data in XML server response")
# successful
elapsed = time.time() - start_time
self.logging.debug("completed: %s, elapsed: %r" % (service_call, elapsed))
return reply
except Exception as e:
message = str(e)
self.logging.debug(message)
# only flag a retry if the session was invalid and we have a token
if "session is not valid" in message:
if len(self.token) > 0:
if post_count == 1:
post_retry = True
if post_retry is True:
# use raw post here ... not a recursive aterm_run() as this may get stuck in a re-try loop
self.logging.info("Attempting to restore session with token")
try:
xml_raw = '<request><service name="system.logon"><args><token>%s</token></args></service></request>' % self.token
xml_retry = self._post(xml_raw.encode())
elem = xml_retry.find(".//session")
self.session = elem.text
# PYTHON3 - due to the strings vs bytes change (ie xml_text is bytes rather than string)
xml_text = re.sub('session=[^>]*', 'session="%s"' % self.session, xml_text.decode()).encode()
except Exception as e:
# no point continuing to retry - couldn't regenerate a valid session
message = str(e)
self.logging.debug(message)
self.session = ""
post_retry = False
# give up with the most recent error message
raise Exception(message)
#------------------------------------------------------------
def command(self, text):
"""
Default passthrough method
"""
self.logging.debug(text)
# special commands
if text.startswith("delegate "):
args = text[8:].strip()
self.delegate(args)
else:
# assumed mflux service calls
reply = self.aterm_run(text)
self.xml_print(reply)
#------------------------------------------------------------
def _xml_recurse(self, elem, text=""):
"""
Helper method for traversing XML and generating formatted output
"""
if elem.text is not None:
text += ' '*self.indent + '%s="%s" ' % (elem.tag, elem.text)
else:
text += ' '*self.indent + '%s ' % elem.tag
for key, value in elem.attrib.items():
text += ' -%s="%s"' % (key, value)
text += '\n'
self.indent += 4
for child in list(elem):
text = self._xml_recurse(child, text)
self.indent -= 4
return text
#------------------------------------------------------------
def xml_print(self, xml_tree, trim=True):
"""
Helper method for displaying XML nicely, as much as is possible
"""
# seek for "normal" response
elem = None
if trim is True:
elem = xml_tree.find(".//result")
# seek for error message
if elem is None:
elem = xml_tree.find(".//message")
# still nothing? give up and print the whole thing
if elem is None:
elem = xml_tree
if elem is not None:
# TODO - replace with ET.tostring() ?
for child in list(elem):
print(self._xml_recurse(child).strip('\n'))
else:
print("Empty XML document")
return
#------------------------------------------------------------
def _xml_xpath_boolean(self, xpath, xml):
"""
Helper for extracting boolean from expected xpath query
"""
elem = xml.find(xpath)
if elem is not None:
if elem.text == "true":
return True
return False
#------------------------------------------------------------
def namespace_exists(self, namespace):
"""
Wrapper around the generic service call mechanism (for testing namespace existence) that parses the result XML and returns a BOOLEAN
"""
reply = self.aterm_run('asset.namespace.exists :namespace "%s"' % namespace.replace('"', '\\\"'))
elem = reply.find(".//exists")
if elem is not None:
if elem.text == "true":
return True
return False
#------------------------------------------------------------
# completion helper ...
def abspath(self, cwd, path):
"""
enforce absolute remote namespace path
"""
self.logging.debug("cwd = [%s] input = [%s]" % (cwd, path))
if not posixpath.isabs(path):
fullpath = posixpath.normpath(posixpath.join(cwd, path))
else:
fullpath = posixpath.normpath(path)
return fullpath
#------------------------------------------------------------
def complete_folder(self, cwd, partial_ns, start):
"""
Command line completion for folders (aka namespaces)
"""
self.logging.debug("cn seek: cwd=[%s] partial_ns=[%s] start=[%d]" % (cwd, partial_ns, start))
# extract any partial namespace to use as pattern match
match = re.match(r".*/", partial_ns)
if match:
offset = match.end()
pattern = partial_ns[offset:]
else:
offset = 0
pattern = partial_ns
# namespace fragment prefix (if any) to include in the returned candidate
prefix = partial_ns[start:offset]
# offset to use when extracting completion string from candidate matches
xlat_offset = max(0, start-offset)
# special case - we "know" .. is a namespace
if pattern == "..":
return [partial_ns[start:]+"/"]
# construct an absolute namespace (required for any remote lookups)
# target_ns = self.absolute_namespace(partial_ns[:offset])
target_ns = self.abspath(cwd, partial_ns[:offset])
self.logging.debug("cn seek: target_ns: [%s] : prefix=[%r] : pattern=[%r] : start=%r : xlat=%r" % (target_ns, prefix, pattern, start, xlat_offset))
# generate listing in target namespace for completion matches
result = self.aterm_run('asset.namespace.list :namespace "%s"' % target_ns)
ns_list = []
for elem in result.iter('namespace'):
if elem.text is not None:
# namespace matches the pattern we're looking for?
item = None
if len(pattern) != 0:
if elem.text.startswith(pattern):
item = posixpath.join(prefix, elem.text[xlat_offset:]+"/")
else:
item = posixpath.join(prefix, elem.text[xlat_offset:]+"/")
if item is not None:
ns_list.append(item)
self.logging.debug("cn found: %r" % ns_list)
return ns_list
# --- helper
def escape_single_quotes(self, namespace):
    """
    Return namespace with a backslash placed in front of every single quote.
    """
    return "\\'".join(namespace.split("'"))
#------------------------------------------------------------
def complete_file(self, cwd, partial_asset_path, start):
"""
Command line completion for files (aka assets)
"""
self.logging.debug("ca seek: cwd=[%s] partial_asset=[%s] start=[%d]" % (cwd, partial_asset_path, start))
# construct an absolute namespace (required for any remote lookups)
candidate_ns = self.abspath(cwd, partial_asset_path)
if self.namespace_exists(candidate_ns):
# candidate is a namespace -> it's our target for listing
target_ns = candidate_ns
# no pattern -> add all namespaces
pattern = None
# replacement prefix for any matches
prefix = partial_asset_path[start:]
else:
# candidate not a namespace -> set the parent as the namespace target
match = re.match(r".*/", candidate_ns)
if match:
target_ns = match.group(0)
# extract pattern to search and prefix for any matches
pattern = candidate_ns[match.end():]