From 9114e28dfa816a9fb3128b7c62ffe2d11890e8b9 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Oct 2023 12:21:31 +0800 Subject: [PATCH 01/10] style: fix mypy issue --- .../azure/example_dags/example_adf_run_pipeline.py | 8 ++++++-- .../providers/microsoft/azure/hooks/data_factory.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py index 189fa736c..dc1f9c5d2 100644 --- a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py +++ b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py @@ -40,8 +40,6 @@ default_args = { "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), "azure_data_factory_conn_id": "azure_data_factory_default", - "factory_name": DATAFACTORY_NAME, # This can also be specified in the ADF connection. - "resource_group_name": RESOURCE_GROUP_NAME, # This can also be specified in the ADF connection. "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)), "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))), } @@ -194,6 +192,8 @@ def delete_azure_data_factory_storage_pipeline() -> None: run_pipeline_wait = AzureDataFactoryRunPipelineOperatorAsync( task_id="run_pipeline_wait", pipeline_name=PIPELINE_NAME, + factory_name=DATAFACTORY_NAME, + resource_group_name=RESOURCE_GROUP_NAME, ) # [END howto_operator_adf_run_pipeline_async] @@ -201,6 +201,8 @@ def delete_azure_data_factory_storage_pipeline() -> None: run_pipeline_no_wait = AzureDataFactoryRunPipelineOperatorAsync( task_id="run_pipeline_no_wait", pipeline_name=PIPELINE_NAME, + factory_name=DATAFACTORY_NAME, + resource_group_name=RESOURCE_GROUP_NAME, wait_for_termination=False, ) # [END howto_operator_adf_run_pipeline] @@ -209,6 +211,8 @@ def delete_azure_data_factory_storage_pipeline() -> None: pipeline_run_sensor_async = AzureDataFactoryPipelineRunStatusSensorAsync( task_id="pipeline_run_sensor_async", run_id=cast(str, XComArg(run_pipeline_wait, key="run_id")), + factory_name=DATAFACTORY_NAME, + resource_group_name=RESOURCE_GROUP_NAME, ) # [END howto_sensor_pipeline_run_sensor_async] diff --git a/astronomer/providers/microsoft/azure/hooks/data_factory.py b/astronomer/providers/microsoft/azure/hooks/data_factory.py index 68151c385..c606fa08a 100644 --- a/astronomer/providers/microsoft/azure/hooks/data_factory.py +++ b/astronomer/providers/microsoft/azure/hooks/data_factory.py @@ -75,7 +75,7 @@ class AzureDataFactoryHookAsync(AzureDataFactoryHook): def __init__(self, azure_data_factory_conn_id: str): """Initialize the hook instance.""" - self._async_conn: DataFactoryManagementClient = None + self._async_conn: DataFactoryManagementClient | None = None self.conn_id = azure_data_factory_conn_id super().__init__(azure_data_factory_conn_id=azure_data_factory_conn_id) From 46716957123e54a833fdf71165d310de07a55eaa Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Oct 2023 15:38:51 +0800 Subject: [PATCH 02/10] style: ignore example_dags wrong type --- .../example_dags/example_adf_run_pipeline.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py index dc1f9c5d2..d25007bba 100644 --- a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py +++ 
b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py @@ -7,7 +7,6 @@ from airflow import DAG from airflow.models.xcom_arg import XComArg from airflow.operators.python import PythonOperator - from astronomer.providers.microsoft.azure.operators.data_factory import ( AzureDataFactoryRunPipelineOperatorAsync, ) @@ -86,7 +85,7 @@ def create_adf_storage_pipeline() -> None: df_resource = Factory(location=LOCATION) df = adf_client.factories.create_or_update(RESOURCE_GROUP_NAME, DATAFACTORY_NAME, df_resource) while df.provisioning_state != "Succeeded": - df = adf_client.factories.get(RESOURCE_GROUP_NAME, DATAFACTORY_NAME) + df = adf_client.factories.get(RESOURCE_GROUP_NAME, DATAFACTORY_NAME) # type: ignore[assignment] time.sleep(1) # Create an Azure Storage linked service @@ -95,17 +94,17 @@ def create_adf_storage_pipeline() -> None: storage_string = SecureString(value=CONNECTION_STRING) ls_azure_storage = LinkedServiceResource( - properties=AzureStorageLinkedService(connection_string=storage_string) + properties=AzureStorageLinkedService(connection_string=storage_string) # type: ignore[arg-type] ) adf_client.linked_services.create_or_update( RESOURCE_GROUP_NAME, DATAFACTORY_NAME, STORAGE_LINKED_SERVICE_NAME, ls_azure_storage ) # Create an Azure blob dataset (input) - ds_ls = LinkedServiceReference(reference_name=STORAGE_LINKED_SERVICE_NAME) + ds_ls = LinkedServiceReference(type="LinkedServiceReference", reference_name=STORAGE_LINKED_SERVICE_NAME) ds_azure_blob = DatasetResource( properties=AzureBlobDataset( - linked_service_name=ds_ls, folder_path=BLOB_PATH, file_name=BLOB_FILE_NAME + linked_service_name=ds_ls, folder_path=BLOB_PATH, file_name=BLOB_FILE_NAME # type: ignore[arg-type] ) ) adf_client.datasets.create_or_update( @@ -114,7 +113,7 @@ def create_adf_storage_pipeline() -> None: # Create an Azure blob dataset (output) ds_out_azure_blob = DatasetResource( - properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=OUTPUT_BLOB_PATH) + properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=OUTPUT_BLOB_PATH) # type: ignore[arg-type] ) adf_client.datasets.create_or_update( RESOURCE_GROUP_NAME, DATAFACTORY_NAME, DATASET_OUTPUT_NAME, ds_out_azure_blob ) @@ -123,8 +122,8 @@ def create_adf_storage_pipeline() -> None: # Create a copy activity blob_source = BlobSource() blob_sink = BlobSink() - ds_in_ref = DatasetReference(reference_name=DATASET_INPUT_NAME) - ds_out_ref = DatasetReference(reference_name=DATASET_OUTPUT_NAME) + ds_in_ref = DatasetReference(type="DatasetReferenceType", reference_name=DATASET_INPUT_NAME) + ds_out_ref = DatasetReference(type="DatasetReferenceType", reference_name=DATASET_OUTPUT_NAME) copy_activity = CopyActivity( name=ACTIVITY_NAME, inputs=[ds_in_ref], outputs=[ds_out_ref], source=blob_source, sink=blob_sink ) From 6fea5d3d9bd37db909cef953f1e7045e39effae5 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Oct 2023 15:52:26 +0800 Subject: [PATCH 03/10] fix(microsoft): make resource_group_name and factory_name required arguments --- .../providers/microsoft/azure/hooks/data_factory.py | 6 +++--- .../microsoft/azure/triggers/data_factory.py | 11 +++++------ tests/microsoft/azure/operators/test_data_factory.py | 2 ++ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/astronomer/providers/microsoft/azure/hooks/data_factory.py index c606fa08a..2504dbdc4 100644 --- a/astronomer/providers/microsoft/azure/hooks/data_factory.py +++ 
b/astronomer/providers/microsoft/azure/hooks/data_factory.py @@ -113,8 +113,8 @@ async def get_async_conn(self) -> DataFactoryManagementClient: async def get_pipeline_run( self, run_id: str, - resource_group_name: str | None = None, - factory_name: str | None = None, + resource_group_name: str, + factory_name: str, **config: Any, ) -> PipelineRun: """ @@ -132,7 +132,7 @@ async def get_pipeline_run( raise AirflowException(e) async def get_adf_pipeline_run_status( - self, run_id: str, resource_group_name: str | None = None, factory_name: str | None = None + self, run_id: str, resource_group_name: str, factory_name: str ) -> str: """ Connect to Azure Data Factory asynchronously and get the pipeline status by run_id. diff --git a/astronomer/providers/microsoft/azure/triggers/data_factory.py b/astronomer/providers/microsoft/azure/triggers/data_factory.py index 509751438..cc64d4512 100644 --- a/astronomer/providers/microsoft/azure/triggers/data_factory.py +++ b/astronomer/providers/microsoft/azure/triggers/data_factory.py @@ -1,12 +1,11 @@ import asyncio import time -from typing import Any, AsyncIterator, Dict, List, Optional, Tuple +from typing import Any, AsyncIterator, Dict, List, Tuple from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryPipelineRunStatus, ) from airflow.triggers.base import BaseTrigger, TriggerEvent - from astronomer.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHookAsync, ) @@ -29,8 +28,8 @@ def __init__( run_id: str, azure_data_factory_conn_id: str, poke_interval: float, - resource_group_name: Optional[str] = None, - factory_name: Optional[str] = None, + resource_group_name: str, + factory_name: str, ): super().__init__() self.run_id = run_id @@ -108,8 +107,8 @@ def __init__( run_id: str, azure_data_factory_conn_id: str, end_time: float, - resource_group_name: Optional[str] = None, - factory_name: Optional[str] = None, + resource_group_name: str, + factory_name: str, wait_for_termination: bool = True, check_interval: int = 60, ): diff --git a/tests/microsoft/azure/operators/test_data_factory.py b/tests/microsoft/azure/operators/test_data_factory.py index 765f9a8d3..992ff2d62 100644 --- a/tests/microsoft/azure/operators/test_data_factory.py +++ b/tests/microsoft/azure/operators/test_data_factory.py @@ -25,6 +25,8 @@ class TestAzureDataFactoryRunPipelineOperatorAsync: task_id="run_pipeline", pipeline_name="pipeline", parameters={"myParam": "value"}, + factory_name="factory_name", + resource_group_name="resource_group", ) @mock.patch(f"{MODULE}.AzureDataFactoryRunPipelineOperatorAsync.defer") From ce5fcac6f0677b648fd55fcf980cca518ad4f57f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Oct 2023 15:54:54 +0800 Subject: [PATCH 04/10] style(microsoft): cast type for fixing mypy issue --- .../microsoft/azure/example_dags/example_adf_run_pipeline.py | 1 + astronomer/providers/microsoft/azure/hooks/data_factory.py | 4 ++-- astronomer/providers/microsoft/azure/triggers/data_factory.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py index d25007bba..72c17d5a6 100644 --- a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py +++ b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py @@ -7,6 +7,7 @@ from airflow import DAG from airflow.models.xcom_arg import XComArg from airflow.operators.python import 
PythonOperator + from astronomer.providers.microsoft.azure.operators.data_factory import ( AzureDataFactoryRunPipelineOperatorAsync, ) diff --git a/astronomer/providers/microsoft/azure/hooks/data_factory.py b/astronomer/providers/microsoft/azure/hooks/data_factory.py index 2504dbdc4..65fa7cec9 100644 --- a/astronomer/providers/microsoft/azure/hooks/data_factory.py +++ b/astronomer/providers/microsoft/azure/hooks/data_factory.py @@ -82,7 +82,7 @@ def __init__(self, azure_data_factory_conn_id: str): async def get_async_conn(self) -> DataFactoryManagementClient: """Get async connection and connect to azure data factory.""" if self._conn is not None: - return self._conn + return cast(DataFactoryManagementClient, self._conn) conn = await sync_to_async(self.get_connection)(self.conn_id) extras = conn.extra_dejson @@ -147,7 +147,7 @@ async def get_adf_pipeline_run_status( factory_name=factory_name, resource_group_name=resource_group_name, ) - status: str = pipeline_run.status + status: str = cast(str, pipeline_run.status) return status except Exception as e: raise AirflowException(e) diff --git a/astronomer/providers/microsoft/azure/triggers/data_factory.py b/astronomer/providers/microsoft/azure/triggers/data_factory.py index cc64d4512..1628ddd6b 100644 --- a/astronomer/providers/microsoft/azure/triggers/data_factory.py +++ b/astronomer/providers/microsoft/azure/triggers/data_factory.py @@ -6,6 +6,7 @@ AzureDataFactoryPipelineRunStatus, ) from airflow.triggers.base import BaseTrigger, TriggerEvent + from astronomer.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHookAsync, ) From 021a45bed7f3404789902763da1f81b6f78e4bfc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Oct 2023 22:01:14 +0800 Subject: [PATCH 05/10] fix(microsoft): add missing types in example_adf_run_pipeline.py These were added in a newer version of the datafactory library. --- .../microsoft/azure/example_dags/example_adf_run_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py index 72c17d5a6..47f9fc68b 100644 --- a/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py +++ b/astronomer/providers/microsoft/azure/example_dags/example_adf_run_pipeline.py @@ -123,8 +123,8 @@ def create_adf_storage_pipeline() -> None: # Create a copy activity blob_source = BlobSource() blob_sink = BlobSink() - ds_in_ref = DatasetReference(type="DatasetReferenceType", reference_name=DATASET_INPUT_NAME) - ds_out_ref = DatasetReference(type="DatasetReferenceType", reference_name=DATASET_OUTPUT_NAME) + ds_in_ref = DatasetReference(type="DatasetReference", reference_name=DATASET_INPUT_NAME) + ds_out_ref = DatasetReference(type="DatasetReference", reference_name=DATASET_OUTPUT_NAME) copy_activity = CopyActivity( name=ACTIVITY_NAME, inputs=[ds_in_ref], outputs=[ds_out_ref], source=blob_source, sink=blob_sink ) From 373f7ddae04bcd35ad3aff3892faf198791b00f4 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 18 Oct 2023 18:21:16 +0800 Subject: [PATCH 06/10] fix(azure): remove deprecated PipelineRunInfo --- .../providers/microsoft/azure/operators/data_factory.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/astronomer/providers/microsoft/azure/operators/data_factory.py b/astronomer/providers/microsoft/azure/operators/data_factory.py index b9a2b6eb8..85bcdf2e1 100644 ---
a/astronomer/providers/microsoft/azure/operators/data_factory.py +++ b/astronomer/providers/microsoft/azure/operators/data_factory.py @@ -6,7 +6,6 @@ AzureDataFactoryHook, AzureDataFactoryPipelineRunException, AzureDataFactoryPipelineRunStatus, - PipelineRunInfo, ) from airflow.providers.microsoft.azure.operators.data_factory import ( AzureDataFactoryRunPipelineOperator, ) @@ -67,12 +66,11 @@ def execute(self, context: Context) -> None: context["ti"].xcom_push(key="run_id", value=run_id) end_time = time.time() + self.timeout - pipeline_run_info = PipelineRunInfo( + pipeline_run_status = hook.get_pipeline_run_status( run_id=run_id, - factory_name=self.factory_name, resource_group_name=self.resource_group_name, + factory_name=self.factory_name, ) - pipeline_run_status = hook.get_pipeline_run_status(**pipeline_run_info) if pipeline_run_status not in AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES: self.defer( timeout=self.execution_timeout, From 0c09b4460510db3e46e51103d4b9ee0a2b33868b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 18 Oct 2023 18:42:01 +0800 Subject: [PATCH 07/10] ci(circle-ci): remove unnecessary dependencies from build-docs --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 753da3469..12d991eb4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -160,7 +160,7 @@ jobs: sudo apt-get install libsasl2-dev - run: name: Install Dependencies command: pip install -U -e .[all,docs,mypy] + command: pip install -U -e .[docs] - run: name: Run Sphinx command: | From 2009f93821348e037c1684154648302860f62ea3 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 18 Oct 2023 19:16:02 +0800 Subject: [PATCH 08/10] ci(circle-ci): add temporary solution for mypy env setup --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index a6a74edf8..4f5ffc68e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -114,6 +114,7 @@ mypy = types-Markdown types-PyMySQL types-PyYAML + snowflake-connector-python>=3.3.0 # Temporary solution for fixing the issue that pip cannot find proper connector version # All extras from above except 'mypy', 'docs' and 'tests' all = From bdef5a1be82ca7df9acdb2dbb3c743c528f36fbd Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 18 Oct 2023 20:08:00 +0800 Subject: [PATCH 09/10] test(azure): fix missing argument issue --- tests/microsoft/azure/sensors/test_data_factory.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/microsoft/azure/sensors/test_data_factory.py b/tests/microsoft/azure/sensors/test_data_factory.py index e961854e3..da1810614 100644 --- a/tests/microsoft/azure/sensors/test_data_factory.py +++ b/tests/microsoft/azure/sensors/test_data_factory.py @@ -19,6 +19,8 @@ class TestAzureDataFactoryPipelineRunStatusSensorAsync: SENSOR = AzureDataFactoryPipelineRunStatusSensorAsync( task_id="pipeline_run_sensor_async", run_id=RUN_ID, + factory_name="factory_name", + resource_group_name="resource_group_name", ) @mock.patch(f"{MODULE}.AzureDataFactoryPipelineRunStatusSensorAsync.defer") @@ -61,5 +63,9 @@ def test_poll_interval_deprecation_warning(self): # TODO: Remove once deprecated with pytest.warns(expected_warning=DeprecationWarning): AzureDataFactoryPipelineRunStatusSensorAsync( - task_id="pipeline_run_sensor_async", run_id=self.RUN_ID, poll_interval=5.0 + task_id="pipeline_run_sensor_async", + run_id=self.RUN_ID, + poll_interval=5.0, + factory_name="factory_name", + resource_group_name="resource_group_name", )
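PATCH 03 and the test fix in PATCH 09 share one theme: `factory_name` and `resource_group_name` no longer default to `None`, so the connection-level fallback is gone and every operator, sensor, and trigger instantiation must pass both explicitly. A minimal caller-side sketch of a DAG after this series — the DAG id, pipeline, factory, and resource-group names below are hypothetical, and the imports assume the patched astronomer-providers package is installed:

```python
# Sketch only: shows the now-required factory/resource-group arguments.
# All names marked "hypothetical" are illustrative, not from the patches.
from datetime import datetime
from typing import cast

from airflow import DAG
from airflow.models.xcom_arg import XComArg

from astronomer.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperatorAsync,
)
from astronomer.providers.microsoft.azure.sensors.data_factory import (
    AzureDataFactoryPipelineRunStatusSensorAsync,
)

with DAG("adf_required_args_sketch", start_date=datetime(2023, 10, 1), schedule=None):
    run_pipeline = AzureDataFactoryRunPipelineOperatorAsync(
        task_id="run_pipeline",
        azure_data_factory_conn_id="azure_data_factory_default",
        pipeline_name="my_pipeline",                # hypothetical
        factory_name="my-data-factory",             # required after PATCH 03
        resource_group_name="my-resource-group",    # required after PATCH 03
    )

    wait_for_run = AzureDataFactoryPipelineRunStatusSensorAsync(
        task_id="wait_for_run",
        run_id=cast(str, XComArg(run_pipeline, key="run_id")),
        factory_name="my-data-factory",
        resource_group_name="my-resource-group",
    )

    run_pipeline >> wait_for_run
```

The wiring mirrors the updated example DAG in PATCH 01: the run id is pulled from the operator's XCom via `XComArg`, and both tasks repeat the now-required factory/resource-group pair instead of inheriting it from `default_args`.

From 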
69378b5e10b4aca679bc0e0b0c2130f8ad1b5606 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 23 Oct 2023 08:52:02 +0900 Subject: [PATCH 10/10] style(microsoft): exclude type-casting branch from test coverage --- astronomer/providers/microsoft/azure/hooks/data_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/astronomer/providers/microsoft/azure/hooks/data_factory.py b/astronomer/providers/microsoft/azure/hooks/data_factory.py index 65fa7cec9..268be4dea 100644 --- a/astronomer/providers/microsoft/azure/hooks/data_factory.py +++ b/astronomer/providers/microsoft/azure/hooks/data_factory.py @@ -82,7 +82,7 @@ def __init__(self, azure_data_factory_conn_id: str): async def get_async_conn(self) -> DataFactoryManagementClient: """Get async connection and connect to azure data factory.""" if self._conn is not None: - return cast(DataFactoryManagementClient, self._conn) + return cast(DataFactoryManagementClient, self._conn) # pragma: no cover conn = await sync_to_async(self.get_connection)(self.conn_id) extras = conn.extra_dejson
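PATCH 01, 04, and 10 converge on a single mypy pattern for the async hook: annotate the cached client as optional, cast on the cached-return path, and exclude that line from coverage. A self-contained sketch of the pattern — `FakeClient` is a stand-in for `DataFactoryManagementClient`, and the Azure credential wiring the real hook performs is omitted:

```python
# Stand-alone sketch of the Optional-attribute pattern used by
# AzureDataFactoryHookAsync; FakeClient stands in for the real
# DataFactoryManagementClient and no Azure credentials are involved.
from __future__ import annotations

import asyncio
from typing import cast


class FakeClient:
    """Stand-in for azure.mgmt.datafactory.aio.DataFactoryManagementClient."""


class HookSketch:
    def __init__(self) -> None:
        # PATCH 01: annotate as `X | None` so initializing to None type-checks.
        self._async_conn: FakeClient | None = None

    async def get_async_conn(self) -> FakeClient:
        if self._async_conn is not None:
            # PATCH 04 casts so the declared return type holds; PATCH 10 marks
            # the equivalent line in the real hook `# pragma: no cover`.
            return cast(FakeClient, self._async_conn)
        self._async_conn = FakeClient()  # the real hook builds the Azure client here
        return self._async_conn


async def main() -> None:
    hook = HookSketch()
    client = await hook.get_async_conn()          # builds and caches the client
    assert await hook.get_async_conn() is client  # second call hits the cast branch


asyncio.run(main())
```

Running the script exercises both branches: the first await builds and caches the client, the second hits the cast-and-return path that PATCH 10 excludes from coverage, presumably because the unit tests never reach the hook with a pre-populated connection.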