diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 4d6153f3..3b44198d 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -32,6 +32,7 @@ on:
jobs:
lint:
+ if: github.repository == 'ckan/ckanext-xloader'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
@@ -47,13 +48,22 @@ jobs:
needs: lint
strategy:
matrix:
- ckan-version: ["2.11", "2.10", 2.9]
+ include: #ckan-image see https://github.com/ckan/ckan-docker-base, ckan-version controls other image tags
+ - ckan-version: "2.11"
+ ckan-image: "2.11-py3.10"
+ - ckan-version: "2.10"
+ ckan-image: "2.10-py3.10"
+ - ckan-version: "2.9"
+ ckan-image: "2.9-py3.9"
+ #- ckan-version: "master" Publish does not care about master
+ # ckan-image: "master"
fail-fast: false
name: CKAN ${{ matrix.ckan-version }}
runs-on: ubuntu-latest
container:
- image: ckan/ckan-dev:${{ matrix.ckan-version }}
+ image: ckan/ckan-dev:${{ matrix.ckan-image }}
+ options: --user root
services:
solr:
image: ckan/ckan-solr:${{ matrix.ckan-version }}-solr9
@@ -93,6 +103,12 @@ jobs:
- name: Run tests
run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests
+ publishSkipped:
+ if: github.repository != 'ckan/ckanext-xloader'
+ steps:
+ - run: |
+ echo "Skipping PyPI publish on downstream repository"
+
publish:
needs: test
permissions:
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a65aed65..53ec0c5f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -23,13 +23,26 @@ jobs:
needs: lint
strategy:
matrix:
- ckan-version: ["2.11", "2.10", 2.9]
+ include: #ckan-image see https://github.com/ckan/ckan-docker-base, ckan-version controls other image tags
+ - ckan-version: "2.11"
+ ckan-image: "2.11-py3.10"
+ experimental: false
+ - ckan-version: "2.10"
+ ckan-image: "2.10-py3.10"
+ experimental: false
+ - ckan-version: "2.9"
+ ckan-image: "2.9-py3.9"
+ experimental: false
+ - ckan-version: "master"
+ ckan-image: "master"
+ experimental: true # master is unstable, good to know if we are compatible or not
fail-fast: false
name: CKAN ${{ matrix.ckan-version }}
runs-on: ubuntu-latest
container:
- image: ckan/ckan-dev:${{ matrix.ckan-version }}
+ image: ckan/ckan-dev:${{ matrix.ckan-image }}
+ options: --user root
services:
solr:
image: ckan/ckan-solr:${{ matrix.ckan-version }}-solr9
@@ -53,9 +66,15 @@ jobs:
steps:
- uses: actions/checkout@v4
- - if: ${{ matrix.ckan-version == 2.9 }}
+ continue-on-error: ${{ matrix.experimental }}
+
+ - name: Pin setuptools for ckan 2.9 only
+ if: ${{ matrix.ckan-version == 2.9 }}
run: pip install "setuptools>=44.1.0,<71"
+ continue-on-error: ${{ matrix.experimental }}
+
- name: Install requirements
+ continue-on-error: ${{ matrix.experimental }}
run: |
pip install -r requirements.txt
pip install -r dev-requirements.txt
@@ -63,8 +82,19 @@ jobs:
pip install -U requests[security]
# Replace default path to CKAN core config file with the one on the container
sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
- - name: Setup extension (CKAN >= 2.9)
+
+ - name: Setup extension
+ continue-on-error: ${{ matrix.experimental }}
run: |
ckan -c test.ini db init
+
- name: Run tests
- run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests
+ continue-on-error: ${{ matrix.experimental }}
+ run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests --junit-xml=/tmp/artifacts/junit/results.xml
+
+ - name: Test Summary
+ uses: test-summary/action@v2
+ continue-on-error: ${{ matrix.experimental }}
+ with:
+ paths: "/tmp/artifacts/junit/*.xml"
+ if: always()
\ No newline at end of file
diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml
index 6d85e896..9487999d 100644
--- a/ckanext/xloader/config_declaration.yaml
+++ b/ckanext/xloader/config_declaration.yaml
@@ -128,6 +128,23 @@ groups:
to True.
type: bool
required: false
+ - key: ckanext.xloader.validation.requires_successful_report
+ default: False
+ example: True
+ description: |
+ Resources are required to pass Validation from the ckanext-validation
+ plugin to be able to get XLoadered.
+ type: bool
+ required: false
+ - key: ckanext.xloader.validation.enforce_schema
+ default: True
+ example: False
+ description: |
+ Resources are expected to have a Validation Schema, or use the default ones if not.
+ If this option is set to `False`, Resources that do not have
+ a Validation Schema will be treated like they do not require Validation.
+ See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema
+ for more details.
- key: ckanext.xloader.clean_datastore_tables
default: False
example: True
diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py
index 3ac8ebba..85c51936 100644
--- a/ckanext/xloader/jobs.py
+++ b/ckanext/xloader/jobs.py
@@ -124,6 +124,7 @@ def xloader_data_into_datastore(input):
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
+ logger.info("Job failed due to temporary error [%s], retrying", e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries
enqueue_job(
@@ -245,7 +246,12 @@ def tabulator_load():
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
- tabulator_load()
+ try:
+ tabulator_load()
+ except JobError as e:
+ logger.warning('Load using tabulator failed: %s', e)
+ logger.info('Trying again with direct COPY')
+ direct_load()
else:
try:
direct_load()
diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py
index 8c913e0a..46814181 100644
--- a/ckanext/xloader/loader.py
+++ b/ckanext/xloader/loader.py
@@ -118,8 +118,8 @@ def _clear_datastore_resource(resource_id):
'''
engine = get_write_engine()
with engine.begin() as conn:
- conn.execute("SET LOCAL lock_timeout = '5s'")
- conn.execute('TRUNCATE TABLE "{}"'.format(resource_id))
+ conn.execute("SET LOCAL lock_timeout = '15s'")
+ conn.execute('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id))
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
@@ -339,6 +339,18 @@ def create_column_indexes(fields, resource_id, logger):
logger.info('...column indexes created.')
+def _save_type_overrides(headers_dicts):
+ # copy 'type' to 'type_override' if it's not the default type (text)
+ # and there isn't already an override in place
+ for h in headers_dicts:
+ if h['type'] != 'text':
+ if 'info' in h:
+ if 'type_override' not in h['info']:
+ h['info']['type_override'] = h['type']
+ else:
+ h['info'] = {'type_override': h['type']}
+
+
def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads an Excel file (or other tabular data recognized by tabulator)
into Datastore and creates indexes.
@@ -399,7 +411,14 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
}.get(existing_info.get(h, {}).get('type_override'), t)
for t, h in zip(types, headers)]
- headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]
+ # Strip leading and trailing whitespace, then truncate to maximum length,
+ # then strip again in case the truncation exposed a space.
+ headers = [
+ header.strip()[:MAX_COLUMN_LENGTH].strip()
+ for header in headers
+ if header and header.strip()
+ ]
+ header_count = len(headers)
type_converter = TypeConverter(types=types)
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
@@ -409,6 +428,17 @@ def row_iterator():
for row in stream:
data_row = {}
for index, cell in enumerate(row):
+ # Handle files that have extra blank cells in heading and body
+ # eg from Microsoft Excel adding lots of empty cells on export.
+ # Blank header cells won't generate a column,
+ # so row length won't match column count.
+ if index >= header_count:
+ # error if there's actual data out of bounds, otherwise ignore
+ if cell:
+ raise LoaderError("Found data in column %s but resource only has %s header(s)",
+ index + 1, header_count)
+ else:
+ continue
data_row[headers[index]] = cell
yield data_row
result = row_iterator()
@@ -426,6 +456,9 @@ def row_iterator():
if type_override in list(_TYPE_MAPPING.values()):
h['type'] = type_override
+ # preserve any types that we have sniffed unless told otherwise
+ _save_type_overrides(headers_dicts)
+
logger.info('Determined headers and types: %s', headers_dicts)
'''
diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py
index 07af8db7..d916cc54 100644
--- a/ckanext/xloader/plugin.py
+++ b/ckanext/xloader/plugin.py
@@ -7,11 +7,16 @@
from ckan.model.domain_object import DomainObjectOperation
from ckan.model.resource import Resource
-from ckan.model.package import Package
from . import action, auth, helpers as xloader_helpers, utils
from ckanext.xloader.utils import XLoaderFormats
+try:
+ from ckanext.validation.interfaces import IPipeValidation
+ HAS_IPIPE_VALIDATION = True
+except ImportError:
+ HAS_IPIPE_VALIDATION = False
+
try:
config_declarations = toolkit.blanket.config_declarations
except AttributeError:
@@ -34,6 +39,8 @@ class xloaderPlugin(plugins.SingletonPlugin):
plugins.implements(plugins.IResourceController, inherit=True)
plugins.implements(plugins.IClick)
plugins.implements(plugins.IBlueprint)
+ if HAS_IPIPE_VALIDATION:
+ plugins.implements(IPipeValidation)
# IClick
def get_commands(self):
@@ -69,6 +76,21 @@ def configure(self, config_):
)
)
+ # IPipeValidation
+
+ def receive_validation_report(self, validation_report):
+ if utils.requires_successful_validation_report():
+ res_dict = toolkit.get_action('resource_show')({'ignore_auth': True},
+ {'id': validation_report.get('resource_id')})
+ if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True))
+ or res_dict.get('schema', None)) and validation_report.get('status') != 'success':
+ # A schema is present, or required to be present
+ return
+ # if validation is running in async mode, it is running from the redis workers.
+ # thus we need to do sync=True to have Xloader put the job at the front of the queue.
+ sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True))
+ self._submit_to_xloader(res_dict, sync=sync)
+
# IDomainObjectModification
def notify(self, entity, operation):
@@ -96,7 +118,16 @@ def notify(self, entity, operation):
if _should_remove_unsupported_resource_from_datastore(resource_dict):
toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id])
- if not getattr(entity, 'url_changed', False):
+ if utils.requires_successful_validation_report():
+ # If the resource requires validation, stop here if validation
+ # has not been performed or did not succeed. The Validation
+ # extension will call resource_patch and this method should
+ # be called again. However, url_changed will not be in the entity
+ # once Validation does the patch.
+ log.debug("Deferring xloading resource %s because the "
+ "resource did not pass validation yet.", resource_dict.get('id'))
+ return
+ elif not getattr(entity, 'url_changed', False):
# do not submit to xloader if the url has not changed.
return
@@ -105,6 +136,11 @@ def notify(self, entity, operation):
# IResourceController
def after_resource_create(self, context, resource_dict):
+ if utils.requires_successful_validation_report():
+ log.debug("Deferring xloading resource %s because the "
+ "resource did not pass validation yet.", resource_dict.get('id'))
+ return
+
self._submit_to_xloader(resource_dict)
def before_resource_show(self, resource_dict):
diff --git a/ckanext/xloader/templates/package/resource_read.html b/ckanext/xloader/templates/package/resource_read.html
index c99dcec2..8ce87bc1 100644
--- a/ckanext/xloader/templates/package/resource_read.html
+++ b/ckanext/xloader/templates/package/resource_read.html
@@ -4,13 +4,13 @@
{% block resource_read_url %}
{% set badge = h.xloader_badge(res) %}
{% if badge %}
- {{ badge }}
+ {{ badge }}
{% asset 'ckanext-xloader/main-css' %}
{% endif %}
{{ super() }}
{% endblock %}
-{% block action_manage_inner %}
+{% block action_manage %}
{{ super() }}
{% if h.is_resource_supported_by_xloader(res) %}