Skip to content

Commit

Permalink
Advent: rename harvesting dag files & fix broken link in readme (#190)
Browse files Browse the repository at this point in the history
* rename harvesting dag files

* fix broken image link in readme

* fix failing test
  • Loading branch information
miguelgrc authored Dec 20, 2023
1 parent ada029b commit 90ae504
Show file tree
Hide file tree
Showing 20 changed files with 32 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

The following image describes the process for each publisher. Please note that the number of tasks shown is the minimum; more tasks can be implemented if the need arises.

![DAG Architecture](./airflow_workflows.png)
![DAG Architecture](./documentation/airflow_workflows.png)

## Run with docker-compose

Expand Down
4 changes: 2 additions & 2 deletions dags/aps/aps.py → dags/aps/aps_pull_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
schedule="@hourly",
params={"from_date": None, "until_date": None, "per_page": None},
)
def aps_fetch_api():
def aps_pull_api():
@task()
def set_fetching_intervals(repo: IRepository = APSRepository(), **kwargs):
return set_harvesting_interval(repo=repo, **kwargs)
Expand Down Expand Up @@ -53,4 +53,4 @@ def trigger_files_processing(key, repo: IRepository = APSRepository()):
trigger_files_processing(key)


APS_download_files_dag = aps_fetch_api()
APS_download_files_dag = aps_pull_api()
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"filenames_pull": {"enabled": False, "filenames": [], "force_from_ftp": False},
},
)
def elsevier_pull_ftp():
logger = get_logger().bind(class_name="elsevier_pull_ftp")
def elsevier_pull_sftp():
logger = get_logger().bind(class_name="elsevier_pull_sftp")

@task()
def migrate_from_ftp(
Expand Down Expand Up @@ -55,4 +55,4 @@ def trigger_file_processing(
trigger_file_processing(filenames=archive_names)


dag_taskflow = elsevier_pull_ftp()
dag_taskflow = elsevier_pull_sftp()
4 changes: 2 additions & 2 deletions dags/hindawi/hindawi.py → dags/hindawi/hindawi_pull_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
schedule="30 */3 * * *",
params={"from_date": None, "until_date": None, "record_doi": None},
)
def hindawi_fetch_api():
def hindawi_pull_api():
@task()
def set_fetching_intervals(repo: IRepository = HindawiRepository(), **kwargs):
return set_harvesting_interval(repo=repo, **kwargs)
Expand Down Expand Up @@ -49,4 +49,4 @@ def trigger_files_processing(key, repo: IRepository = HindawiRepository()):
trigger_files_processing(key)


hindawi_download_files_dag = hindawi_fetch_api()
hindawi_download_files_dag = hindawi_pull_api()
6 changes: 3 additions & 3 deletions dags/iop/dag_pull_ftp.py → dags/iop/iop_pull_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"filenames_pull": {"enabled": False, "filenames": [], "force_from_ftp": False},
},
)
def iop_pull_ftp():
logger = get_logger().bind(class_name="iop_pull_ftp")
def iop_pull_sftp():
logger = get_logger().bind(class_name="iop_pull_sftp")

@task()
def migrate_from_ftp(repo=IOPRepository(), sftp=IOPSFTPService(), **kwargs):
Expand Down Expand Up @@ -45,4 +45,4 @@ def trigger_file_processing(filenames=None):
trigger_file_processing(filenames=filenames)


dag_taskflow = iop_pull_ftp()
dag_taskflow = iop_pull_sftp()
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"filenames_pull": {"enabled": False, "filenames": [], "force_from_ftp": False},
},
)
def springer_pull_ftp():
logger = get_logger().bind(class_name="springer_pull_ftp")
def springer_pull_sftp():
logger = get_logger().bind(class_name="springer_pull_sftp")

@task()
def migrate_from_ftp(
Expand Down Expand Up @@ -44,4 +44,4 @@ def trigger_file_processing(repo=SpringerRepository(), filenames=None):
trigger_file_processing(filenames=filenames)


dag_taskflow = springer_pull_ftp()
dag_taskflow = springer_pull_sftp()
4 changes: 2 additions & 2 deletions documentation/apis_instruction.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ We can just restart tasks, which are still running or failed.
## Restart a run from the beginning

1. Restart run from the beginning:
- api: `/api/v1/dags/aps_fetch_api/dagRuns`
- api: `/api/v1/dags/aps_pull_api/dagRuns`
- header
- Content-Type : application/json
- Accept : application/json
Expand All @@ -108,7 +108,7 @@ We can just restart tasks, which are still running or failed.
Same as Restart a run from the beginning, just passing wanted parameters in conf field:

1. Restart run from the beginning:
- api: `/api/v1/dags/aps_fetch_api/dagRuns`
- api: `/api/v1/dags/aps_pull_api/dagRuns`
- header
- Content-Type : application/json
- Accept : application/json
Expand Down
4 changes: 2 additions & 2 deletions documentation/harvester_docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
![airflow_running_with_customize_params](./airflow_running_with_customize_params.png)

- Running a DAG using the API:
- POST: `api/v1/dags/aps_fetch_api/dagRuns`
- POST: `api/v1/dags/aps_pull_api/dagRuns`
- Header: `{Content-Type : application/json, Accept : application/json}`
- Body:
```json
Expand All @@ -36,7 +36,7 @@
"execution_date": "2023-08-01T12:56:22Z",
"conf": {"from_date": "2023-01-01", "until_date": "2023-02-01"},
"note": "string"
}' --user "admin:admin" http://localhost:8080/api/v1/dags/aps_fetch_api/dagRuns
}' --user "admin:admin" http://localhost:8080/api/v1/dags/aps_pull_api/dagRuns
```

3. Trigger a DAG Run from UI:
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/aps/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from aps.repository import APSRepository
from aps.utils import save_file_in_s3

DAG_NAME = "aps_fetch_api"
TRIGGERED_DAG_NAME = "aps_fetch_api"
DAG_NAME = "aps_pull_api"
TRIGGERED_DAG_NAME = "aps_pull_api"


@pytest.fixture
Expand All @@ -25,7 +25,7 @@ def test_dag_loaded(dag: DAG):


@pytest.mark.vcr
def test_aps_fetch_api(dag: DAG):
def test_aps_pull_api(dag: DAG):
repo = APSRepository()
repo.delete_all()
dates = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
from elsevier.repository import ElsevierRepository
from elsevier.sftp_service import ElsevierSFTPService
from structlog import get_logger
from pytest import fixture

DAG_NAME = "elsevier_pull_ftp"
DAG_NAME = "elsevier_pull_sftp"


@pytest.fixture
@fixture
def dag():
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert dagbag.import_errors.get(f"dags/{DAG_NAME}.py") is None
return dagbag.get_dag(dag_id=DAG_NAME)


@pytest.fixture
@fixture
def elsevier_empty_repo():
repo = ElsevierRepository()
repo.delete_all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from iop.sftp_service import IOPSFTPService
from structlog import get_logger

DAG_NAME = "iop_pull_ftp"
DAG_NAME = "iop_pull_sftp"


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from common.utils import check_dagrun_state
from freezegun import freeze_time
from pytest import fixture, raises
from springer.dag_process_file import (
from springer.springer_process_file import (
springer_enhance_file,
springer_enrich_file,
springer_parse_file,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from springer.sftp_service import SpringerSFTPService
from structlog import get_logger

DAG_NAME = "springer_pull_ftp"
DAG_NAME = "springer_pull_sftp"


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion tests/units/aps/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from common.utils import set_harvesting_interval
from freezegun import freeze_time

DAG_NAME = "aps_fetch_api"
DAG_NAME = "aps_pull_api"
TRIGGERED_DAG_NAME = "aps_process_file"


Expand Down
4 changes: 2 additions & 2 deletions tests/units/elsevier/test_files_proccessing_tirgger.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def elsevier_empty_repo():

@fixture
def logger():
return get_logger().bind(class_name="elsevier_pull_ftp")
return get_logger().bind(class_name="elsevier_pull_sftp")


@fixture
Expand All @@ -39,7 +39,7 @@ def test_trigger_file_processing_elsevier(elsevier_empty_repo, migrated_files):
files = trigger_file_processing_elsevier(
publisher="elsevier",
repo=elsevier_empty_repo,
logger=get_logger().bind(class_name="elsevier_pull_ftp"),
logger=get_logger().bind(class_name="elsevier_pull_sftp"),
filenames=migrated_files,
)
assert sorted(files) == sorted(
Expand Down
2 changes: 1 addition & 1 deletion tests/units/hindawi/test_hindawi_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from common.utils import set_harvesting_interval
from hindawi.utils import save_file_in_s3, split_xmls

DAG_NAME = "hindawi_fetch_api"
DAG_NAME = "hindawi_pull_api"
TRIGGERED_DAG_NAME = "hindawi_file_processing"
hindawi_xml = "<wrapper><ListRecords><record>record</record></ListRecords></wrapper>"

Expand Down

0 comments on commit 90ae504

Please sign in to comment.