diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 4389e08a791a6..5a99f7d9642cd 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -294,6 +294,10 @@ transfers:
     target-integration-name: Microsoft Azure Blob Storage
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst
     python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
+  - source-integration-name: Amazon Simple Storage Service (S3)
+    target-integration-name: Microsoft Azure Blob Storage
+    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst
+    python-module: airflow.providers.microsoft.azure.transfers.s3_to_wasb
   - source-integration-name: Microsoft Azure Blob Storage
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
diff --git a/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py b/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
new file mode 100644
index 0000000000000..53e7bce12f51a
--- /dev/null
+++ b/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
@@ -0,0 +1,266 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import tempfile
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+# Three custom exceptions raised when the requested transfer cannot be performed as specified
+class TooManyFilesToMoveException(Exception):
+    """Custom exception raised when attempting to move multiple files from S3 to a single Azure Blob."""
+
+    def __init__(self, number_of_files: int):
+        # Call the parent constructor with a simple message
+        message: str = f"{number_of_files} files cannot be moved to a single Azure Blob."
+        super().__init__(message)
+
+
+class InvalidAzureBlobParameters(Exception):
+    """Custom exception raised when neither a blob_prefix nor a blob_name is passed to the operator."""
+
+    def __init__(self):
+        message: str = "One of blob_name or blob_prefix must be provided."
+        super().__init__(message)
+
+
+class InvalidKeyComponents(Exception):
+    """Custom exception raised when neither a full_path nor a prefix + file_name are provided to _create_key."""
+
+    def __init__(self):
+        message = "Either full_path or both prefix and file_name must be provided."
+        super().__init__(message)
+
+
+class S3ToAzureBlobStorageOperator(BaseOperator):
+    """
+    Operator to move data from an AWS S3 Bucket to Microsoft Azure Blob Storage.
+
+    A similar class exists to move data from Microsoft Azure Blob Storage to an AWS S3 Bucket, and lives in
+    the airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py file.
+
+    Either an explicit S3 key can be provided, or a prefix containing the files that are to be transferred to
+    Azure Blob Storage. The same holds for a blob name; either an explicit name can be passed, or a blob
+    prefix can be provided for the file(s) to be stored under.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3ToWasbOperator`
+
+    :param aws_conn_id: ID for the AWS S3 connection to use.
+    :param wasb_conn_id: ID for the Azure Blob Storage connection to use.
+    :param s3_bucket: The name of the AWS S3 bucket that an object (or objects) would be transferred from.
+        (templated)
+    :param container_name: The name of the Azure Storage Blob container an object (or objects) would be
+        transferred to. (templated)
+    :param s3_prefix: Prefix string that filters any S3 objects that begin with this prefix. (templated)
+    :param s3_key: An explicit S3 key (object) to be transferred. (templated)
+    :param blob_prefix: Prefix string that would provide a path in the Azure Storage Blob container for an
+        object (or objects) to be moved to. (templated)
+    :param blob_name: An explicit blob name that an object would be transferred to. This can only be used
+        if a single file is being moved. If there are multiple files in an S3 bucket that are to be moved
+        to a single Azure blob, an exception will be raised. (templated)
+    :param create_container: True if a container should be created if it did not already exist, False
+        otherwise.
+    :param replace: If a blob exists in the container and replace takes a value of True, it will be
+        overwritten. If replace is False and a blob exists in the container, the file will NOT be
+        overwritten.
+    :param s3_verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default, SSL certificates are verified.
+        You can provide the following values:
+
+        - ``False``: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :param s3_extra_args: kwargs to pass to S3Hook.
+    :param wasb_extra_args: kwargs to pass to WasbHook.
+ """ + + template_fields: Sequence[str] = ( + "s3_bucket", + "container_name", + "s3_prefix", + "s3_key", + "blob_prefix", + "blob_name", + ) + + def __init__( + self, + *, + aws_conn_id: str = "aws_default", + wasb_conn_id: str = "wasb_default", + s3_bucket: str, + container_name: str, + s3_prefix: str | None = None, # Only use this to pull an entire directory of files + s3_key: str | None = None, # Only use this to pull a single file + blob_prefix: str | None = None, + blob_name: str | None = None, + create_container: bool = False, + replace: bool = False, + s3_verify: bool = False, + s3_extra_args: dict | None = None, + wasb_extra_args: dict | None = None, + **kwargs, + ): + # Call to constructor of the inherited BaseOperator class + super().__init__(**kwargs) + + self.aws_conn_id = aws_conn_id + self.wasb_conn_id = wasb_conn_id + self.s3_bucket = s3_bucket + self.container_name = container_name + self.s3_prefix = s3_prefix + self.s3_key = s3_key + self.blob_prefix = blob_prefix + self.blob_name = blob_name + self.create_container = create_container + self.replace = replace + self.s3_verify = s3_verify + self.s3_extra_args = s3_extra_args or {} + self.wasb_extra_args = wasb_extra_args or {} + + # These cached properties come in handy when working with hooks. Rather than closing and opening new + # hooks, the same hook can be used across multiple methods (without having to use the constructor to + # create the hook) + @cached_property + def s3_hook(self) -> S3Hook: + """Create and return an S3Hook.""" + return S3Hook(aws_conn_id=self.aws_conn_id, verify=self.s3_verify, **self.s3_extra_args) + + @cached_property + def wasb_hook(self) -> WasbHook: + """Create and return a WasbHook.""" + return WasbHook(wasb_conn_id=self.wasb_conn_id, **self.wasb_extra_args) + + def execute(self, context: Context) -> list[str]: + """Execute logic below when operator is executed as a task.""" + self.log.info( + "Getting %s from %s" if self.s3_key else "Getting all files start with %s from %s", + self.s3_key if self.s3_key else self.s3_prefix, + self.s3_bucket, + ) + + # Pull a list of files to move from S3 to Azure Blob storage + files_to_move: list[str] = self.get_files_to_move() + + # Check to see if there are indeed files to move. If so, move each of these files. Otherwise, output + # a logging message that denotes there are no files to move + if files_to_move: + for file_name in files_to_move: + self.move_file(file_name) + + # Assuming that files_to_move is a list (which it always should be), this will get "hit" after the + # last file is moved from S3 -> Azure Blob + self.log.info("All done, uploaded %s to Azure Blob.", len(files_to_move)) + + else: + # If there are no files to move, a message will be logged. May want to consider alternative + # functionality (should an exception instead be raised?) + self.log.info("There are no files to move!") + + # Return a list of the files that were moved + return files_to_move + + def get_files_to_move(self) -> list[str]: + """Determine the list of files that need to be moved, and return the name.""" + if self.s3_key: + # Only pull the file name from the s3_key, drop the rest of the key + files_to_move: list[str] = [self.s3_key.split("/")[-1]] + else: + # Pull the keys from the s3_bucket using the provided prefix. 
Remove the prefix from the file + # name, and add to the list of files to move + s3_keys: list[str] = self.s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_prefix) + files_to_move = [s3_key.replace(f"{self.s3_prefix}/", "", 1) for s3_key in s3_keys] + + # Now, make sure that there are not too many files to move to a single Azure blob + if self.blob_name and len(files_to_move) > 1: + raise TooManyFilesToMoveException(len(files_to_move)) + + if not self.replace: + # Only grab the files from S3 that are not in Azure Blob already. This will prevent any files that + # exist in both S3 and Azure Blob from being overwritten. If a blob_name is provided, check to + # see if that blob exists + azure_blob_files: list[str] = [] + + if self.blob_name: + # If the singular blob (stored at self.blob_name) exists, add it to azure_blob_files so it + # can be removed from the list of files to move + if self.wasb_hook.check_for_blob(self.container_name, self.blob_name): + azure_blob_files.append(self.blob_name.split("/")[-1]) + + elif self.blob_prefix: + azure_blob_files += self.wasb_hook.get_blobs_list_recursive( + container_name=self.container_name, prefix=self.blob_prefix + ) + else: + raise InvalidAzureBlobParameters + + # This conditional block only does one thing - it alters the elements in the files_to_move list. + # This list is being trimmed to remove the existing files in the Azure Blob (as mentioned above) + existing_files = azure_blob_files if azure_blob_files else [] + files_to_move = list(set(files_to_move) - set(existing_files)) + + return files_to_move + + def move_file(self, file_name: str) -> None: + """Move file from S3 to Azure Blob storage.""" + with tempfile.NamedTemporaryFile("w") as temp_file: + # If using an s3_key, this creates a scenario where the only file in the files_to_move + # list is going to be the name pulled from the s3_key. It's not verbose, but provides + # standard implementation across the operator + source_s3_key: str = self._create_key(self.s3_key, self.s3_prefix, file_name) + + # Create retrieve the S3 client itself, rather than directly using the hook. Download the file to + # the temp_file.name + s3_client = self.s3_hook.get_conn() + s3_client.download_file(self.s3_bucket, source_s3_key, temp_file.name) + + # Load the file to Azure Blob using either the key that has been passed in, or the key + # from the list of files present in the s3_prefix, plus the blob_prefix. There may be + # desire to only pass in an S3 key, in which case, the blob_name should be derived from + # the S3 key + destination_azure_blob_name: str = self._create_key(self.blob_name, self.blob_prefix, file_name) + self.wasb_hook.load_file( + file_path=temp_file.name, + container_name=self.container_name, + blob_name=destination_azure_blob_name, + create_container=self.create_container, + **self.wasb_extra_args, + ) + + @staticmethod + def _create_key(full_path: str | None, prefix: str | None, file_name: str | None): + """Return a file key using its components.""" + if full_path: + return full_path + elif prefix and file_name: + return f"{prefix}/{file_name}" + else: + raise InvalidKeyComponents diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst new file mode 100644 index 0000000000000..59a394b0935fb --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst @@ -0,0 +1,62 @@ + + .. 
Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

 .. http://www.apache.org/licenses/LICENSE-2.0

 .. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.
+
+=================================================
+Amazon S3 to Azure Blob Storage Transfer Operator
+=================================================
+
+The Blob service stores text and binary data as objects in the cloud.
+The Blob service offers the following three resources: the storage account, containers, and blobs.
+Within your storage account, containers provide a way to organize sets of blobs.
+For more information about the service visit `Azure Blob Storage API documentation `_.
+This page shows how to transfer data from Amazon S3 to Azure Blob Storage.
+
+Use the ``S3ToAzureBlobStorageOperator`` transfer to copy data from Amazon Simple Storage Service (S3) to Azure Blob Storage.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../operators/_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:S3ToWasbOperator:
+
+
+Transfer Data from Amazon S3 to Blob Storage
+============================================
+
+To copy data from an Amazon S3 bucket to an Azure Blob Storage container, the following operator can be used:
+:class:`~airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3ToAzureBlobStorageOperator`
+
+Example usage:
+
+.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_s3_to_wasb.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_s3_to_wasb]
+    :end-before: [END howto_transfer_s3_to_wasb]
+
+Reference
+---------
+
+For further information, please refer to the following links:
+
+* `AWS boto3 library documentation for Amazon S3 `__
+* `Azure Blob Storage client library `__
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 43fb3f2f11089..cc71970041c5a 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -795,6 +795,7 @@
     ],
     "plugins": [],
     "cross-providers-deps": [
+      "amazon",
       "google",
       "oracle",
       "sftp"
diff --git a/tests/providers/microsoft/azure/transfers/test_s3_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_s3_to_wasb.py
new file mode 100644
index 0000000000000..6d96a8f8306c7
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_s3_to_wasb.py
@@ -0,0 +1,272 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from io import BytesIO +from unittest import mock + +import pytest +from moto import mock_aws + +from airflow.providers.microsoft.azure.transfers.s3_to_wasb import ( + InvalidKeyComponents, + S3ToAzureBlobStorageOperator, + TooManyFilesToMoveException, +) + +TASK_ID = "test-s3-to-azure-blob-operator" +S3_BUCKET = "test-bucket" +CONTAINER_NAME = "test-container" +PREFIX = "TEST" +TEMPFILE_NAME = "test-tempfile" +MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"] + +# Here are some of the tests that need to be run (for the get_files_to_move() function) +# 1. Prefix with no existing files, without replace [DONE] +# 2. Prefix with existing files, without replace [DONE] +# 3. Prefix with existing files, with replace [DONE] +# 4. Two keys without existing files, without replace [DONE] +# 5. Two keys with existing files, without replace [DONE] +# 6. Two keys with existing files, with replace [DONE] +# 7. S3 key with Azure prefix, without existing files, without replace [DONE] +# 8. S3 key with Azure prefix, with existing files, without replace [DONE] +# 9. S3 key with Azure prefix, with existing files, with replace [SKIPPED] + +# Other tests that need to be run +# - Test __init__ [DONE] +# - Test execute() [DONE] +# - Test S3 prefix being passed to an Azure Blob name [DONE] +# - Test move_file() [DONE] +# - Test _create_key() [DONE] + + +@mock_aws +class TestS3ToAzureBlobStorageOperator: + def test__init__(self): + # Create a mock operator with a single set of parameters that are used to test the __init__() + # constructor. Not every parameter needs to be provided, as this code will also be used to test the + # default parameters that are configured + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_prefix=PREFIX, + container_name=CONTAINER_NAME, + blob_prefix=PREFIX, + replace=True, + ) + + # ... is None is used to validate if a value is None, while not ... is used to evaluate if a value + # is False + assert operator.task_id == TASK_ID + assert operator.aws_conn_id == "aws_default" + assert operator.wasb_conn_id == "wasb_default" + assert operator.s3_bucket == S3_BUCKET + assert operator.container_name == CONTAINER_NAME + assert operator.s3_prefix == PREFIX + assert operator.s3_key is None + assert operator.blob_prefix == PREFIX + assert operator.blob_name is None + assert not operator.create_container # Should be false (match default value in constructor) + assert operator.replace + assert not operator.s3_verify # Should be false (match default value in constructor) + assert operator.s3_extra_args == {} + assert operator.wasb_extra_args == {} + + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3Hook") + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + @mock.patch("tempfile.NamedTemporaryFile") + def test__execute__prefix_without_replace_empty_destination( + self, tempfile_mock, wasb_mock_hook, s3_mock_hook + ): + # Set the list files that the S3Hook should return, along with an empty list of files in the Azure + # Blob storage container. 
This scenario was picked for testing, as it's most likely the most common + # setting the operator will be used in + s3_mock_hook.return_value.list_keys.return_value = MOCK_FILES + wasb_mock_hook.return_value.get_blobs_list_recursive.return_value = [] + + s3_mock_hook.return_value.download_file.return_value = BytesIO().write(b"test file contents") + tempfile_mock.return_value.__enter__.return_value.name = TEMPFILE_NAME + + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_prefix=PREFIX, + container_name=CONTAINER_NAME, + blob_prefix=PREFIX, + ) + # Placing an empty "context" object here (using None) + uploaded_files = operator.execute(None) + assert sorted(uploaded_files) == sorted(MOCK_FILES) + + # Using the default connection ID, along with the default value of verify (for the S3 hook) + s3_mock_hook.assert_called_once_with(aws_conn_id="aws_default", verify=False) + wasb_mock_hook.assert_called_once_with(wasb_conn_id="wasb_default") + + # There are a number of very similar tests that use the same mocking, and for the most part, the same + # logic. These tests are used to validate the records being returned by the get_files_to_move() method, + # which heavily drives the successful execution of the operator + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3Hook") + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + @pytest.mark.parametrize( # Please see line 37 above for args used for parametrization + "s3_existing_files,wasb_existing_files,returned_files,s3_prefix,blob_prefix,replace", + [ + # s3_existing files, wasb_existing_files, returned_files, s3_prefix, wasb_prefix, replace + (MOCK_FILES, [], MOCK_FILES, PREFIX, PREFIX, False), # Task 1 from above + (MOCK_FILES, MOCK_FILES[1:], [MOCK_FILES[0]], PREFIX, PREFIX, False), # Task 2 from above + (MOCK_FILES, MOCK_FILES[1:], MOCK_FILES, PREFIX, PREFIX, True), # Task 3 from above + ], + ) + def test_get_files_to_move__both_prefix( + self, + wasb_mock_hook, + s3_mock_hook, + s3_existing_files, + wasb_existing_files, + returned_files, + s3_prefix, + blob_prefix, + replace, + ): + # Set the list files that the S3Hook should return + s3_mock_hook.return_value.list_keys.return_value = s3_existing_files + wasb_mock_hook.return_value.get_blobs_list_recursive.return_value = wasb_existing_files + + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_prefix=s3_prefix, + container_name=CONTAINER_NAME, + blob_prefix=blob_prefix, + replace=replace, + ) + # Placing an empty "context" object here (using None) + uploaded_files = operator.get_files_to_move() + assert sorted(uploaded_files) == sorted(returned_files) + + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + @pytest.mark.parametrize( + "azure_file_exists,returned_files,replace", + [ + # azure_file_exists, returned_files, replace + (False, ["TEST1.csv"], False), # Task 4 from above + (True, [], False), # Task 5 from above + (True, ["TEST1.csv"], True), # Task 6 from above + ], + ) + def test_get_file_to_move__both_key(self, wasb_mock_hook, azure_file_exists, returned_files, replace): + # Different than above, able to remove the mocking of the list_keys method for the S3 hook (since a + # single key is being passed, rather than a prefix). 
Testing when a single S3 key is being moved to + # a deterministic Blob name in the operator + wasb_mock_hook.return_value.check_for_blob.return_value = azure_file_exists + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_key="TEST/TEST1.csv", + container_name=CONTAINER_NAME, + blob_name="TEST/TEST1.csv", + replace=replace, + ) + uploaded_files = operator.get_files_to_move() + + # Only the file name should be returned, rather than the entire blob name + assert sorted(uploaded_files) == sorted(returned_files) + + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + @pytest.mark.parametrize( + "wasb_existing_files,returned_files", + [ + # wasb_existing_files, returned_files + ([], ["TEST1.csv"]), # Task 8 from above + (["TEST1.csv"], []), # Task 9 from above + ], + ) + def test_get_files_to_move__s3_key_wasb_prefix(self, wasb_mock_hook, wasb_existing_files, returned_files): + # A single S3 key is being used to move to a file to a container using a prefix. The files being + # returned should take the same name as the file key that was passed to s3_key + wasb_mock_hook.return_value.get_blobs_list_recursive.return_value = wasb_existing_files + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_key="TEST/TEST1.csv", + container_name=CONTAINER_NAME, + blob_prefix=PREFIX, + ) + uploaded_files = operator.get_files_to_move() + assert sorted(uploaded_files) == sorted(returned_files) + + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3Hook") + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + def test__get_files_to_move__s3_prefix_blob_name_without_replace_empty_destination( + self, wasb_mock_hook, s3_mock_hook + ): + # Set the list files that the S3Hook should return + s3_mock_hook.return_value.list_keys.return_value = MOCK_FILES + wasb_mock_hook.return_value.get_blobs_list_recursive.return_value = [] + + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_prefix=PREFIX, + container_name=CONTAINER_NAME, + blob_name="TEST/TEST1.csv", + ) + + # This should throw an exception, since more than a single S3 object is attempted to move to a single + # Azure blob + with pytest.raises(TooManyFilesToMoveException): + operator.get_files_to_move() + + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3Hook") + @mock.patch("airflow.providers.microsoft.azure.transfers.s3_to_wasb.WasbHook") + def test__move_file(self, wasb_mock_hook, s3_mock_hook): + # Only a single S3 key is provided, and there are no blobs in the container. 
This means that this file + # should be moved, and the move_file method will be executed + wasb_mock_hook.return_value.get_blobs_list_recursive.return_value = [] + s3_mock_hook.return_value.download_file.return_value = BytesIO().write(b"test file contents") + + operator = S3ToAzureBlobStorageOperator( + task_id=TASK_ID, + s3_bucket=S3_BUCKET, + s3_key="TEST/TEST1.csv", + container_name=CONTAINER_NAME, + blob_prefix=PREFIX, + ) + + # Call the move_file method + operator.move_file("TEST1.csv") + + # Test that the s3_hook has been called once (to create the client), and the wasb_hook has been called + # to load the file to WASB + operator.s3_hook.get_conn.assert_called_once() + operator.wasb_hook.load_file.assert_called_once_with( + file_path=mock.ANY, + container_name=CONTAINER_NAME, + blob_name=f"{PREFIX}/TEST1.csv", + create_container=False, + ) + + def test__create_key(self): + # There are three tests that will be run: + # 1. Test will a full path + # 2. Test with a prefix and a file name + # 3. Test with no full path, and a missing file name + assert S3ToAzureBlobStorageOperator._create_key("TEST/TEST1.csv", None, None) == "TEST/TEST1.csv" + assert S3ToAzureBlobStorageOperator._create_key(None, "TEST", "TEST1.csv") == "TEST/TEST1.csv" + with pytest.raises(InvalidKeyComponents): + S3ToAzureBlobStorageOperator._create_key(None, "TEST", None) diff --git a/tests/system/providers/microsoft/azure/example_s3_to_wasb.py b/tests/system/providers/microsoft/azure/example_s3_to_wasb.py new file mode 100644 index 0000000000000..48fd428b9cde4 --- /dev/null +++ b/tests/system/providers/microsoft/azure/example_s3_to_wasb.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.s3 import ( + S3CreateBucketOperator, + S3CreateObjectOperator, + S3DeleteBucketOperator, + S3DeleteObjectsOperator, +) +from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +sys_test_context_task = SystemTestContextBuilder().build() + +# Set constants +DAG_ID: str = "example_s3_to_wasb" +S3_PREFIX: str = "TEST" +S3_KEY: str = "TEST/TEST1.csv" +BLOB_PREFIX: str = "TEST" +BLOB_NAME: str = "TEST/TEST1.csv" + + +with DAG(dag_id=DAG_ID, start_date=datetime(2024, 1, 1), schedule="@once", catchup=False) as dag: + # Pull the task context, as well as the ENV_ID + test_context = sys_test_context_task() + env_id = test_context["ENV_ID"] + + # Create the bucket name and container name using the ENV_ID + s3_bucket_name: str = f"{env_id}-s3-bucket" + wasb_container_name: str = f"{env_id}-wasb-container" + + # Create an S3 bucket as part of testing set up, which will be removed by the remove_s3_bucket during + # teardown + create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=s3_bucket_name) + + # Add a file to the S3 bucket created above. Part of testing set up, this file will eventually be removed + # by the remove_s3_object task during teardown + create_s3_object = S3CreateObjectOperator( + task_id="create_s3_object", + s3_bucket=s3_bucket_name, + s3_key=S3_KEY, + data=b"Testing...", + replace=True, + encrypt=False, + ) + + # [START howto_transfer_s3_to_wasb] + s3_to_wasb = S3ToAzureBlobStorageOperator( + task_id="s3_to_wasb", + s3_bucket=s3_bucket_name, + container_name=wasb_container_name, + s3_key=S3_KEY, + blob_prefix=BLOB_PREFIX, # Using a prefix for this + trigger_rule=TriggerRule.ALL_DONE, + replace=True, + ) + # [END howto_transfer_s3_to_wasb] + + # Part of tear down, remove all the objects at the S3_PREFIX + remove_s3_object = S3DeleteObjectsOperator( + task_id="remove_s3_object", bucket=s3_bucket_name, prefix=S3_PREFIX, trigger_rule=TriggerRule.ALL_DONE + ) + + # Remove the S3 bucket created as part of setup + remove_s3_bucket = S3DeleteBucketOperator( + task_id="remove_s3_bucket", + bucket_name=s3_bucket_name, # Force delete? + trigger_rule=TriggerRule.ALL_DONE, + ) + + # Set dependencies + chain( + # TEST SETUP + test_context, + create_s3_bucket, + create_s3_object, + # TEST BODY + s3_to_wasb, + # TEST TEARDOWN + remove_s3_object, + remove_s3_bucket, + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure when "tearDown" task with trigger + # rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
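For completeness, a minimal sketch of the prefix-to-prefix mode described in the operator docstring (the system test above only exercises the explicit ``s3_key`` path): every object under ``s3_prefix`` is copied and lands in the container as ``<blob_prefix>/<file name>``. The bucket, container, and connection IDs below are illustrative placeholders, not part of this change.

# Hypothetical usage sketch (not included in this PR); assumes the placeholder
# connections, bucket, and container already exist in the target environment.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator

with DAG(
    dag_id="example_s3_prefix_to_wasb_prefix",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    copy_reports = S3ToAzureBlobStorageOperator(
        task_id="copy_reports",
        aws_conn_id="aws_default",
        wasb_conn_id="wasb_default",
        s3_bucket="my-source-bucket",  # placeholder bucket name
        s3_prefix="reports/2024",  # every key under this prefix is transferred
        container_name="my-destination-container",  # placeholder container name
        blob_prefix="reports/2024",  # blobs are written as <blob_prefix>/<file name>
        create_container=True,  # create the container if it does not already exist
        replace=False,  # per the docstring, existing blobs are not overwritten
    )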