diff --git a/lib/bald/__init__.py b/lib/bald/__init__.py
index d61c24d..0e34b2d 100644
--- a/lib/bald/__init__.py
+++ b/lib/bald/__init__.py
@@ -675,8 +675,10 @@ def load(afilepath):
         loader = netCDF4.Dataset
     else:
         raise ValueError('filepath suffix not supported: {}'.format(afilepath))
-    if not os.path.exists(afilepath):
-        raise IOError('{} not found'.format(afilepath))
+    #Disable this check for now to allow URL input
+    #TODO: Add feature to check both local files and files on the web, e.g. URLs
+    #if not os.path.exists(afilepath):
+    #    raise IOError('{} not found'.format(afilepath))
     try:
         f = loader(afilepath, "r")
         yield f
diff --git a/nc2rdf/README.md b/nc2rdf/README.md
index 43db903..9f5067a 100644
--- a/nc2rdf/README.md
+++ b/nc2rdf/README.md
@@ -31,4 +31,32 @@
 $ python nc2rdf.py -o ttl myfile.nc
 $ python nc2rdf.py -o xml myfile.nc
 ```
 
+## nc2schemaorg
+
+This feature provides users with a way to create schema.org descriptions from
+ACDD/CF/NUG conformant values in a netCDF file.
+
+```
+$ python nc2rdf.py -o json-ld --schema-org [cdl or nc file]
+```
+
+Example:
+```
+$ python nc2rdf.py -o json-ld --schema-org ../lib/bald/tests/integration/CDL/trajectoryProfile_template.cdl
+```
+
+Note: This command-line tool is experimental and subject to change; it serves as a prototype for using bald functions to convert netCDF-related files to RDF.
+
+
+## threddsnc2rdf
+
+This tool takes a THREDDS endpoint or a THREDDS catalog.xml URL and emits an RDF graph for every netCDF file found.
+
+Example:
+```
+$ python threddsnc2rdf.py http://example.org/thredds
+$ python threddsnc2rdf.py http://example.org/thredds/catalog.xml
+```
+
+Output is written to the `rdf` directory (or `schemaorg` when `--schema-org` is used).
diff --git a/nc2rdf/bald2schemaorg_mappings.json b/nc2rdf/bald2schemaorg_mappings.json
new file mode 100644
index 0000000..9eadd25
--- /dev/null
+++ b/nc2rdf/bald2schemaorg_mappings.json
@@ -0,0 +1,8 @@
+[
+    { "bald" : "summary", "schemaorg": "description" },
+    { "bald" : "title", "schemaorg": "name" },
+    { "bald" : "id", "schemaorg": "identifier" },
+    { "bald" : "keywords", "schemaorg": "keywords" },
+    { "bald" : "license", "schemaorg": "license" },
+    { "bald" : "standard_name", "schemaorg": "variableMeasured" }
+]
diff --git a/nc2rdf/nc2rdf.py b/nc2rdf/nc2rdf.py
index 4a91e32..41c9bd0 100644
--- a/nc2rdf/nc2rdf.py
+++ b/nc2rdf/nc2rdf.py
@@ -5,13 +5,123 @@
 import netCDF4
 import numpy as np
 import bald
+import rdflib
+import json
+from rdflib import Namespace, BNode, URIRef, Literal
+from rdflib.namespace import RDF
+try:
+    # python 3
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
 
-def nc2rdf(ncfilename, outformat, baseuri=None):
-    #print("nc2rdf test")
-    #print(ncfile)
+def isUrl(url):
+    try:
+        result = urlparse(url)
+        if all([result.scheme, result.netloc, result.path]) and result.scheme in ('http', 'https'):
+            return True
+        return False
+    except ValueError:
+        return False
+
+def getBasename(urlstr):
+    return os.path.basename(urlstr)
+
+def baldgraph2schemaorg(graph, path=None, baseuri=None):
+    """
+    Input: an rdflib.Graph in bald style (as produced by bald.load_netcdf)
+    Returns a new graph using the schema.org profile
+    """
+    # HACK: The following mappings ignore prefixes as well as prefixes in the nc file
+    # TODO: Fix references to prefixes/aliases properly
+
+    #load mappings
+    mapping_idx = {}
+    mapping_data = []
+    with open('bald2schemaorg_mappings.json', 'r') as f:
+        mapping_data = json.load(f)
+
+    for item in mapping_data:
+        mapping_idx[item['bald']] = item['schemaorg']
+
+    qres = graph.query(
+        """PREFIX bald: <https://www.opengis.net/def/binary-array-ld/>
+        SELECT DISTINCT ?pred ?value
+        WHERE {
+            ?c a bald:Container .
+            ?c ?pred ?value
+        }""")
+
+    schema_g = rdflib.Graph()
+
+    if baseuri is not None:
+        container = URIRef(baseuri)
+    else:
+        container = BNode()
+
+    so = Namespace("http://schema.org/")
+    schema_g.add( (container, URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), so.Dataset) )
+
+    if path is not None and isUrl(path):
+        predUri = URIRef("http://schema.org/url")
+        schema_g.add( (container, predUri, URIRef(path)) )
+
+    for row in qres:
+        currField = getBasename(str(row[0])).strip()
+        #print(getBasename(str(row[0])) + ' (type: ' + str(type(row[0])) + ")" + " :: " + row[1] + ' (type: ' + str(type(row[1])) + ")")
+        if currField in mapping_idx.keys():
+            predUri = URIRef("http://schema.org/" + mapping_idx[currField])
+            if currField == 'keywords':
+                for x in row[1].split(','):
+                    kw = x.strip()
+                    if len(kw) == 0:
+                        continue
+                    lit = Literal(kw)
+                    schema_g.add( (container, predUri, lit) )
+                continue
+
+            #print('schemaorg:' + mapping_idx[currField], "\t", row[1])
+            lit = Literal(row[1])
+            schema_g.add( (container, predUri, lit) )
+    return schema_g
+
+def nc2schemaorg(ncfilename, outformat, outputfile=None, baseuri=None):
     root_container = bald.load_netcdf(ncfilename, baseuri=baseuri)
-    ttl = root_container.rdfgraph().serialize(format=outformat).decode("utf-8")
-    print(ttl)
+    graph = root_container.rdfgraph()
+    schema_g = baldgraph2schemaorg(graph, path=ncfilename, baseuri=baseuri)
+    destination = None
+    if outputfile is not None:
+        destination = outputfile
+    if outformat == 'json-ld' and destination is not None:
+        context = "http://schema.org/"
+        #s = schema_g.serialize(destination=destination, format=outformat, context=context, indent=4).decode("utf-8")
+        s = schema_g.serialize(destination=destination, format=outformat, context=context, indent=4)
+    elif outformat == 'json-ld' and destination is None:
+        context = "http://schema.org/"
+        #s = schema_g.serialize(destination=destination, format=outformat, context=context, indent=4).decode("utf-8")
+        s = schema_g.serialize(destination=destination, format=outformat, context=context, indent=4)
+        print(s)
+    else:
+        #s = schema_g.serialize(destination=destination, format=outformat).decode("utf-8")
+        s = schema_g.serialize(destination=destination, format=outformat)
+
+def nc2rdf(ncfilename, outformat, outputfile=None, baseuri=None):
+    root_container = bald.load_netcdf(ncfilename, baseuri=baseuri)
+    if outputfile is None:
+        #ttl = root_container.rdfgraph().serialize(format=outformat).decode("utf-8")
+        ttl = root_container.rdfgraph().serialize(format=outformat)
+        print(ttl)
+    else:
+        #ttl = root_container.rdfgraph().serialize(destination=outputfile, format=outformat).decode("utf-8")
+        ttl = root_container.rdfgraph().serialize(destination=outputfile, format=outformat)
+
+
+def cdl2schemaorg(cdl_file, outformat, baseuri=None):
+    tfile, tfilename = tempfile.mkstemp('.nc')
+    subprocess.check_call(['ncgen', '-o', tfilename, cdl_file])
+    schema_g = nc2schemaorg(tfilename, outformat, baseuri=baseuri)
+    os.close(tfile)
+    os.remove(tfilename)
+    return schema_g
 
 def cdl2rdf(cdl_file, outformat, baseuri=None):
     #print("cdl2rdf test")
@@ -32,13 +142,20 @@ def cdl2rdf(cdl_file, outformat, baseuri=None):
     parser.add_argument('--baseuri', action="store", dest="baseuri", help="Base URI for the graph")
     parser.add_argument('--cdl', action="store_true", dest="isCDL", default=False, help="Flag to indicate file is CDL")
     parser.add_argument('--nc', action="store_true", dest="isNC", default=False, help="Flag to indicate file is netCDF")
+    parser.add_argument('--schema-org', action="store_true", dest="isSchemaOrgOutput", default=False, help="Flag to enable schema.org output")
     parser.add_argument("ncfile", help="Path for the netCDF file")
     args = parser.parse_args()
 
     if(args.isCDL or args.ncfile.endswith(".cdl") or args.ncfile.endswith('.CDL')):
-        cdl2rdf(args.ncfile, args.format, baseuri=args.baseuri)
+        if(args.isSchemaOrgOutput):
+            cdl2schemaorg(args.ncfile, args.format, baseuri=args.baseuri)
+        else:
+            cdl2rdf(args.ncfile, args.format, baseuri=args.baseuri)
     elif(args.isNC or args.ncfile.endswith(".nc") or args.ncfile.endswith('.NC')):
-        nc2rdf(args.ncfile, args.format, baseuri=args.baseuri)
+        if(args.isSchemaOrgOutput):
+            nc2schemaorg(args.ncfile, args.format, baseuri=args.baseuri)
+        else:
+            nc2rdf(args.ncfile, args.format, baseuri=args.baseuri)
     else:
         print("Unrecognised file suffix. Please indicate if CDL or NC via --cdl or --nc");
diff --git a/nc2rdf/requirements.txt b/nc2rdf/requirements.txt
new file mode 100644
index 0000000..fc66813
--- /dev/null
+++ b/nc2rdf/requirements.txt
@@ -0,0 +1,3 @@
+lxml
+pydap
+urllib3
diff --git a/nc2rdf/threddsnc2rdf.py b/nc2rdf/threddsnc2rdf.py
new file mode 100755
index 0000000..2076d62
--- /dev/null
+++ b/nc2rdf/threddsnc2rdf.py
@@ -0,0 +1,303 @@
+import nc2rdf
+import re
+import sys
+import datetime
+import argparse
+import uuid
+try:
+    from urlparse import urljoin                      # Python2
+    from urlparse import urlsplit, urlunsplit
+    from urlparse import urlparse
+except ImportError:
+    from urllib.parse import urljoin                  # Python3
+    from urllib.parse import urlsplit, urlunsplit
+    from urllib.parse import urlparse
+import lxml.etree
+import json
+import requests
+from pydap.client import open_url
+import pydap.lib
+import urllib
+from timeit import default_timer as timer
+import code, traceback, signal
+import os
+
+pydap.lib.TIMEOUT = 5
+
+
+#Utility to allow a debugger to attach to this program
+def debug(sig, frame):
+    """Interrupt running process, and provide a python prompt for
+    interactive debugging."""
+    d = {'_frame': frame}          # Allow access to frame object.
+    d.update(frame.f_globals)      # Unless shadowed by global
+    d.update(frame.f_locals)
+
+    i = code.InteractiveConsole(d)
+    message = "Signal received : entering python shell.\nTraceback:\n"
+    message += ''.join(traceback.format_stack(frame))
+    i.interact(message)
+
+#Utility to allow a debugger to attach to this program
+def listen():
+    signal.signal(signal.SIGUSR1, debug)  # Register handler
+
+class ThreddsHarvester:
+    """Harvests metadata from a Thredds service"""
+    def lookup_datasets_in_catalog(self, base_url, catalog_url, list_of_netcdf_files):
+        """loads the catalog xml and extracts dataset access information"""
+        xml = None
+        res = requests.get(catalog_url)
+        if res.status_code == 200:
+            xml = lxml.etree.fromstring(res.content)
+        else:
+            print("Exiting ThreddsHarvester - HTTP code for catalog_url(" + catalog_url + ") was ", res.status_code)
+            return None
+
+#        try:
+#            xml = lxml.etree.parse(catalog_url)
+#        except Exception as e:
+#            print("Exception caught in ThreddsHarvester - catalog_url(" + catalog_url + ")")
+#            print(e)
+#            return
+        namespaces = {"xlink": "http://www.w3.org/1999/xlink", 'c': 'http://www.unidata.ucar.edu/namespaces/thredds/InvCatalog/v1.0'}
+        access_infos = []
+        used_types = []
+        for node in xml.xpath('/c:catalog/c:service/c:service', namespaces=namespaces):
+            access_type = node.get('serviceType')
+            if access_type not in used_types:
+                used_types.append(access_type)
+                access_info = { "type" : access_type, "access" : node.get('base') }
+                access_infos.append(access_info)
+
+        #print "b: " + base_url
+        #print "c: " + catalog_url
+
+        #open_dap_result = xml.xpath('/c:catalog/*/c:service[@serviceType="opendap"]', namespaces=namespaces)
+        open_dap_result = xml.xpath('//c:service[@serviceType="opendap" or @serviceType="OPENDAP" or @serviceType="OPeNDAP"]', namespaces=namespaces)
+        if len(open_dap_result) > 0:
+            open_dap_prefix = open_dap_result[0].get('base')
+        else:
+            open_dap_prefix = xml.xpath('/c:catalog/c:service[@serviceType="OPeNDAP"]', namespaces=namespaces)[0].get('base')
+
+        iso_prefix_result = xml.xpath('/c:catalog/c:service/c:service[@serviceType="ISO"]', namespaces=namespaces)
+        wms_prefix_result = xml.xpath('/c:catalog/c:service/c:service[@serviceType="WMS"]', namespaces=namespaces)
+
+        if len(wms_prefix_result) > 0:
+            wms_prefix = wms_prefix_result[0].get('base')
+        else:
+            wms_prefix = None
+
+        if len(iso_prefix_result) > 0:
+            iso_prefix = iso_prefix_result[0].get('base')
+        else:
+            iso_prefix = None
+
+        res = xml.xpath('/c:catalog/c:dataset/c:dataset|/c:catalog/c:dataset[@urlPath]|/c:catalog/c:dataset/c:dataset/c:access[@urlPath and @serviceName="dap"]|//c:catalogRef', namespaces=namespaces)
+
+        for item in res:
+            print(item)
+            #if 'urlPath' in item.keys() and 'serviceName' in item.keys():
+            if 'urlPath' in item.keys():
+                # get the name from parent elem
+                parent = item.getparent()
+                name = parent.attrib['name']
+
+                url_path = item.attrib['urlPath']
+                iso_path = base_url + iso_prefix + url_path if iso_prefix != None else None
+                wms_path = base_url + wms_prefix + url_path if wms_prefix != None else None
+                dataset_access_infos = []
+                print("urlPath: " + url_path)
+                for access_info in access_infos:
+                    dataset_access_info = { "type" : access_info["type"], "access" : base_url + access_info["access"] + url_path }
+                    dataset_access_infos.append(dataset_access_info)
+                datasetEntry = { 'name' : name, 'open_dap_path': base_url + open_dap_prefix + url_path, 'iso_path': iso_path, 'wms_path': wms_path, 'access_infos' : dataset_access_infos }
+                list_of_netcdf_files.append(datasetEntry)
+            elif 'urlPath' in item.keys():
+                url_path = item.attrib['urlPath']
+                iso_path = base_url + iso_prefix + url_path if iso_prefix != None else None
+                wms_path = base_url + wms_prefix + url_path if wms_prefix != None else None
+                dataset_access_infos = []
+                print("urlPath: " + url_path)
+                for access_info in access_infos:
+                    dataset_access_info = { "type" : access_info["type"], "access" : base_url + access_info["access"] + url_path }
+                    dataset_access_infos.append(dataset_access_info)
+                datasetEntry = { 'name' : item.attrib['name'], 'open_dap_path': base_url + open_dap_prefix + url_path, 'iso_path': iso_path, 'wms_path': wms_path, 'access_infos' : dataset_access_infos }
+                list_of_netcdf_files.append(datasetEntry)
+            if '{http://www.w3.org/1999/xlink}href' in item.keys():
+                newCatalogPath = item.attrib["{http://www.w3.org/1999/xlink}href"]
+                #print item
+                #print "baseUrl " + base_url
+                #print "href " + newCatalogPath
+                #print "catalogUrl " + catalog_url
+                newCatalogUrl = urljoin(catalog_url, newCatalogPath)
+                print("newCatalogUrl " + newCatalogUrl)
+                if (newCatalogPath.endswith('catalog.xml')):
+                    self.lookup_datasets_in_catalog(base_url, newCatalogUrl, list_of_netcdf_files)
+                elif (newCatalogPath.endswith('.xml')):
+                    self.lookup_datasets_in_catalog(base_url, catalog_url.replace('catalog.xml', newCatalogPath), list_of_netcdf_files)
+        return list_of_netcdf_files
+
+def get_opendap_record(dataset_url):
+    """Get the open dap record from the thredds service and look for the eReefs observed properties; build a record of these and return it"""
+    data = {}
+    print(dataset_url)
+    datasetInformation = open_url(dataset_url)
+    for variable in datasetInformation.keys():
+        variable_properties = datasetInformation[variable]
+        data[variable] = {}
+        list_attributes = variable_properties.attributes.keys()
+        for variable_attribute in list_attributes:
+            value = variable_properties.attributes[variable_attribute]
+            data[variable][variable_attribute] = value
+    return data
+
+def assign_url_later(assign_url_map, property_to_assign, url_to_assign):
+    """build a record of a url and which properties in the dataset record it exists on, and update a list of unique urls; assign_url_map holds this information for later use when the url is resolved"""
+    if url_to_assign not in assign_url_map["unique_urls"]:
+        assign_url_map["url_property_map"][url_to_assign] = []
+        assign_url_map["unique_urls"].append(url_to_assign)
+    assign_url_map["url_property_map"][url_to_assign].append(property_to_assign)
+
+class RecordManager:
+    """Abstract class for managing dataset record persistence"""
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        pass
+
+    def should_update(self, dataset_address):
+        return True
+
+    def assign_urls(self, assign_url_map):
+        """Resolve urls in the assign_url_map and, if redirected, update the dataset records to use the redirected url"""
+        failed_urls = []
+        redirected_urls = []
+        original_to_resolved_url = {}
+        url_contents = {}
+
+        print(assign_url_map)
+
+
+def process_dataset(assign_url_map, dataset_address, thredds_url, thredds_catalog_url, outputformat='turtle', isSchemaOrgOutput=False):
+    """Extract information about the specific dataset at dataset_address"""
+    print("processing dataset address: " + dataset_address['name'])
+
+    #Use multiple endpoints to get information about this dataset with redundancy
+    opendap_url = dataset_address['open_dap_path']
+    iso_url = dataset_address['iso_path']
+    if 'wms_path' in dataset_address and dataset_address['wms_path'] != None:
+        wms_url = dataset_address['wms_path'] + "?service=WMS"
+
+    #Common information across all variables in this dataset
+    common_info = {}
+
+    opendap_information = {}
+    try:
+        opendap_information = get_opendap_record(opendap_url)
+    except Exception as e:
+        print("Exception caught in process_dataset - get_opendap_record(" + opendap_url + "): ", e)
+
+    common_info["access"] = dataset_address["access_infos"]
+    common_info["dataset_id"] = opendap_url
+
+    #print opendap_information
+    #print json.dumps(opendap_information, check_circular=False, sort_keys=True, indent=4, separators=(',', ': '), default=datetime_handler)
+
+    if isSchemaOrgOutput:
+        outputformat = 'json-ld'
+        unique_dataset_id = uuid.uuid4().hex
+        outputpath = OUTDIR + "/" + unique_dataset_id + ".json"
+        print("emitting to " + outputpath)
+        nc2rdf.nc2schemaorg(opendap_url, outputformat, outputfile=outputpath, baseuri=None)
+    elif outputformat == 'turtle':
+        unique_dataset_id = uuid.uuid4().hex
+        outputpath = OUTDIR + "/" + unique_dataset_id + ".ttl"
+        print("emitting to " + outputpath)
+        nc2rdf.nc2rdf(opendap_url, 'turtle', outputfile=outputpath, baseuri=opendap_url)
+
+
+def perform_harvest(thredds_url, thredds_catalog_url, outputformat='turtle', isSchemaOrgOutput=False):
+    """Perform harvest on the thredds_catalog_url"""
+    #Get dataset information
+    #print thredds_url
+    #print thredds_catalog_url
+    list_datasets_address = harvester.lookup_datasets_in_catalog(thredds_url, thredds_catalog_url, [])
+    if list_datasets_address is None:
+        return
+
+    #print list_datasets_address
+    dataset_uri = ''
+
+    #Prepare a map for delayed resolution of redirected urls
+    assign_url_map = { "unique_urls" : [], "url_property_map": {} }
+
+    #process other dataset information
+    for dataset_address in list_datasets_address:
+        try:
+            process_dataset(assign_url_map, dataset_address, thredds_url, thredds_catalog_url, outputformat, isSchemaOrgOutput=isSchemaOrgOutput)
+        except requests.exceptions.HTTPError as e:
+            print("HTTPError caught in perform_harvest: ", e, " ", thredds_catalog_url)
+        except AttributeError as e:
+            print("AttributeError caught in perform_harvest: ", e, " ", thredds_catalog_url)
+
+
+def process_thredds(thredds_url, isSchemaOrgOutput=False):
+    """harvest thredds endpoint"""
+    #assemble catalog url
+
+    if (thredds_url.endswith('catalog.xml')):
+        thredds_catalog_url = thredds_url
+        thredds_url = get_base_url(thredds_url)
+    else:
+        thredds_catalog_url = thredds_url + '/catalog/catalog.xml'
+    print('thredds_base_url:' + thredds_url)
+    print('thredds_catalog_url:' + thredds_catalog_url)
+    perform_harvest(thredds_url, thredds_catalog_url, isSchemaOrgOutput=isSchemaOrgOutput)
+
+
+def get_base_url(url):
+    split_url = urlsplit(url)
+    # You now have:
+    # split_url.scheme   "http"
+    # split_url.netloc   "127.0.0.1"
+    # split_url.path     "/asdf/login.php"
+    # split_url.query    ""
+    # split_url.fragment ""
+
+    # urlunsplit takes and joins a five-item iterable; we pass "" for the path, query string and fragment to keep only scheme and host.
+    clean_url = urlunsplit((split_url.scheme, split_url.netloc, "", "", ""))
+    return clean_url
+
+def datetime_handler(x):
+    if isinstance(x, datetime.datetime):
+        return x.isoformat()
+    raise TypeError("Unknown type")
+
+def checkOrCreateDir(directory):
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+
+if __name__ == '__main__':
+    start = timer()
+
+    harvester = ThreddsHarvester()
+
+    argparser = argparse.ArgumentParser()
+    argparser.add_argument('--schema-org', action="store_true", dest="isSchemaOrgOutput", default=False, help="Flag to enable schema.org output")
+    argparser.add_argument('threddsUrlOrCatalog', help='THREDDS endpoint url or catalog.xml')
+    args = argparser.parse_args()
+
+    OUTDIR = 'rdf'
+    if(args.isSchemaOrgOutput):
+        OUTDIR = 'schemaorg'
+    #make sure outdir is created
+    checkOrCreateDir(OUTDIR)
+
+    process_thredds(args.threddsUrlOrCatalog, args.isSchemaOrgOutput)
+    #process_dpn(dpn_url.strip(), DPN_ELDA_RES_ENDPOINT)
+
+    end = timer()
+    elapsed = end - start
+    print("Execution took ", elapsed, " seconds")
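
A minimal usage sketch of the functions this patch adds, mirroring how `threddsnc2rdf.py` drives them. It assumes the patched `nc2rdf.py` is importable from the `nc2rdf/` directory (so `bald2schemaorg_mappings.json` can be found); the netCDF file name and base URI below are illustrative placeholders, not files shipped with this change.

```python
# Illustrative only: the function names and signatures come from the patched nc2rdf.py;
# "myfile.nc" and the example base URI are hypothetical placeholders.
import nc2rdf

# Print a schema.org JSON-LD description of the dataset to stdout
nc2rdf.nc2schemaorg("myfile.nc", "json-ld", baseuri="http://example.org/dataset/1")

# Write the full bald RDF graph to a Turtle file
nc2rdf.nc2rdf("myfile.nc", "turtle", outputfile="myfile.ttl", baseuri="http://example.org/dataset/1")
```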