Skip to content

Commit

Permalink
Merge pull request #15 from materials-data-facility/forge-dev
Browse files Browse the repository at this point in the history
v0.4.6
  • Loading branch information
jgaff authored Oct 24, 2017
2 parents 1abc6fc + 5f18256 commit d8421c7
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 50 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ notifications:
on_success: never
on_failure: always
slack:
secure: h9Pn5T7PPIcoKAu61I7/bigL9A3UdvmF+D92mbEGCr1w+Vk9/TaTwayxNleVLd2ukG9cuMKFJ9lYzDamuHkadKVue7SbyXLp7Gg+n88Vdn5OJ0tSgVG/V4E9qj0S8GSGX1XnBdiMpV8tTqf1CAiNvAu7xA4kstd1WooFIostkk6v3lGIGAr5y4F+Ziz4JozRgJWByMSkx46GEFUFazLpVSz0iIZL1J/Jyzh2D0uo52f8V6kWqXJsyJZXKQYLNZD6sQNk48DcAOZoJPkmJ1MTFuoa2Qi0Re9Q0L8A+n2CZ0B+/4IxWIJFkYIIJ3/Amn7/2pks8+dNcmX0zFPgdxiP/L3LW4AMLe3Y1cTWCDKxjHzdpiMs83ZfrApPZqdAu66F/mLY+kNY9xDtlQ5z9UA/AGkVAO9gARWUHwZNEF4NQ1pN3MSwTwhlDnw/SJXgkWa+5wR7RNDAGbLMaWztphxslOCj/5HeDNA/mMBcb2yoTMsZzMdZRloKkaCx+EjVc7bZlBfo1GVB2AgkASc5zAUn+6w8oTARm2oHDMjJc3c3HvO3rbHteC3y89KcnNWIk7q1KuBz/53JyvQFvc5Iil2yFFKcopS5c7B1tP/gTa1CgrOhLidPr1qISWS31X6UA+7GNmVYJl0kUfWNCDLHBkw4PfhUw894BLsTNB9pIICaEBs=
on_success: always
on_failure: always
rooms:
secure: vYpU1qxCSx8OMQsHBEfLZP6foqd88Xon54kejUNzMKR6BpOS0rgJZmY1VYKgqd8gWxAox3WyYDyIzEsEO/b0JQ4l53hC9A1Hk0F5YCTm9NjPIb0boIpJ2MbIuijnDAJEEiruevQKH+esmbOssGSg8bDyCd8V1vWrvCEe84pIbfsevIyuUnRWKpEIHk/oIDliUy9d8HhNJLGuJrKnIHtMFw+fkO36yQMxM5SDXuM6yvBXr3+7ntqeZXthYogmKLn1fu+X/V9v8Zz2MlfZmY9G5q3RSJAlC4ASeFLF8GNBZHsRBlGPvF8RNtRiiUlxXmYOes7B61SD3sHVAMvNhSS2xL0LGNU9ZzUWx8HVwlt60ATOF7aOu2Uao3SVWMi50ZC0DfRlUw11gNOyyMFq5zsHK32HeFm81o/TvSr/Iy4Xy8j/psX/38T9MjrgFxPneOolUIBjAeW27kW/39qfLLtPOqA3wmByucmiQTJXJhpIbiEKfnliERxbp5gGdFg0DZKQKWNikxjhUJXKU4EF5FC7m9KgqkgvfcnhhEoz6moRBDAjC9/xr9xIYFDa5bIYuB98gGWle452pUr3x5Br1SvS7RfA42Ob2G2U11T/GQ+uEmXy1Ai5emrxjF10qfXY1j7Oe1MbKNqR/C/n3Z3xjduX3LDSkskvcClqmi9cFuMX3Kk=
deploy:
provider: pypi
user: jgaff
on:
tags: true
distributions: "sdist bdist_wheel"
distributions: sdist bdist_wheel
password:
secure: Bao2ot8PG8VX8EjzQ9kgLmzQIkepGf4o3VPM1IhUuYjlos3cpVtCn1Vqn1GLShdJzKXrgkAo12A0qkxt9T0Mx/hwYtkY4mlg/0wTo8DAKq96Gy6kjb9xlXw+DpeoVwHOUkibCRIPd+QC3z5Ut36l/xAR31xJ8BDhIkD7MaVZXwgB2ykG3tElL63peFCMVPhS7HmZ9Hf+11b73MPdlG8j/gNReDtd9vCU/LmGrjiHukB3nQLYmmVnCvbfwQdUJ3t2B9BpNlRUwcfqLOyTFIQoWlo+GkkyhTsafWEhHMQgsZvcti5PPlgbUSrUK1fUc5+/gRRsn6RJ0Bof+njXmgVmgOA4XGRsO+/L4Dfe9gXAleR1lx/lIzkh9hPot870w+65WIQs4tdSx9cShXHx+d6uv3eWkzPhCzWZwrK5DL4M5oXRSpojA8MPzuYsTsWaOdCHXj6g7KymdHRqgi7QPWJGvOf/Gs+xHaLcSKMZz/uLBVFyTCjcy3vyv48n3HljZTLYNp6tpV77Yo5i/qNtQJ7Tb853L/iBosGQd2DMVxGvbJbhu7grdgA5wt/nGOKLNLDYc7AmgSQoWh3un5/c0cTQYQo8e13fLC43FTKTQthsalZNBfLcaCDVJ6KvKMh/kF11pYfTQ8hDzpGsFklthXvu9zggS1I4sHwbYjb6Z7lAle0=
51 changes: 45 additions & 6 deletions mdf_forge/forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@ class Forge:
local_ep is the endpoint ID of the local Globus Connect Personal endpoint.
"""
__index = "mdf"
__default_index = "mdf"
__services = ["mdf", "transfer", "search"]
__app_name = "MDF_Forge"

def __init__(self, **kwargs):
def __init__(self, index=__default_index, local_ep=None, **kwargs):
"""Initialize the Forge instance.
Arguments:
index (str): The Globus Search index to search on. Default "mdf".
local_ep (str): The endpoint ID of the local Globus Connect Personal endpoint.
If not provided, may be autodetected as possible.
Keyword Arguments:
index (str): The Globus Search index to search on.
services (list of str): The services to authenticate for.
local_ep (str): The endpoint ID of the local Globus Connect Personal endpoint.
Advanced users only.
"""
self.__index = kwargs.get('index', self.__index)
self.__index = index
self.local_ep = local_ep
self.__services = kwargs.get('services', self.__services)
self.local_ep = kwargs.get("local_ep", None)

clients = toolbox.login(credentials={
"app_name": self.__app_name,
Expand Down Expand Up @@ -173,6 +177,32 @@ def aggregate(self, q=None, scroll_size=SEARCH_LIMIT, reset_query=True):
return res


def show_fields(self, block=None):
    """Retrieve and return the mapping for the given metadata block.

    Arguments:
    block (str): The top-level field to fetch the mapping for.
                 Default None, which lists just the blocks.

    Returns:
    dict: A set of field:datatype pairs.
    """
    mapping = self.__query.mapping()
    if not block:
        # List only the top-level blocks; each is reported as a generic
        # "object" since its children are not expanded.
        blocks = {key.split(".")[0] for key in mapping.keys()}
        block_map = {b: "object" for b in blocks}
    else:
        # Match the block itself or its dotted children only. A bare
        # startswith(block) would also match sibling blocks that merely
        # share the prefix (e.g. block "mdf" matching "mdfx.field").
        block_map = {key: value for key, value in mapping.items()
                     if key == block or key.startswith(block + ".")}
    return block_map


def reset_query(self):
"""Destroy the current query and create a fresh, clean one."""
del self.__query
Expand Down Expand Up @@ -899,3 +929,12 @@ def aggregate(self, q=None, scroll_size=SEARCH_LIMIT):

return output


def mapping(self):
    """Fetch the mapping for the current index.

    Returns:
    dict: The full mapping for the index.
    """
    # Delegate to the underlying SearchClient and unwrap its envelope.
    raw_result = self.__search_client.mapping()
    return raw_result["mappings"]

74 changes: 35 additions & 39 deletions mdf_forge/toolbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@

from six import print_

# Map of friendly index names to Globus Search index identifiers.
# The commented values are the real index UUIDs; until the Search service
# handles them as documented, the friendly name itself is used as the ID.
# TODO: switch to the commented UUIDs once Search supports them.
SEARCH_INDEX_UUIDS = {
    "mdf": "mdf",  # "d6cc98c3-ff53-4ee2-b22b-c6f945c0d30c",
    "mdf-test": "mdf-test",  # "c082b745-32ac-4ad2-9cde-92393f6e505c",
    "dlhub": "dlhub",  # "847c9105-18a0-4ffb-8a71-03dd76dfcc9d",
    "dlhub-test": "dlhub-test"  # "5c89e0a9-00e5-4171-b415-814fe4d0b8af"
}


###################################################
Expand Down Expand Up @@ -115,10 +122,10 @@ def _get_tokens(client, scopes, app_name, force_refresh=False):
clients["transfer"] = globus_sdk.TransferClient(authorizer=transfer_authorizer)
if "search_ingest" in servs:
ingest_authorizer = globus_sdk.RefreshTokenAuthorizer(all_tokens["search.api.globus.org"]["refresh_token"], native_client)
clients["search_ingest"] = SearchClient(default_index=(creds.get("index", None) or kwargs.get("index", None)), authorizer=ingest_authorizer)
clients["search_ingest"] = SearchClient(index=(kwargs.get("index", None) or creds["index"]), authorizer=ingest_authorizer)
elif "search" in servs:
search_authorizer = globus_sdk.RefreshTokenAuthorizer(all_tokens["search.api.globus.org"]["refresh_token"], native_client)
clients["search"] = SearchClient(default_index=(creds.get("index", None) or kwargs.get("index", None)), authorizer=search_authorizer)
clients["search"] = SearchClient(index=(kwargs.get("index", None) or creds["index"]), authorizer=search_authorizer)
if "mdf" in servs:
mdf_authorizer = globus_sdk.RefreshTokenAuthorizer(all_tokens["data.materialsdatafacility.org"]["refresh_token"], native_client)
clients["mdf"] = mdf_authorizer
Expand Down Expand Up @@ -193,9 +200,9 @@ def confidential_login(credentials=None):
if "transfer" in servs:
clients["transfer"] = globus_sdk.TransferClient(authorizer=conf_authorizer)
if "search_ingest" in servs:
clients["search_ingest"] = SearchClient(default_index=creds.get("index", None), authorizer=conf_authorizer)
clients["search_ingest"] = SearchClient(index=creds["index"], authorizer=conf_authorizer)
elif "search" in servs:
clients["search"] = SearchClient(default_index=creds.get("index", None), authorizer=conf_authorizer)
clients["search"] = SearchClient(index=creds["index"], authorizer=conf_authorizer)
if "mdf" in servs:
clients["mdf"] = conf_authorizer
if "publish" in servs:
Expand Down Expand Up @@ -226,6 +233,8 @@ def find_files(root, file_pattern=None, verbose=False):
no_root_path (str): The path to the directory containing the file, with the path to the root directory removed.
filename (str): The name of the file.
"""
if not os.path.exists(root):
raise ValueError("Path '" + root + "' does not exist.")
# Add separator to end of root if not already supplied
root += os.sep if root[-1:] != os.sep else ""
for path, dirs, files in tqdm(os.walk(root), desc="Finding files", disable= not verbose):
Expand All @@ -248,8 +257,8 @@ def uncompress_tree(root, verbose=False):
If False, will remain silent unless there is an error.
Default False.
"""
for path, dirs, files in tqdm(os.walk(root), desc="Uncompressing files", disable= not verbose):
for single_file in files:
for path, dirs, files in tqdm(os.walk(root), desc="Uncompressing dirs", disable= not verbose):
for single_file in tqdm(files, desc="Uncompressing files", disable= not verbose):
abs_path = os.path.join(path, single_file)
if tarfile.is_tarfile(abs_path):
tar = tarfile.open(abs_path)
Expand Down Expand Up @@ -376,7 +385,7 @@ def quick_transfer(transfer_client, source_ep, dest_ep, path_list, timeout=None)
If this argument is -1, the transfer will submit but not wait at all. There is then no error checking.
Returns:
int: 0 on success.
str: ID of the Globus Transfer.
"""
INTERVAL_SEC = 10
tdata = globus_sdk.TransferData(transfer_client, source_ep, dest_ep, verify_checksum=True)
Expand All @@ -396,7 +405,7 @@ def quick_transfer(transfer_client, source_ep, dest_ep, path_list, timeout=None)
raise globus_sdk.GlobusError("Failed to transfer files: Transfer " + res["code"])

iterations = 0
while timeout >= 0 and not transfer_client.task_wait(res["task_id"], timeout=INTERVAL_SEC, polling_interval=INTERVAL_SEC):
while timeout is not None and timeout >= 0 and not transfer_client.task_wait(res["task_id"], timeout=INTERVAL_SEC, polling_interval=INTERVAL_SEC):
for event in transfer_client.task_event_list(res["task_id"]):
if event["is_error"]:
transfer_client.cancel_task(res["task_id"])
Expand All @@ -406,7 +415,7 @@ def quick_transfer(transfer_client, source_ep, dest_ep, path_list, timeout=None)
raise globus_sdk.GlobusError("Transfer timed out after " + str(iterations * INTERVAL_SEC) + " seconds.")
iterations += 1

return 0
return res["task_id"]


def get_local_ep(transfer_client):
Expand Down Expand Up @@ -475,25 +484,19 @@ def get_local_ep(transfer_client):
class SearchClient(BaseClient):
"""Access (search and ingest) Globus Search."""

def __init__(self, base_url="https://search.api.globus.org/", default_index=None, **kwargs):
def __init__(self, index, base_url="https://search.api.globus.org/", **kwargs):
app_name = kwargs.pop('app_name', 'Search Client v0.2')
BaseClient.__init__(self, "search", app_name=app_name, **kwargs)
# base URL lookup will fail, producing None, set it by hand
self.base_url = base_url
self._headers['Content-Type'] = 'application/json'
self.default_index = default_index
self.index = SEARCH_INDEX_UUIDS.get(index.strip().lower()) or index

def _base_index_uri(self, index):
index = index or self.default_index
if not index:
raise ValueError(
('You must either pass an explicit index '
'or set a default one at the time that you create '
'a SearchClient'))
return '/v1/index/{}'.format(index)
def _base_index_uri(self):
return '/v1/index/{}'.format(self.index)

def search(self, q, limit=None, offset=None, query_template=None,
index=None, advanced=None, **params):
advanced=None, **params):
"""
Perform a simple ``GET`` based search.
Expand All @@ -506,10 +509,6 @@ def search(self, q, limit=None, offset=None, query_template=None,
The user-query string. Required for simple searches (and most
advanced searches).
``index`` (*string*)
Optional unless ``default_index`` was not set.
The index to query.
``limit`` (*int*)
Optional. The number of results to return.
Expand All @@ -527,12 +526,12 @@ def search(self, q, limit=None, offset=None, query_template=None,
``params``
Any additional query params to pass. For internal use only.
"""
uri = slash_join(self._base_index_uri(index), 'search')
uri = slash_join(self._base_index_uri(), 'search')
merge_params(params, q=q, limit=limit, offset=offset,
query_template=query_template, advanced=advanced)
return self.get(uri, params=params)

def structured_search(self, data, index=None, **params):
def structured_search(self, data, **params):
"""
Perform a structured, ``POST``-based, search.
Expand All @@ -541,21 +540,17 @@ def structured_search(self, data, index=None, **params):
``data`` (*dict*)
A valid GSearchRequest document to execute.
``index`` (*string*)
Optional unless ``default_index`` was not set.
The index to query.
``advanced`` (*bool*)
Use simple query parsing vs. advanced query syntax when
interpreting the query string. Defaults to False.
``params``
Any additional query params to pass. For internal use only.
"""
uri = slash_join(self._base_index_uri(index), 'search')
uri = slash_join(self._base_index_uri(), 'search')
return self.post(uri, json_body=data, params=params)

def ingest(self, data, index=None, **params):
def ingest(self, data, **params):
"""
Perform a simple ``POST`` based ingest op.
Expand All @@ -564,21 +559,22 @@ def ingest(self, data, index=None, **params):
``data`` (*dict*)
A valid GIngest document to index.
``index`` (*string*)
Optional unless ``default_index`` was not set.
The search index to send data into.
``params``
Any additional query params to pass. For internal use only.
"""
uri = slash_join(self._base_index_uri(index), 'ingest')
uri = slash_join(self._base_index_uri(), 'ingest')
return self.post(uri, json_body=data, params=params)

def remove(self, subject, index=None, **params):
uri = slash_join(self._base_index_uri(index), "subject")
def remove(self, subject, **params):
    """Delete the entry identified by ``subject`` from the index.

    Arguments:
    subject (str): The subject of the entry to remove.
    params: Any additional query params to pass.
    """
    params["subject"] = subject
    endpoint = slash_join(self._base_index_uri(), "subject")
    return self.delete(endpoint, params=params)

def mapping(self, **params):
    """Get the mapping for the index.

    NOTE(review): served from an "/unstable/" Search API route — presumably
    subject to change; confirm against the Globus Search API docs.
    """
    endpoint = "/unstable/index/%s/mapping" % self.index
    return self.get(endpoint, params=params)


class DataPublicationClient(BaseClient):
"""Publish data with Globus Publish."""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='mdf_forge',
version='0.4.5',
version='0.4.6',
packages=['mdf_forge'],
description='Materials Data Facility python package',
long_description="Forge is the Materials Data Facility Python package to interface and leverage the MDF Data Discovery service. Forge allows users to perform simple queries and facilitiates moving and synthesizing results.",
Expand Down
11 changes: 9 additions & 2 deletions tests/test_forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,7 @@ def test_forge_http_download():
os.remove(os.path.join(dest_path, "test_multifetch.txt"))


# This test does not work on Travis because Travis does not have a local Globus EP
@pytest.mark.skipif(os.getenv("TEST_ENV", "local") == "travis", reason="Travis CI does not have a Globus Endpoint.")
@pytest.mark.xfail(reason="Test relies on get_local_ep() which will cause failures if exactly one EP isn't detected.")
def test_forge_globus_download():
f = forge.Forge()
# Simple case
Expand Down Expand Up @@ -430,3 +429,11 @@ def test_forge_chaining():
res2 = forge.Forge().match_field("source_name", "cip").match_field("elements", "Al").search()
assert all([r in res2 for r in res1]) and all([r in res1 for r in res2])


def test_forge_show_fields():
    client = forge.Forge()
    # With no argument, only the top-level metadata blocks are listed.
    top_level = client.show_fields()
    assert "mdf" in top_level.keys()
    # With a block name, the dotted child fields of that block are returned.
    mdf_fields = client.show_fields("mdf")
    assert "mdf.mdf_id" in mdf_fields.keys()

0 comments on commit d8421c7

Please sign in to comment.