Skip to content

Commit

Permalink
Refactor uploadChanges.py script #6
Browse files Browse the repository at this point in the history
The script has been simplified, and a bit of documentation was added.
It now uploads files from a directory, so it is not necessary to construct a JSON "uploads" dictionary.
  • Loading branch information
josusky committed Feb 9, 2024
1 parent dddaea5 commit 8422e99
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 116 deletions.
2 changes: 1 addition & 1 deletion prodRegister
Original file line number Diff line number Diff line change
@@ -1 +1 @@
http://codes.wmo.int
https://codes.wmo.int
229 changes: 115 additions & 114 deletions scripts/uploadChanges.py
Original file line number Diff line number Diff line change
@@ -1,137 +1,138 @@
import argparse
import json
import os

import requests

"""
This script uploads content to the defined register
./prodRegister
This reqires an authentication token and userID and structured
content.
The structured content is taken from the command line positional argument
uploads
which may either consist of a path to a JSON encoded file or an explicit JSON string
The JSON payload shall be a dictionary with the keys:
'PUT', 'POST'
and each key shall provide a list of .ttl files to upload to prodRegister
based on the relative path of the .ttl file.
This script uploads TTL files to the defined register
./prodRegister or ./testRegister
This requires an authentication token/password, userID and the name of directory with TTL files.
"""

def authenticate(session, base, userid, pss):
# Prefer HTTPS for registry session interactions
# Essential for authenticate due to 405 response
if base.startswith('http://'):
base = base.replace('http://', 'https://')
url = f'{base}/system/security/apilogin'

def authenticate(base_url, user_id, password):
"""Constructs authenticated session (with JSESSIONID cookie)."""
url = f"{base_url}/system/security/apilogin"
print(f'Authenticating at "{url}"')
auth = session.post(url, data={'userid':userid, 'password':pss})
session = requests.Session()
auth = session.post(url, data={"userid": user_id, "password": password})
if not auth.status_code == 200:
raise ValueError('auth failed')

raise ValueError("Authentication failed")
return session

def parse_uploads(uploads):
result = json.loads(uploads)
if set(result.keys()) != set(('PUT', 'POST')):
raise ValueError("Uploads inputs should have keys"
" set(('PUT', 'POST')) only, not:\n"
"{}".format(result.keys()))
return result

def post(session, url, payload):
# Prefer HTTPS for registry session interactions
if url.startswith('http://'):
url = url.replace('http://', 'https://')
# POST new content to the intended parent register
headers={'Content-type':'text/turtle; charset=UTF-8'}

def post(session, url, payload, dry_run, verbose):
"""Posts new content to the intended parent register."""
headers = {"Content-type": "text/turtle; charset=UTF-8"}
response = session.get(url, headers=headers)
#if response.status_code != 200:
#raise ValueError('Cannot POST to {}, it does not exist.'.format(url))
# params = {'status':'experimental'}
params = {'status':'stable'}
# params = {'status':'experimental'}
params = {"status": "stable"}
if not dry_run:
res = session.post(url, headers=headers, data=payload.encode("utf-8"),
params=params)
if verbose:
print(f' Posting to: "{url}"')
# print(f'payload: {payload.encode("utf-8")}')
print(f" headers: {headers}")
print(f" params: {params}")
res = session.post(url, headers=headers, data=payload.encode("utf-8"), params=params, stream=False)
if res.status_code != 201:
print('POST failed with {}\n{}'.format(res.status_code, res.reason))
print(f' POST failed with {res.status_code} {res.reason}:\n {res.content.decode("utf-8")}')
elif verbose:
print(f" POST succeeded with {res.status_code} {res.reason}\n")
else:
print(f' Would post to: "{url}"')
print(f" headers: {headers}")
print(f" params: {params}")


def put(session, url, payload, dry_run, verbose):
"""Updates definition of a register or entity."""
headers = {"Content-type": "text/turtle; charset=UTF-8"}
# params = {'status':'experimental'}
params = {"status": "stable"}
# for register update adjust the URL
if "reg:Register" in payload:
url += "?non-member-properties"
if not dry_run:
if verbose:
print(f' Putting to: "{url}"')
# print(f'payload: {payload.encode("utf-8")}')
print(f" headers: {headers}")
print(f" params: {params}")
res = session.put(url, headers=headers, data=payload.encode("utf-8"), params=params)
if res.status_code != 204:
print(f' PUT failed with {res.status_code} {res.reason}:\n {res.content.decode("utf-8")}')
elif verbose:
print(f" PUT succeeded with {res.status_code} {res.reason}\n")
else:
print(f'Would post to: "{url}"')
print(f'payload: {payload.encode("utf-8")}')
print(f'params: {params}')
print(f' Would put to "{url}"')
print(f" headers: {headers}")
print(f" params: {params}")


def put(session, url, payload, dry_run):
# Prefer HTTPS for registry session interactions
if url.startswith('http://'):
url = url.replace('http://', 'https://')
# PUT updated content to the entity already registered
headers={'Content-type':'text/turtle; charset=UTF-8'}
def upload(session, url, payload, dry_run, verbose):
"""PUTs or POSTs given data depending if it already exists or not."""
headers = {"Content-type": "text/turtle; charset=UTF-8"}
if verbose:
print(f" Checking {url}:", end=" ")
response = session.get(url, headers=headers)
if response.status_code != 200:
raise ValueError('Cannot PUT to {}, it does not exist.'.format(url))
params = {'status':'experimental'}
if not dry_run:
res = session.put(url, headers=headers, data=payload.encode("utf-8"),
params=params)
if response.status_code == 200:
if verbose:
print("Existing entry, using PUT")
put(session, url, payload, dry_run, verbose)
elif response.status_code == 404:
if verbose:
print("New entry, using POST")
url = "/".join(url.split("/")[:-1])
post(session, url, payload, dry_run, verbose)
else:
print(f'Would put to "{url}"')
print(f'payload: {payload.encode("utf-8")}')
print(f'params: {params}')

def post_uploads(session, rootURL, uploads, dry_run):
for postfile in uploads:
with open('.{}'.format(postfile), 'r', encoding="utf-8") as pf:
pdata = pf.read()
# post, so remove last part of identity, this is in the payload
relID = postfile.replace('.ttl', '')
relID = '/'.join(postfile.split('/')[:-1])
url = '{}{}'.format(rootURL, relID)
print(url)
post(session, url, pdata, dry_run)

def put_uploads(session, rootURL, uploads, dry_run):
for putfile in uploads:
with open('.{}'.format(putfile), 'r', encoding="utf-8") as pf:
pdata = pf.read()
relID = putfile.replace('.ttl', '')
url = '{}{}'.format(rootURL, relID)
print(url)
put(session, url, pdata, dry_run)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('user_id')
parser.add_argument("passcode")
parser.add_argument("tmode")
parser.add_argument('uploads')
parser.add_argument('-n', '--dry-run', action="store_true", help='Only print what would be uploaded without actually sending anything.')
args = parser.parse_args()
raise ValueError(
f'Cannot upload to {url}: {response.status_code} {response.reason}:\n {response.content.decode("utf-8")}'
)

if os.path.exists(args.uploads):
with open(args.uploads, 'r') as ups:
uploads = ups.read()
else:
uploads = args.uploads
uploads = parse_uploads(uploads)
print(uploads)
if args.tmode not in ['test', 'prod']:
raise ValueError('test mode must be either "test" or "prod"')
if args.tmode == 'prod':
with open('prodRegister', 'r', encoding='utf-8') as fh:
rooturl = fh.read().split('\n')[0]
print('Running upload with respect to {}'.format(rooturl))
elif args.tmode == 'test':
with open('testRegister', 'r', encoding='utf-8') as fh:
rooturl = fh.read().split('\n')[0]
print('Running upload with respect to {}'.format(rooturl))

session = requests.Session()
session = authenticate(session, rooturl, args.user_id, args.passcode)
print(uploads)
post_uploads(session, rooturl, uploads['POST'], args.dry_run)
put_uploads(session, rooturl, uploads['PUT'], args.dry_run)
def upload_file(session, rootURL, file_path, dry_run, verbose):
"""Uploads given TTL file to the registry."""
with open(file_path, "r", encoding="utf-8") as file:
ttl_data = file.read()
relID = file_path.replace(".ttl", "")
url = f"{rootURL}/{relID}"
print(f"Uploading {file_path}")
upload(session, url, ttl_data, dry_run, verbose)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("user_id", help='User ID, in form "https://api.github.com/users/<my_id>"')
parser.add_argument(
"pass_code", help='Password or token generated at "https://ci.codes.wmo.int/ui/temporary-password"'
)
parser.add_argument("mode", help='Mode: "test" or "prod"')
parser.add_argument("directory", help="Name of the folder with TTL files to upload.")
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
help="Only print what would be uploaded without actually sending anything.",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Print more details.")
args = parser.parse_args()

if not os.path.isdir(args.directory):
raise ValueError(f'Directory "{args.directory}" does not exists.')
if args.mode not in ["test", "prod"]:
raise ValueError('Mode must be either "test" or "prod"')
if args.mode == "prod":
with open("prodRegister", "r", encoding="utf-8") as fh:
base_url = fh.read().split("\n")[0]
elif args.mode == "test":
with open("testRegister", "r", encoding="utf-8") as fh:
base_url = fh.read().split("\n")[0]

print(f"Running upload with respect to {base_url}")

session = authenticate(base_url, args.user_id, args.pass_code)
for root, dirs, files in os.walk(args.directory):
for file_ in files:
if file_.endswith(".ttl"):
filename = os.path.join(root, file_)
upload_file(session, base_url, filename, args.dry_run, args.verbose)
2 changes: 1 addition & 1 deletion testRegister
Original file line number Diff line number Diff line change
@@ -1 +1 @@
http://ci.codes.wmo.int
https://ci.codes.wmo.int

0 comments on commit 8422e99

Please sign in to comment.