-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #57 from apriltuesday/EVA-3624
EVA-3624: Add biosamples communicators
- Loading branch information
Showing
2 changed files
with
307 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
#!/usr/bin/env python | ||
# Copyright 2020 EMBL - European Bioinformatics Institute | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import re | ||
|
||
import requests | ||
from functools import cached_property | ||
from ebi_eva_common_pyutils.logger import AppLogger | ||
from retry import retry | ||
|
||
|
||
class HALNotReadyError(Exception):
    """Exception type exposed by this module for HAL "not ready" conditions.

    NOTE(review): nothing in the visible code raises it; presumably callers
    use it to signal a resource that is not available yet -- confirm.
    """
|
||
|
||
class HALCommunicator(AppLogger):
    """
    This class helps navigate through REST API that uses the HAL standard.

    It retrieves a token from ``auth_url`` using ``username``/``password``, caches it, and sends it as a
    Bearer token with every request made against the API rooted at ``bsd_url``.
    """
    # HTTP status codes that are considered a success by _validate_response
    acceptable_code = [200, 201]

    def __init__(self, auth_url, bsd_url, username, password):
        """
        :param auth_url: URL queried to obtain the authentication token
        :param bsd_url: root URL of the HAL API
        :param username: user name sent to the authentication endpoint
        :param password: password sent to the authentication endpoint
        """
        self.auth_url = auth_url
        self.bsd_url = bsd_url
        self.username = username
        self.password = password

    def _validate_response(self, response):
        """
        Check that the response has an acceptable code and raise if it does not.

        :param response: the response object returned by requests
        :return: the same response when its status code is acceptable
        :raises ValueError: when the status code is not in ``acceptable_code``
        """
        if response.status_code not in self.acceptable_code:
            request = response.request
            self.error(request.method + ': ' + request.url + " with " + str(request.body))
            # Never write credentials to the log: mask the Authorization header before logging.
            safe_headers = {
                name: '<redacted>' if str(name).lower() == 'authorization' else value
                for name, value in request.headers.items()
            }
            self.error("headers: {}".format(safe_headers))
            self.error("<{}>: {}".format(response.status_code, response.text))
            raise ValueError('The HTTP status code ({}) is not one of the acceptable codes ({})'.format(
                str(response.status_code), str(self.acceptable_code))
            )
        return response

    @cached_property
    def token(self):
        """Retrieve the token from the AAP REST API then cache it for further querying"""
        response = requests.get(self.auth_url, auth=(self.username, self.password))
        self._validate_response(response)
        return response.text

    @retry(exceptions=(ValueError, requests.RequestException), tries=3, delay=2, backoff=1.2, jitter=(1, 3))
    def _req(self, method, url, **kwargs):
        """
        Private method that sends a request using the specified method. It adds the headers required by bsd.

        The call is retried (see the decorator) when the request fails or the status code is not acceptable.

        :param method: HTTP verb passed to requests.request
        :param url: URL to query
        :param kwargs: any other keyword argument accepted by requests.request; ``headers`` is merged with
                       the headers added here
        :return: the validated response object
        :raises ValueError: when the status code is still not acceptable after all the retries
        """
        # Work on a copy so the dictionary supplied by the caller is never modified
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Accept'] = 'application/hal+json'
        token = self.token
        if token is not None:
            headers['Authorization'] = 'Bearer ' + token
        if 'json' in kwargs:
            headers['Content-Type'] = 'application/json'
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            **kwargs
        )
        self._validate_response(response)
        return response

    @staticmethod
    def _fill_url_template(url, name, value):
        """
        Replace every ``{name}`` or ``{name:<pattern>}`` placeholder found in url with value.

        The name is escaped and the optional pattern stops at the closing brace of its own placeholder
        (one level of nested braces such as ``{id:[a-z]{3}}`` is accepted), so other placeholders in the same
        url are left untouched. The value is inserted literally.
        """
        placeholder = r'\{' + re.escape(name) + r'(:(?:[^{}]|\{[^{}]*\})*)?\}'
        # A function replacement prevents backslashes in the value from being read as group references
        return re.sub(placeholder, lambda _match: str(value), url)

    def follows(self, query, json_obj=None, method='GET', url_template_values=None, join_url=None, **kwargs):
        """
        Finds a link within the json_obj using a query string or list, modify the link using the
        url_template_values dictionary then query the link using the method and any additional keyword argument.
        If the json_obj is not specified then it will use the root query defined by the base url.

        :param query: dot separated string, or sequence of keys, used to drill down into json_obj
        :param json_obj: dictionary to search; defaults to the root document of the API
        :param method: HTTP verb used to query the link
        :param url_template_values: dictionary of placeholder name -> value substituted in the link
        :param join_url: optional path appended to the link after a '/'
        :param kwargs: ``all_pages=True`` follows the 'next' links and merges all the embedded elements in the
                       first page; any other keyword argument is passed to the request
        :return: the decoded json of the response
        :raises KeyError: when an element of the query does not exist in the json object
        :raises ValueError: when the element found is not a string
        """
        all_pages = kwargs.pop('all_pages', False)

        if json_obj is None:
            json_obj = self.root
        # Drill down into a dict using dot notation
        _json_obj = json_obj
        query_list = query.split('.') if isinstance(query, str) else query
        for query_element in query_list:
            if query_element in _json_obj:
                _json_obj = _json_obj[query_element]
            else:
                raise KeyError('{} does not exist in json object'.format(query_element))
        if not isinstance(_json_obj, str):
            raise ValueError('The result of the query_string must be a string to use as a url')
        url = _json_obj
        # replace the template in the url with the value provided
        if url_template_values:
            for k, v in url_template_values.items():
                url = self._fill_url_template(url, k, v)
        if join_url:
            url += '/' + join_url
        # Now query the url
        json_response = self._req(method, url, **kwargs).json()

        # Depaginate the call if requested
        if all_pages is True:
            # This depagination code will iterate over all the pages available until the pages comes back without a
            # next page. It stores the embedded elements in the initial query's json response
            content = json_response
            while 'next' in (content.get('_links') or {}):
                content = self._req(method, content['_links']['next'].get('href'), **kwargs).json()
                # Tolerate pages without embedded elements and keys that only appear in later pages
                embedded = json_response.setdefault('_embedded', {})
                for key, elements in (content.get('_embedded') or {}).items():
                    embedded.setdefault(key, []).extend(elements)
            # Remove the pagination information as it is not relevant to the depaginated response
            json_response.pop('page', None)
            links = json_response.get('_links') or {}
            for relation in ('first', 'last', 'next'):
                links.pop(relation, None)
        return json_response

    def follows_link(self, key, json_obj=None, method='GET', url_template_values=None, join_url=None, **kwargs):
        """
        Same function as follows but construct the query_string from a single keyword surrounded by '_links' and 'href'.
        """
        return self.follows(('_links', key, 'href'),
                            json_obj=json_obj, method=method, url_template_values=url_template_values,
                            join_url=join_url, **kwargs)

    @cached_property
    def root(self):
        """Root document of the API: the json returned by a GET on bsd_url, cached after the first access"""
        return self._req('GET', self.bsd_url).json()

    @property
    def communicator_attributes(self):
        """Attributes specific to the type of communicator; must be provided by the subclasses"""
        raise NotImplementedError
|
||
|
||
class AAPHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that authenticates through AAP."""

    def __init__(self, auth_url, bsd_url, username, password, domain=None):
        """Store the connection settings of the parent class plus the optional domain."""
        super().__init__(auth_url, bsd_url, username, password)
        self.domain = domain

    @property
    def communicator_attributes(self):
        """Dictionary holding the domain this communicator was created with."""
        return dict(domain=self.domain)
|
||
|
||
class WebinHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that authenticates through Webin."""

    @cached_property
    def token(self):
        """Post the credentials to the ENA Webin REST API and cache the token it returns."""
        credentials = {"authRealms": ["ENA"], "password": self.password, "username": self.username}
        auth_response = requests.post(self.auth_url, json=credentials)
        self._validate_response(auth_response)
        return auth_response.text

    @property
    def communicator_attributes(self):
        """Dictionary holding the user name as the Webin submission account id."""
        return dict(webinSubmissionAccountId=self.username)
|
||
|
||
class NoAuthHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that sends no credentials."""

    def __init__(self, bsd_url):
        """Only the API root url is needed; the authentication settings are all set to None."""
        super().__init__(auth_url=None, bsd_url=bsd_url, username=None, password=None)

    @cached_property
    def token(self):
        """There is no token: requests are sent without it and fail if the endpoint requires auth."""
        return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from copy import deepcopy | ||
from unittest import TestCase | ||
from unittest.mock import Mock, patch, PropertyMock | ||
|
||
from ebi_eva_common_pyutils.biosamples_communicators import HALCommunicator, WebinHALCommunicator | ||
|
||
|
||
class TestHALCommunicator(TestCase):
    """Unit tests for HALCommunicator; every HTTP call is replaced by a mock."""

    @staticmethod
    def patch_token(token='token'):
        """Creates a patch for the HALCommunicator token property. The patched property returns the token provided"""
        # new_callable must be the PropertyMock class itself so that attribute access yields the token value
        return patch.object(HALCommunicator, 'token', new_callable=PropertyMock, return_value=token)

    def setUp(self) -> None:
        self.comm = HALCommunicator('http://aap.example.org', 'http://BSD.example.org', 'user', 'pass')

    def test_token(self):
        with patch('requests.get', return_value=Mock(text='token', status_code=200)) as mocked_get:
            self.assertEqual(self.comm.token, 'token')
            mocked_get.assert_called_once_with('http://aap.example.org', auth=('user', 'pass'))

    def test_req(self):
        # Successful request: the HAL and authorization headers are added
        with patch('requests.request', return_value=Mock(status_code=200)) as mocked_request, \
                self.patch_token():
            self.comm._req('GET', 'http://BSD.example.org')
            mocked_request.assert_called_once_with(
                method='GET', url='http://BSD.example.org',
                headers={'Accept': 'application/hal+json', 'Authorization': 'Bearer token'}
            )

        # Unacceptable status code: a ValueError is raised once the retries are exhausted
        with self.patch_token(), \
                patch('requests.request') as mocked_request:
            mocked_request.return_value = Mock(status_code=500, request=PropertyMock(url='text'))
            self.assertRaises(ValueError, self.comm._req, 'GET', 'http://BSD.example.org')

    def test_root(self):
        expected_json = {'json': 'values'}
        with patch.object(HALCommunicator, '_req') as mocked_req:
            mocked_req.return_value = Mock(json=Mock(return_value={'json': 'values'}))
            self.assertEqual(self.comm.root, expected_json)
            mocked_req.assert_called_once_with('GET', 'http://BSD.example.org')

    def test_follows(self):
        json_response = {'json': 'values'}
        # Patches the _req function that returns the Response object with a json function
        patch_req = patch.object(HALCommunicator, '_req', return_value=Mock(json=Mock(return_value=json_response)))

        # test follow url
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test', {'test': 'url'}), json_response)
            mocked_req.assert_any_call('GET', 'url')

        # test follow url with a template
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test', {'test': 'url/{id:*.}'}, url_template_values={'id': '1'}),
                             json_response)
            mocked_req.assert_any_call('GET', 'url/1')

        # test follow url deep in the json_obj
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test1.test2', {'test1': {'test2': 'url'}}), json_response)
            mocked_req.assert_any_call('GET', 'url')

        # test follow url with specific verb and payload
        with patch_req as mocked_req:
            self.assertEqual(
                self.comm.follows('test', {'test': 'url'}, method='POST', json={'data': 'value'}),
                json_response
            )
            mocked_req.assert_any_call('POST', 'url', json={'data': 'value'})

        # test follow with depagination
        json_entries_with_next = {
            '_embedded': {'samples': [json_response, json_response]},
            '_links': {'next': {'href': 'url'}, 'first': {}, 'last': {}},
            'page': {}
        }
        json_entries_without_next = {
            '_embedded': {'samples': [json_response]},
            '_links': {},
            'page': {}
        }
        patch_req_with_pages = patch.object(HALCommunicator, '_req', side_effect=[
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_without_next))),
        ])
        # Without all_pages=True only returns the first page
        with patch_req_with_pages as mocked_req:
            observed_json = self.comm.follows('test', {'test': 'url'})
            self.assertEqual(observed_json, json_entries_with_next)
            self.assertEqual(len(observed_json['_embedded']['samples']), 2)
            mocked_req.assert_any_call('GET', 'url')

        # With all_pages=True returns the first page that contains all the embedded elements
        with patch_req_with_pages as mocked_req:
            observed_json = self.comm.follows('test', {'test': 'url'}, all_pages=True)
            self.assertEqual(len(observed_json['_embedded']['samples']), 7)
            self.assertEqual(mocked_req.call_count, 4)

    def test_follows_link(self):
        json_response = {'json': 'values'}
        # Patches the _req function that returns the Response object with a json function
        patch_req = patch.object(HALCommunicator, '_req', return_value=Mock(json=Mock(return_value=json_response)))

        # test basic follow
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows_link('test', {'_links': {'test': {'href': 'url'}}}), json_response)
            mocked_req.assert_any_call('GET', 'url')
|
||
|
||
class TestWebinHALCommunicator(TestCase):
    """Unit tests for WebinHALCommunicator; the authentication call is replaced by a mock."""

    def setUp(self) -> None:
        self.comm = WebinHALCommunicator('http://webin.example.org', 'http://BSD.example.org', 'user', 'pass')

    def test_communicator_attributes(self):
        # assertEqual (rather than a bare assert) reports both values on failure and is not stripped by -O
        self.assertEqual(self.comm.communicator_attributes, {'webinSubmissionAccountId': 'user'})

    def test_token(self):
        with patch('requests.post', return_value=Mock(text='token', status_code=200)) as mocked_post:
            self.assertEqual(self.comm.token, 'token')
            mocked_post.assert_called_once_with('http://webin.example.org',
                                                json={'authRealms': ['ENA'], 'password': 'pass', 'username': 'user'})