From f8406bf466e4d2fd60913126b29d1617491cc1e1 Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Wed, 10 Feb 2021 10:22:23 +0100 Subject: [PATCH 1/7] fixed objectstorage_reader by removing useless chekings and added njson handling for objectstorage_reader --- nck/readers/gcs_reader.py | 10 ++-- nck/readers/objectstorage_reader.py | 81 ++--------------------------- nck/readers/s3_reader.py | 6 +-- nck/utils/file_reader.py | 51 +++++++++++++----- 4 files changed, 46 insertions(+), 102 deletions(-) diff --git a/nck/readers/gcs_reader.py b/nck/readers/gcs_reader.py index be64859e..6a948507 100644 --- a/nck/readers/gcs_reader.py +++ b/nck/readers/gcs_reader.py @@ -28,7 +28,7 @@ @click.command(name="read_gcs") @click.option("--gcs-bucket", required=True) @click.option("--gcs-prefix", required=True, multiple=True) -@click.option("--gcs-format", required=True, type=click.Choice(["csv", "gz"])) +@click.option("--gcs-format", required=True, type=click.Choice(["csv", "gz", "njson"])) @click.option("--gcs-dest-key-split", default=-1, type=int) @click.option("--gcs-csv-delimiter", default=",") @click.option("--gcs-csv-fieldnames", default=None) @@ -39,14 +39,10 @@ def gcs(**kwargs): class GCSReader(ObjectStorageReader, GoogleBaseClass): def __init__(self, bucket, prefix, format, dest_key_split=-1, **kwargs): - super().__init__( - bucket, prefix, format, dest_key_split, platform="GCS", **kwargs - ) + super().__init__(bucket, prefix, format, dest_key_split, platform="GCS", **kwargs) def create_client(self, config): - return storage.Client( - credentials=self._get_credentials(), project=config.project_id - ) + return storage.Client(credentials=self._get_credentials(), project=config.project_id) def create_bucket(self, client, bucket): return client.bucket(bucket) diff --git a/nck/readers/objectstorage_reader.py b/nck/readers/objectstorage_reader.py index c5e20b03..b0ad0f18 100644 --- a/nck/readers/objectstorage_reader.py +++ b/nck/readers/objectstorage_reader.py @@ -21,33 +21,7 @@ from nck.config import logger from nck.readers.reader import Reader from nck.streams.normalized_json_stream import NormalizedJSONStream -from nck.utils.file_reader import FileEnum - - -def find_reader(_format, kwargs): - _format = _format.upper() - if _format in FileEnum.__members__: - r = getattr(FileEnum, _format).value - _reader = r(**kwargs).get_csv_reader() - else: - raise NotImplementedError(f"The file format {str(_format)} has not been implemented for reading yet.") - return _reader - - -def no_files_seen_before(max_timestamp): - return not max_timestamp - - -def _object_older_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - return max_timestamp > _object_timestamp - - -def _object_newer_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - return max_timestamp < _object_timestamp - - -def _object_as_old_as_most_recently_ingested_file(max_timestamp, _object_timestamp): - return max_timestamp == _object_timestamp +from nck.utils.file_reader import FormatReader class ObjectStorageReader(Reader): @@ -58,7 +32,7 @@ def __init__(self, bucket, prefix, file_format, dest_key_split, platform=None, * self._platform = platform self._format = file_format - self._reader = find_reader(self._format, kwargs) + self._reader = FormatReader().create_file_reader(self._format, **kwargs).get_reader() self._dest_key_split = dest_key_split self.MAX_TIMESTAMP_STATE_KEY = f"{self._platform}_max_timestamp".lower() @@ -69,7 +43,8 @@ def read(self): for prefix in self._prefix_list: 
objects_sorted_by_time = sorted( - self.list_objects(bucket=self._bucket, prefix=prefix), key=lambda o: self.get_timestamp(o), + self.list_objects(bucket=self._bucket, prefix=prefix), + key=lambda o: self.get_timestamp(o), ) for _object in objects_sorted_by_time: @@ -82,10 +57,6 @@ def read(self): logger.info(f"Wrong extension: Skipping file {self.get_key(_object)}") continue - if self.has_already_processed_object(_object): - logger.info(f"Skipping already processed file {self.get_key(_object)}") - continue - def result_generator(): temp = tempfile.TemporaryFile() self.download_object_to_file(_object, temp) @@ -93,8 +64,6 @@ def result_generator(): for record in self._reader(temp): yield record - self.checkpoint_object(_object) - name = self.get_key(_object).split("/", self._dest_key_split)[-1] yield NormalizedJSONStream(name, result_generator()) @@ -102,48 +71,6 @@ def result_generator(): def is_compatible_object(self, _object): return self.get_key(_object).endswith("." + self._format) - def has_already_processed_object(self, _object): - - assert self.get_timestamp(_object) is not None, "Object has no timestamp!" - - max_timestamp = self.state.get(self.MAX_TIMESTAMP_STATE_KEY) - - if no_files_seen_before(max_timestamp): - return False - - _object_timestamp = self.get_timestamp(_object) - - if _object_older_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - return True - - if _object_newer_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - return False - - if _object_as_old_as_most_recently_ingested_file(max_timestamp, _object_timestamp): - max_files = self.state.get(self.MAX_FILES_STATE_KEY) - return self.get_key(_object) in max_files - - def checkpoint_object(self, _object): - - assert self.get_timestamp(_object) is not None, "Object has no timestamp!" 
- - max_timestamp = self.state.get(self.MAX_TIMESTAMP_STATE_KEY) - _object_timestamp = self.get_timestamp(_object) - - if max_timestamp and _object_older_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - raise RuntimeError("Object is older than max timestamp at checkpoint time") - - elif not max_timestamp or _object_newer_than_most_recently_ingested_file(max_timestamp, _object_timestamp): - self.update_max_timestamp(_object_timestamp, _object) - - else: - assert _object_as_old_as_most_recently_ingested_file(max_timestamp, _object_timestamp) - self.update_max_files(_object) - - def update_max_timestamp(self, _object_timestamp, _object): - self.state.set(self.MAX_TIMESTAMP_STATE_KEY, _object_timestamp) - self.state.set(self.MAX_FILES_STATE_KEY, [self.get_key(_object)]) - def update_max_files(self, _object): max_files = self.state.get(self.MAX_FILES_STATE_KEY) max_files.append(self.get_key(_object)) diff --git a/nck/readers/s3_reader.py b/nck/readers/s3_reader.py index 8a0c085d..a33524b8 100644 --- a/nck/readers/s3_reader.py +++ b/nck/readers/s3_reader.py @@ -26,7 +26,7 @@ @click.command(name="read_s3") @click.option("--s3-bucket", required=True) @click.option("--s3-prefix", required=True, multiple=True) -@click.option("--s3-format", required=True, type=click.Choice(["csv", "gz"])) +@click.option("--s3-format", required=True, type=click.Choice(["csv", "gz", "njson"])) @click.option("--s3-dest-key-split", default=-1, type=int) @click.option("--s3-csv-delimiter", default=",") @click.option("--s3-csv-fieldnames", default=None) @@ -37,9 +37,7 @@ def s3(**kwargs): class S3Reader(ObjectStorageReader): def __init__(self, bucket, prefix, format, dest_key_split=-1, **kwargs): - super().__init__( - bucket, prefix, format, dest_key_split, platform="S3", **kwargs - ) + super().__init__(bucket, prefix, format, dest_key_split, platform="S3", **kwargs) def create_client(self, config): boto_config = { diff --git a/nck/utils/file_reader.py b/nck/utils/file_reader.py index 0fc560ad..02e70570 100644 --- a/nck/utils/file_reader.py +++ b/nck/utils/file_reader.py @@ -15,7 +15,6 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -from enum import Enum import csv import codecs import gzip @@ -26,7 +25,7 @@ def unzip(input_file, output_path): - with zipfile.ZipFile(input_file, 'r') as zip_ref: + with zipfile.ZipFile(input_file, "r") as zip_ref: zip_ref.extractall(output_path) @@ -53,20 +52,42 @@ def format_csv_fieldnames(csv_fieldnames): elif isinstance(csv_fieldnames, (str, bytes)): _csv_fieldnames = json.loads(csv_fieldnames) else: - raise TypeError( - f"The CSV fieldnames is of the following type: {type(csv_fieldnames)}." 
- ) + raise TypeError(f"The CSV fieldnames is of the following type: {type(csv_fieldnames)}.") assert isinstance(_csv_fieldnames, list) return _csv_fieldnames -class CSVReader(object): +class FormatReader: + def create_file_reader(self, _format, **kwargs): + if _format == "csv": + return CSVReader(**kwargs) + if _format == "gz": + return GZReader(**kwargs) + if _format == "njson": + return NJSONReader(**kwargs) + else: + raise NotImplementedError(f"The file format {str(_format)} has not been implemented for reading yet.") + + +class FileReader: + def __init__(self, **kwargs): + self.reader = lambda fd: self.read(fd, **kwargs) + + def read(self, fd, **kwargs): + fd.seek(0) + return codecs.iterdecode(fd, encoding="utf8") + + def get_reader(self): + return self.reader + + +class CSVReader(FileReader): def __init__(self, csv_delimiter, csv_fieldnames, **kwargs): self.csv_delimiter = format_csv_delimiter(csv_delimiter) self.csv_fieldnames = format_csv_fieldnames(csv_fieldnames) if csv_fieldnames is not None else None - self.csv_reader = lambda fd: self.read_csv(fd, **kwargs) + super().__init__(**kwargs) - def read_csv(self, fd, **kwargs): + def read(self, fd, **kwargs): fd.seek(0) fd = self.decompress(fd) return csv.DictReader( @@ -79,9 +100,6 @@ def read_csv(self, fd, **kwargs): def decompress(self, fd): return fd - def get_csv_reader(self): - return self.csv_reader - class GZReader(CSVReader): def decompress(self, fd): @@ -89,6 +107,11 @@ def decompress(self, fd): return gzf -class FileEnum(Enum): - CSV = CSVReader - GZ = GZReader +class NJSONReader(FileReader): + def read(self, fd, **kwargs): + fd.seek(0) + return self.jsongene(fd, **kwargs) + + def jsongene(self, fd, **kwargs): + for line in codecs.iterdecode(fd, encoding="utf8"): + yield json.loads(line) From b6e3ae9321d8d577527acd7a0ce579af73356a8f Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Wed, 10 Feb 2021 15:53:24 +0100 Subject: [PATCH 2/7] deleted forgotten dead code --- nck/readers/objectstorage_reader.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/nck/readers/objectstorage_reader.py b/nck/readers/objectstorage_reader.py index b0ad0f18..a244b25a 100644 --- a/nck/readers/objectstorage_reader.py +++ b/nck/readers/objectstorage_reader.py @@ -71,11 +71,6 @@ def result_generator(): def is_compatible_object(self, _object): return self.get_key(_object).endswith("." 
+ self._format) - def update_max_files(self, _object): - max_files = self.state.get(self.MAX_FILES_STATE_KEY) - max_files.append(self.get_key(_object)) - self.state.set(self.MAX_FILES_STATE_KEY, max_files) - def create_client(self, config): raise NotImplementedError From b489edc429224964f0b7ede1e83ea4d5e7779ca5 Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Thu, 25 Feb 2021 18:55:52 +0100 Subject: [PATCH 3/7] minor fix --- nck/readers/objectstorage_reader.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/nck/readers/objectstorage_reader.py b/nck/readers/objectstorage_reader.py index a244b25a..2bef7ae2 100644 --- a/nck/readers/objectstorage_reader.py +++ b/nck/readers/objectstorage_reader.py @@ -57,16 +57,15 @@ def read(self): logger.info(f"Wrong extension: Skipping file {self.get_key(_object)}") continue - def result_generator(): - temp = tempfile.TemporaryFile() - self.download_object_to_file(_object, temp) - - for record in self._reader(temp): - yield record - name = self.get_key(_object).split("/", self._dest_key_split)[-1] - yield NormalizedJSONStream(name, result_generator()) + yield NormalizedJSONStream(name, self._result_generator(_object)) + + def _result_generator(self, _object): + with tempfile.TemporaryFile() as temp: + self.download_object_to_file(_object, temp) + for record in self._reader(temp): + yield record def is_compatible_object(self, _object): return self.get_key(_object).endswith("." + self._format) From 18699a1320b8e26ac3c3b05b825ab9be37b0d58c Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Thu, 25 Feb 2021 18:56:40 +0100 Subject: [PATCH 4/7] test for storage reader --- tests/readers/test_objectstorage_reader.py | 111 +++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 tests/readers/test_objectstorage_reader.py diff --git a/tests/readers/test_objectstorage_reader.py b/tests/readers/test_objectstorage_reader.py new file mode 100644 index 00000000..0d776afa --- /dev/null +++ b/tests/readers/test_objectstorage_reader.py @@ -0,0 +1,111 @@ +import io +import csv +import json + +from nck.readers.objectstorage_reader import ObjectStorageReader +from unittest import TestCase, mock + + +mock_csv_names = ["a.csv", "a.njson", "b.csv", "b.njson"] +mock_csv_files = [ + [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], + [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}], + [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], + [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], +] + +mock_timestamp = [ + 1614179262, + 1614179272, + 1614179277, + 16141792778, +] + + +def mock_to_object(self, _object): + return _object + + +def mock_list_objects(self, bucket, prefix): + a = list(zip(mock_csv_names, mock_timestamp, mock_csv_files)) + return [x for x in a if x[0].startswith(prefix)] + + +def mock_get_timestamp(self, _object, **kwargs): + return _object[1] + + +def write_to_file(self, _object, f, **kwargs): + + if self._format == "csv": + + text_file = io.TextIOWrapper(f, encoding="utf-8", newline="") + w = csv.writer(text_file) + w.writerows(_object[2]) + text_file.detach() + + else: + + text_file = io.TextIOWrapper(f, encoding="utf-8") + for line in _object[2]: + + json.dump(line, text_file) + text_file.write("\n") + text_file.detach() + + +def mock_get_key(self, _object, **kwargs): + return _object[0] + + +@mock.patch("nck.readers.objectstorage_reader.ObjectStorageReader.create_client") +@mock.patch("nck.readers.objectstorage_reader.ObjectStorageReader.create_bucket") 
+@mock.patch.object(ObjectStorageReader, "download_object_to_file", write_to_file) +@mock.patch.object(ObjectStorageReader, "to_object", mock_to_object) +@mock.patch.object(ObjectStorageReader, "get_timestamp", mock_get_timestamp) +@mock.patch.object(ObjectStorageReader, "list_objects", mock_list_objects) +@mock.patch.object(ObjectStorageReader, "get_key", mock_get_key) +class ObjectStorageReaderTest(TestCase): + def test_wrong_format(self, a, b): + with self.assertRaises(NotImplementedError): + reader = ObjectStorageReader( + bucket="", prefix=["a"], file_format="txt", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + ) + reader + + def test_ObjectStorageReader_filter_files(self, a, b): + reader = ObjectStorageReader( + bucket="", prefix=["a"], file_format="csv", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + ) + nb_file = len(list(reader.read())) + """check if filter csv with prefix ["a"]""" + self.assertEqual(nb_file, 1) + + reader = ObjectStorageReader( + bucket="", prefix=["b"], file_format="njson", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + ) + nb_file = len(list(reader.read())) + """check if filter njson with prefix ["b"]""" + self.assertEqual(nb_file, 1) + + def test_ObjectStorageReader_read_all_file_CSV(self, a, b): + reader = ObjectStorageReader( + bucket="", prefix=["a"], file_format="csv", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + ) + + expected = [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}] + + for file in reader.read(): + for expect, data in zip(expected, file.readlines()): + self.assertEqual(expect, data) + + def test_ObjectStorageReader_read_all_file_NJSON(self, a, b): + reader = ObjectStorageReader( + bucket="", prefix=["a"], file_format="njson", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + ) + + expected = [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}] + + for file in reader.read(): + for expect, data in zip(expected, file.readlines()): + self.assertEqual(expect, data) From 3347b10e58f9cb6296cd335d8951687b103583d6 Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Fri, 26 Feb 2021 16:55:34 +0100 Subject: [PATCH 5/7] change utils.file_reader.create_file_reader as standalone function --- nck/utils/file_reader.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/nck/utils/file_reader.py b/nck/utils/file_reader.py index 02e70570..0eded3b2 100644 --- a/nck/utils/file_reader.py +++ b/nck/utils/file_reader.py @@ -57,16 +57,15 @@ def format_csv_fieldnames(csv_fieldnames): return _csv_fieldnames -class FormatReader: - def create_file_reader(self, _format, **kwargs): - if _format == "csv": - return CSVReader(**kwargs) - if _format == "gz": - return GZReader(**kwargs) - if _format == "njson": - return NJSONReader(**kwargs) - else: - raise NotImplementedError(f"The file format {str(_format)} has not been implemented for reading yet.") +def create_file_reader(_format, **kwargs): + if _format == "csv": + return CSVReader(**kwargs) + if _format == "gz": + return GZReader(**kwargs) + if _format == "njson": + return NJSONReader(**kwargs) + else: + raise NotImplementedError(f"The file format {str(_format)} has not been implemented for reading yet.") class FileReader: From b290feb75c2570a2b948fb58018cf2e65fae2e49 Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Fri, 26 Feb 2021 16:58:26 +0100 Subject: [PATCH 6/7] refactorize test_objectstorage_reader to improve readability --- 
tests/readers/test_objectstorage_reader.py | 45 ++++++++-------------- 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/tests/readers/test_objectstorage_reader.py b/tests/readers/test_objectstorage_reader.py index 0d776afa..c2d52e28 100644 --- a/tests/readers/test_objectstorage_reader.py +++ b/tests/readers/test_objectstorage_reader.py @@ -2,6 +2,7 @@ import csv import json +from parameterized import parameterized from nck.readers.objectstorage_reader import ObjectStorageReader from unittest import TestCase, mock @@ -11,7 +12,7 @@ [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}], [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], - [["a", "b", "c"], [4, 5, 6], [7, 8, 9]], + [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}], ] mock_timestamp = [ @@ -68,44 +69,28 @@ def mock_get_key(self, _object, **kwargs): class ObjectStorageReaderTest(TestCase): def test_wrong_format(self, a, b): with self.assertRaises(NotImplementedError): - reader = ObjectStorageReader( + ObjectStorageReader( bucket="", prefix=["a"], file_format="txt", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None ) - reader - def test_ObjectStorageReader_filter_files(self, a, b): + @parameterized.expand([("njson", 2), ("csv", 2)]) + def test_ObjectStorageReader_filter_files(self, a, b, format, nb_files_expected): reader = ObjectStorageReader( - bucket="", prefix=["a"], file_format="csv", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None + bucket="", prefix=[""], file_format=format, dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None ) nb_file = len(list(reader.read())) - """check if filter csv with prefix ["a"]""" - self.assertEqual(nb_file, 1) - - reader = ObjectStorageReader( - bucket="", prefix=["b"], file_format="njson", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None - ) - nb_file = len(list(reader.read())) - """check if filter njson with prefix ["b"]""" - self.assertEqual(nb_file, 1) - - def test_ObjectStorageReader_read_all_file_CSV(self, a, b): + self.assertEqual(nb_file, nb_files_expected) + + @parameterized.expand( + [ + ("njson", [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}]), + ("csv", [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}]), + ] + ) + def test_ObjectStorageReader_read_all_file(self, a, b, format, expected): reader = ObjectStorageReader( bucket="", prefix=["a"], file_format="csv", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None ) - - expected = [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}] - - for file in reader.read(): - for expect, data in zip(expected, file.readlines()): - self.assertEqual(expect, data) - - def test_ObjectStorageReader_read_all_file_NJSON(self, a, b): - reader = ObjectStorageReader( - bucket="", prefix=["a"], file_format="njson", dest_key_split=-1, csv_delimiter=",", csv_fieldnames=None - ) - - expected = [{"a": "4", "b": "5", "c": "6"}, {"a": "7", "b": "8", "c": "9"}] - for file in reader.read(): for expect, data in zip(expected, file.readlines()): self.assertEqual(expect, data) From 69a08ee7e4e597dddaa918ff2233c01f8a54560f Mon Sep 17 00:00:00 2001 From: Louis Rousselot de Saint Ceran Date: Fri, 26 Feb 2021 17:03:10 +0100 Subject: [PATCH 7/7] minor fix --- nck/readers/objectstorage_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nck/readers/objectstorage_reader.py b/nck/readers/objectstorage_reader.py index 2bef7ae2..93ceaaca 100644 --- a/nck/readers/objectstorage_reader.py +++ 
b/nck/readers/objectstorage_reader.py @@ -21,7 +21,7 @@ from nck.config import logger from nck.readers.reader import Reader from nck.streams.normalized_json_stream import NormalizedJSONStream -from nck.utils.file_reader import FormatReader +from nck.utils.file_reader import create_file_reader class ObjectStorageReader(Reader): @@ -32,7 +32,7 @@ def __init__(self, bucket, prefix, file_format, dest_key_split, platform=None, * self._platform = platform self._format = file_format - self._reader = FormatReader().create_file_reader(self._format, **kwargs).get_reader() + self._reader = create_file_reader(self._format, **kwargs).get_reader() self._dest_key_split = dest_key_split self.MAX_TIMESTAMP_STATE_KEY = f"{self._platform}_max_timestamp".lower()
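
As a usage sketch of the reader factory these patches converge on — the standalone create_file_reader() from patch 5 together with the NJSONReader added in patch 1 — the snippet below shows how the callable returned by get_reader() consumes a file object. The in-memory BytesIO objects and the sample records are illustrative stand-ins for the temporary file that ObjectStorageReader actually downloads into; they are not part of the patches.

import io

from nck.utils.file_reader import create_file_reader

# "njson" maps to NJSONReader: get_reader() returns a callable that takes a
# binary file object and yields one dict per newline-delimited JSON record.
njson_fd = io.BytesIO(b'{"a": "4", "b": "5", "c": "6"}\n{"a": "7", "b": "8", "c": "9"}\n')
read_njson = create_file_reader("njson").get_reader()
for record in read_njson(njson_fd):
    print(record)  # {'a': '4', 'b': '5', 'c': '6'}, then {'a': '7', 'b': '8', 'c': '9'}

# "csv" maps to CSVReader and needs the csv-specific options that the storage
# readers forward from their --*-csv-delimiter / --*-csv-fieldnames options.
csv_fd = io.BytesIO(b"a,b,c\n4,5,6\n7,8,9\n")
read_csv = create_file_reader("csv", csv_delimiter=",", csv_fieldnames=None).get_reader()
for record in read_csv(csv_fd):
    print(record)  # same two dicts as above

Requesting any other format raises NotImplementedError, which is what test_wrong_format asserts.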
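
Patch 3 also tightens the streaming path: the inline result_generator() closure becomes _result_generator(), and the temporary file now lives inside a context manager so it is closed once the stream is exhausted. A minimal sketch of that pattern, with download_to standing in (as an assumption) for the platform-specific download_object_to_file() implementations:

import tempfile

def stream_records(download_to, reader):
    # Mirrors ObjectStorageReader._result_generator(): download the object
    # into a temporary file, then lazily yield the records produced by the
    # reader callable obtained from create_file_reader(...).get_reader().
    with tempfile.TemporaryFile() as temp:
        download_to(temp)
        for record in reader(temp):
            yield record

Because the with-block only exits when the generator is exhausted or closed, downstream consumers of the NormalizedJSONStream can iterate lazily without leaking file handles.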