diff --git a/dags/common/pull_ftp.py b/dags/common/pull_ftp.py index 08b7c16c..24b1d8d0 100644 --- a/dags/common/pull_ftp.py +++ b/dags/common/pull_ftp.py @@ -133,8 +133,11 @@ def _differential_pull( logger.msg("Pulling missing files only.") excluded_directories = kwargs["params"]["excluded_directories"] sftp_files = s_ftp.list_files(excluded_directories=excluded_directories) + logger.msg(sftp_files) s3_files = repo.get_all_raw_filenames() + logger.msg(s3_files) diff_files = list(filter(lambda x: x not in s3_files, sftp_files)) + logger.msg(diff_files) return migrate_files(diff_files, s_ftp, repo, logger) diff --git a/requirements-airflow.txt b/requirements-airflow.txt index 45717acb..eceea4bc 100644 --- a/requirements-airflow.txt +++ b/requirements-airflow.txt @@ -1,3 +1,3 @@ -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.3/constraints-3.10.txt -apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.3 +apache-airflow[celery, postgres, redis, cncf.kubernetes, sentry]==2.8.3 diff --git a/tests/integration/iop/test_iop_dag_pull_sftp.py b/tests/integration/iop/test_iop_dag_pull_sftp.py index a9b331da..30236786 100644 --- a/tests/integration/iop/test_iop_dag_pull_sftp.py +++ b/tests/integration/iop/test_iop_dag_pull_sftp.py @@ -4,6 +4,7 @@ from iop.repository import IOPRepository from iop.sftp_service import IOPSFTPService from structlog import get_logger +import time DAG_NAME = "iop_pull_sftp" @@ -91,7 +92,9 @@ def test_dag_run(dag, dag_was_paused: bool, iop_empty_repo): def test_dag_migrate_from_FTP(iop_empty_repo): + iop_empty_repo.delete_all() assert len(iop_empty_repo.find_all()) == 0 + with IOPSFTPService() as sftp: migrate_from_ftp( sftp, @@ -110,6 +113,8 @@ def test_dag_migrate_from_FTP(iop_empty_repo): }, ) + time.sleep(5) + expected_files = [ { "pdf": "extracted/2022-07-30T03_02_01_content/1674-1137/1674-1137_46/1674-1137_46_8/1674-1137_46_8_085001/cpc_46_8_085001.pdf", @@ -157,10 +162,16 @@ def test_dag_migrate_from_FTP(iop_empty_repo): }, {"xml": "extracted/aca95c/aca95c.xml"}, ] - for (file_from_repo, expected_file) in zip( - iop_empty_repo.find_all(), expected_files - ): - assert file_from_repo == expected_file + + assert len(iop_empty_repo.find_all()) == len(expected_files) + + iop_pdf_files = sorted(item["pdf"] for item in iop_empty_repo.find_all() if "pdf" in item) + expected_pdf_files = sorted(item["pdf"] for item in expected_files if "pdf" in item) + assert iop_pdf_files == expected_pdf_files + + iop_xml_files = sorted(item["xml"] for item in iop_empty_repo.find_all() if "xml" in item) + expected_xml_files = sorted(item["xml"] for item in expected_files if "xml" in item) + assert iop_xml_files == expected_xml_files def test_dag_trigger_file_processing(): diff --git a/tests/integration/iop/test_repo.py b/tests/integration/iop/test_repo.py index 36fbd1a5..ae757e72 100644 --- a/tests/integration/iop/test_repo.py +++ b/tests/integration/iop/test_repo.py @@ -3,6 +3,7 @@ from iop.sftp_service import IOPSFTPService from pytest import fixture from structlog import get_logger +import time @fixture @@ -13,6 +14,9 @@ def iop_empty_repo(): def test_pull_from_sftp(iop_empty_repo): + iop_empty_repo.delete_all() + assert len(iop_empty_repo.find_all()) == 0 + with IOPSFTPService() as sftp: migrate_from_ftp( sftp, @@ -30,6 +34,9 @@ def test_pull_from_sftp(iop_empty_repo): } } ) + + time.sleep(5) + expected_files = [ { "pdf": "extracted/2022-07-30T03_02_01_content/1674-1137/1674-1137_46/1674-1137_46_8/1674-1137_46_8_085001/cpc_46_8_085001.pdf", @@ -77,7 +84,17 @@ def test_pull_from_sftp(iop_empty_repo): {"xml": "extracted/aca95c/aca95c.xml"}, ] - assert iop_empty_repo.find_all() == expected_files + + assert len(iop_empty_repo.find_all()) == len(expected_files) + + iop_pdf_files = sorted(item["pdf"] for item in iop_empty_repo.find_all() if "pdf" in item) + expected_pdf_files = sorted(item["pdf"] for item in expected_files if "pdf" in item) + assert iop_pdf_files == expected_pdf_files + + iop_xml_files = sorted(item["xml"] for item in iop_empty_repo.find_all() if "xml" in item) + expected_xml_files = sorted(item["xml"] for item in expected_files if "xml" in item) + assert iop_xml_files == expected_xml_files + assert sorted(iop_empty_repo.get_all_raw_filenames()) == sorted( [ "2022-07-30T03_02_01_content.zip", @@ -86,4 +103,4 @@ def test_pull_from_sftp(iop_empty_repo): "2022-09-24T03_01_43_content.zip", "aca95c.zip", ] - ) + ) \ No newline at end of file