diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index be404c93..4d1b6d50 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -3,10 +3,12 @@ groups: - annotation: ckanext-xloader settings options: - key: ckanext.xloader.site_url - default: + example: http://ckan-dev:5000 + default: http://ckan-dev:5000 description: | Provide an alternate site URL for the xloader_submit action. This is useful, for example, when the site is running within a docker network. + validators: configured_default("ckan.site_url",None) required: false - key: ckanext.xloader.jobs_db.uri default: sqlite:////tmp/xloader_jobs.db diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 3ac8ebba..3d77e16c 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -12,11 +12,13 @@ import sys from psycopg2 import errors -from six.moves.urllib.parse import urlsplit +from six.moves.urllib.parse import urlsplit, urlparse, urlunparse import requests from rq import get_current_job import sqlalchemy as sa +from urllib.parse import urljoin, urlunsplit + from ckan import model from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config @@ -79,9 +81,15 @@ def xloader_data_into_datastore(input): # First flag that this task is running, to indicate the job is not # stillborn, for when xloader_submit is deciding whether another job would # be a duplicate or not + + callback_url = config.get('ckanext.xloader.site_url') or config.get('ckan.site_url') + callback_url = urljoin( + callback_url.rstrip('/'), '/api/3/action/xloader_hook') + result_url = callback_url + job_dict = dict(metadata=input['metadata'], status='running') - callback_xloader_hook(result_url=input['result_url'], + callback_xloader_hook(result_url=result_url, api_key=input['api_key'], job_dict=job_dict) @@ -143,7 +151,7 @@ def xloader_data_into_datastore(input): errored = True finally: # job_dict is defined in xloader_hook's docstring - is_saved_ok = callback_xloader_hook(result_url=input['result_url'], + is_saved_ok = callback_xloader_hook(result_url=result_url, api_key=input['api_key'], job_dict=job_dict) errored = errored or not is_saved_ok @@ -285,6 +293,14 @@ def _download_resource_data(resource, data, api_key, logger): 'Only http, https, and ftp resources may be fetched.' ) + resource_uri = urlunsplit(('', '', url_parts.path, url_parts.query, url_parts.fragment)) + callback_url = config.get('ckanext.xloader.site_url') or config.get('ckan.site_url') + callback_url = urljoin( + callback_url.rstrip('/'), resource_uri) + + url = callback_url + url_parts = urlsplit(url) # reparse the url after the callback_url is set + # fetch the resource data logger.info('Fetching from: {0}'.format(url)) tmp_file = get_tmp_file(url)