WIP: basic example of a harvester which connects to a PostgreSQL database
Leigh Gordon committed Jan 22, 2021
1 parent dd01cac commit d6af463
Showing 3 changed files with 112 additions and 3 deletions.
78 changes: 78 additions & 0 deletions aodncore/pipeline/steps/harvest.py
@@ -8,12 +8,15 @@
"""

import abc
import contextlib
import itertools
import os
import re
from collections import OrderedDict
from tempfile import NamedTemporaryFile

import psycopg2

from .basestep import BaseStepRunner
from ..exceptions import InvalidHarvesterError, UnmappedFilesError
from ..files import PipelineFileCollection, validate_pipelinefilecollection
@@ -25,6 +28,7 @@
    'create_symlink',
    'executor_conversion',
    'get_harvester_runner',
    'CsvHarvesterRunner',
    'HarvesterMap',
    'TalendHarvesterRunner',
    'TriggerEvent',
@@ -48,6 +52,8 @@ def get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_dir, config, logger):

    if harvester_name == 'talend':
        return TalendHarvesterRunner(store_runner, harvest_params, tmp_base_dir, config, logger)
    elif harvester_name == 'csv':
        return CsvHarvesterRunner(store_runner, harvest_params, config, logger)
    else:
        raise InvalidHarvesterError("invalid harvester '{name}'".format(name=harvester_name))
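
For orientation, a minimal sketch of how this dispatch might be invoked. The values below are placeholders
mirroring the call signature exercised in the tests in this commit; store_runner, config, logger and
pipeline_files would come from the pipeline runtime:

    # hypothetical invocation, not part of this commit
    runner = get_harvester_runner('csv', store_runner, {'slice_size': 512}, '/tmp', config, logger)
    runner.run(pipeline_files)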

@@ -192,6 +198,78 @@ def run(self, pipeline_files):
        pass


class AodnHarvesterRunner(BaseHarvesterRunner):
    """Abstract :py:class:`BaseHarvesterRunner` implementation for harvesters which connect directly to a
    PostgreSQL database
    """

    def __init__(self, storage_broker, harvest_params, config, logger):
        super().__init__(config, logger)
        if harvest_params is None:
            harvest_params = {}

        self._storage_broker = storage_broker
        self._harvest_params = harvest_params
        self._slice_size = harvest_params.get('slice_size', 2048)

    @contextlib.contextmanager
    def _pgcursor(self):
        # an empty DSN causes the DB connection details to be resolved by libpq from the runtime environment
        conn = psycopg2.connect('')
        try:
            with conn.cursor() as cur:
                yield cur
        finally:
            # the cursor context manager closes the cursor, but the connection must be closed explicitly
            conn.close()

    @abc.abstractmethod
    def _harvest_files(self, pipeline_files):
        pass

    @abc.abstractmethod
    def _unharvest_files(self, pipeline_files):
        pass

    def run(self, pipeline_files):
        validate_pipelinefilecollection(pipeline_files)

        deletions = pipeline_files.filter_by_bool_attribute('pending_harvest_early_deletion')
        additions = pipeline_files.filter_by_bool_attribute('pending_harvest_addition')
        late_deletions = pipeline_files.filter_by_bool_attribute('pending_harvest_late_deletion')

        self._logger.sysinfo("harvesting slice size: {slice_size}".format(slice_size=self._slice_size))
        deletion_slices = deletions.get_slices(self._slice_size)
        addition_slices = additions.get_slices(self._slice_size)
        late_deletions_slices = late_deletions.get_slices(self._slice_size)

        for file_slice in deletion_slices:
            self._unharvest_files(file_slice)

        for file_slice in addition_slices:
            self._harvest_files(file_slice)

        for file_slice in late_deletions_slices:
            self._unharvest_files(file_slice)


class CsvHarvesterRunner(AodnHarvesterRunner):
    """:py:class:`AodnHarvesterRunner` implementation which harvests CSV files into a PostgreSQL database
    """

    def _harvest_files(self, pipeline_files):
        # TODO: replace with SQL that actually harvests the files
        with self._pgcursor() as cursor:
            cursor.execute('SELECT 1')
            self._logger.sysinfo("result: {result}".format(result=cursor.fetchone()))

        pipeline_files.set_bool_attribute('is_harvested', True)
        self._storage_broker.upload(pipeline_files)

    def _unharvest_files(self, pipeline_files):
        # TODO: replace with SQL that actually unharvests the files
        with self._pgcursor() as cursor:
            cursor.execute('SELECT 2')
            self._logger.sysinfo("result: {result}".format(result=cursor.fetchone()))

        # is_harvested marks the harvest step as complete for a file, so it is also set for deletions
        pipeline_files.set_bool_attribute('is_harvested', True)
        self._storage_broker.delete(pipeline_files)
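
As an illustration only of where the TODOs above are heading (the table and column names here are entirely
hypothetical, not part of this commit), _harvest_files might eventually execute a parameterized statement
per file and commit explicitly, since psycopg2 connections do not autocommit by default:

    # hypothetical sketch; the real harvest SQL is still TODO in this commit
    def _harvest_files(self, pipeline_files):
        with self._pgcursor() as cursor:
            for pipeline_file in pipeline_files:
                cursor.execute(
                    "INSERT INTO harvested_file (url) VALUES (%s) ON CONFLICT (url) DO NOTHING",
                    (pipeline_file.dest_path,)
                )
            cursor.connection.commit()  # _pgcursor does not commit on exit

        pipeline_files.set_bool_attribute('is_harvested', True)
        self._storage_broker.upload(pipeline_files)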


class TalendHarvesterRunner(BaseHarvesterRunner):
    """:py:class:`BaseHarvesterRunner` implementation to execute Talend harvesters
    """
1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@
        'compliance-checker==4.1.1',
        'jsonschema>=2.6.0',
        'paramiko>=2.6.0',
        'psycopg2>=2.8.6',
        'python-magic>=0.4.15',
        'tableschema>=1.19.4',
        'transitions>=0.7.1'
36 changes: 33 additions & 3 deletions test_aodncore/pipeline/steps/test_harvest.py
@@ -4,7 +4,7 @@
from aodncore.pipeline import PipelineFile, PipelineFileCollection, PipelineFilePublishType
from aodncore.pipeline.exceptions import InvalidHarvesterError, UnmappedFilesError
from aodncore.pipeline.steps.harvest import (get_harvester_runner, HarvesterMap, TalendHarvesterRunner, TriggerEvent,
                                             validate_harvester_mapping)
                                             validate_harvester_mapping, CsvHarvesterRunner)
from aodncore.pipeline.steps.store import StoreRunner
from aodncore.testlib import BaseTestCase, NullStorageBroker
from test_aodncore import TESTDATA_DIR
@@ -45,8 +45,11 @@ def setUp(self):
        self.uploader = StoreRunner(NullStorageBroker("/"), None, None)

    def test_get_harvester_runner(self):
        harvester_runner = get_harvester_runner('talend', self.uploader, None, TESTDATA_DIR, None, self.test_logger)
        self.assertIsInstance(harvester_runner, TalendHarvesterRunner)
        talend_runner = get_harvester_runner('talend', self.uploader, None, TESTDATA_DIR, None, self.test_logger)
        self.assertIsInstance(talend_runner, TalendHarvesterRunner)

        csv_runner = get_harvester_runner('csv', self.uploader, None, TESTDATA_DIR, None, self.test_logger)
        self.assertIsInstance(csv_runner, CsvHarvesterRunner)

    def test_get_harvester_runner_invalid(self):
        with self.assertRaises(InvalidHarvesterError):
@@ -63,6 +66,33 @@ def test_validate_harvester_mapping(self):
        validate_harvester_mapping(collection, matched_file_map)


class TestCsvHarvesterRunner(BaseTestCase):
    def setUp(self):
        super().setUp()

        self.uploader = NullStorageBroker("/")

        # assumes that the username and password are supplied externally in one of the ways supported by
        # libpq, such as a ~/.pgpass file or environment variables; the values below are placeholders
        os.environ['PGHOST'] = 'PGHOST'
        os.environ['PGDATABASE'] = 'PGDATABASE'
        os.environ['PGSSLMODE'] = 'require'

        self.harvester = CsvHarvesterRunner(self.uploader, None, self.config, self.test_logger)

    def test_harvester(self):
        collection = PipelineFileCollection([
            PipelineFile(self.temp_nc_file, publish_type=PipelineFilePublishType.HARVEST_ONLY),
            PipelineFile(GOOD_NC, publish_type=PipelineFilePublishType.UNHARVEST_ONLY, is_deletion=True),
            PipelineFile(BAD_NC, publish_type=PipelineFilePublishType.UNHARVEST_ONLY, is_deletion=True,
                         late_deletion=True)
        ])
        self.harvester.run(collection)

        self.assertTrue(all(f.is_harvested for f in collection))
        self.assertTrue(any(f.is_stored for f in collection))
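
A note on the connection configuration exercised by this test: because _pgcursor calls psycopg2.connect('')
with an empty DSN, libpq resolves every connection detail from the environment. A standalone sketch of one
way to supply real values (the host, database and credentials below are placeholders, assuming a reachable
server):

    import os
    import psycopg2

    os.environ['PGHOST'] = 'db.example.com'  # placeholder host
    os.environ['PGDATABASE'] = 'harvest_db'  # placeholder database
    os.environ['PGSSLMODE'] = 'require'
    # the password can come from PGPASSWORD, or from a ~/.pgpass line of the form:
    #     db.example.com:5432:harvest_db:some_user:some_password

    conn = psycopg2.connect('')  # empty DSN: all details resolved from the environment
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT version()')
            print(cur.fetchone())
    finally:
        conn.close()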


class TestTalendHarvesterRunner(BaseTestCase):
    def setUp(self):
        super().setUp()