Skip to content

Commit

Permalink
Update AzureBlobStorageCredentials to close default credentials whe…
Browse files Browse the repository at this point in the history
…n used as a context manager (PrefectHQ#16071)
  • Loading branch information
desertaxle authored Nov 20, 2024
1 parent a57b324 commit 42ea12a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 127 deletions.
256 changes: 133 additions & 123 deletions src/integrations/prefect-azure/prefect_azure/blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ def example_blob_storage_download_flow():
logger = get_run_logger()
logger.info("Downloading blob from container %s with key %s", container, blob)

async with blob_storage_credentials.get_blob_client(container, blob) as blob_client:
blob_obj = await blob_client.download_blob()
async with blob_storage_credentials as credentials:
async with credentials.get_blob_client(container, blob) as blob_client:
blob_obj = await blob_client.download_blob()
output = await blob_obj.content_as_bytes()

return output
Expand Down Expand Up @@ -123,8 +124,9 @@ def example_blob_storage_upload_flow():
if blob is None:
blob = str(uuid.uuid4())

async with blob_storage_credentials.get_blob_client(container, blob) as blob_client:
await blob_client.upload_blob(data, overwrite=overwrite)
async with blob_storage_credentials as credentials:
async with credentials.get_blob_client(container, blob) as blob_client:
await blob_client.upload_blob(data, overwrite=overwrite)

return blob

Expand Down Expand Up @@ -176,17 +178,16 @@ def example_blob_storage_list_flow():
logger = get_run_logger()
logger.info("Listing blobs from container %s", container)

async with blob_storage_credentials.get_container_client(
container
) as container_client:
blobs = [
blob
async for blob in container_client.list_blobs(
name_starts_with=name_starts_with, include=include, **kwargs
)
]
async with blob_storage_credentials as credentials:
async with credentials.get_container_client(container) as container_client:
blobs = [
blob
async for blob in container_client.list_blobs(
name_starts_with=name_starts_with, include=include, **kwargs
)
]

return blobs
return blobs


class AzureBlobStorageContainer(
Expand Down Expand Up @@ -278,32 +279,35 @@ async def download_folder_to_path(
to_folder,
)
full_container_path = self._get_path_relative_to_base_folder(from_folder)
async with self.credentials.get_container_client(
self.container_name
) as container_client:
try:
async for blob in container_client.list_blobs(
name_starts_with=full_container_path
):
blob_path = blob.name
local_path = Path(to_folder) / Path(blob_path).relative_to(
full_container_path
)
local_path.parent.mkdir(parents=True, exist_ok=True)
async with container_client.get_blob_client(
blob_path
) as blob_client:
blob_obj = await blob_client.download_blob(**download_kwargs)

with local_path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc
async with self.credentials as credentials:
async with credentials.get_container_client(
self.container_name
) as container_client:
try:
async for blob in container_client.list_blobs(
name_starts_with=full_container_path
):
blob_path = blob.name
local_path = Path(to_folder) / Path(blob_path).relative_to(
full_container_path
)
local_path.parent.mkdir(parents=True, exist_ok=True)
async with container_client.get_blob_client(
blob_path
) as blob_client:
blob_obj = await blob_client.download_blob(
**download_kwargs
)

with local_path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc

return Path(to_folder)
return Path(to_folder)

@sync_compatible
async def download_object_to_file_object(
Expand Down Expand Up @@ -349,19 +353,20 @@ async def download_object_to_file_object(
"Downloading object from container %s to file object", self.container_name
)
full_container_path = self._get_path_relative_to_base_folder(from_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)
await blob_obj.download_to_stream(to_file_object)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_file_object
async with self.credentials as credentials:
async with credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)
await blob_obj.download_to_stream(to_file_object)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_file_object

@sync_compatible
async def download_object_to_path(
Expand Down Expand Up @@ -409,24 +414,25 @@ async def download_object_to_path(
to_path,
)
full_container_path = self._get_path_relative_to_base_folder(from_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)
async with self.credentials as credentials:
async with credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
blob_obj = await blob_client.download_blob(**download_kwargs)

path = Path(to_path)
path = Path(to_path)

path.parent.mkdir(parents=True, exist_ok=True)
path.parent.mkdir(parents=True, exist_ok=True)

with path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc
return Path(to_path)
with path.open(mode="wb") as to_file:
await blob_obj.readinto(to_file)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to download from container"
f" {self.container_name}: {exc.reason}"
) from exc
return path

@sync_compatible
async def upload_from_file_object(
Expand Down Expand Up @@ -471,18 +477,19 @@ async def upload_from_file_object(
"Uploading object to container %s with key %s", self.container_name, to_path
)
full_container_path = self._get_path_relative_to_base_folder(to_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
await blob_client.upload_blob(from_file_object, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path
async with self.credentials as credentials:
async with credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
await blob_client.upload_blob(from_file_object, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload from container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path

@sync_compatible
async def upload_from_path(
Expand Down Expand Up @@ -526,19 +533,20 @@ async def upload_from_path(
"Uploading object to container %s with key %s", self.container_name, to_path
)
full_container_path = self._get_path_relative_to_base_folder(to_path)
async with self.credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
with open(from_path, "rb") as f:
await blob_client.upload_blob(f, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path
async with self.credentials as credentials:
async with credentials.get_blob_client(
self.container_name, full_container_path
) as blob_client:
try:
with open(from_path, "rb") as f:
await blob_client.upload_blob(f, **upload_kwargs)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to container"
f" {self.container_name}: {exc.reason}"
) from exc

return to_path

@sync_compatible
async def upload_from_folder(
Expand Down Expand Up @@ -587,29 +595,30 @@ async def upload_from_folder(
to_folder,
)
full_container_path = self._get_path_relative_to_base_folder(to_folder)
async with self.credentials.get_container_client(
self.container_name
) as container_client:
if not Path(from_folder).is_dir():
raise ValueError(f"{from_folder} is not a directory")
for path in Path(from_folder).rglob("*"):
if path.is_file():
blob_path = Path(full_container_path) / path.relative_to(
from_folder
)
async with container_client.get_blob_client(
blob_path.as_posix()
) as blob_client:
try:
await blob_client.upload_blob(
path.read_bytes(), **upload_kwargs
)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to "
f"container {self.container_name}: {exc.reason}"
) from exc
return full_container_path
async with self.credentials as credentials:
async with credentials.get_container_client(
self.container_name
) as container_client:
if not Path(from_folder).is_dir():
raise ValueError(f"{from_folder} is not a directory")
for path in Path(from_folder).rglob("*"):
if path.is_file():
blob_path = Path(full_container_path) / path.relative_to(
from_folder
)
async with container_client.get_blob_client(
blob_path.as_posix()
) as blob_client:
try:
await blob_client.upload_blob(
path.read_bytes(), **upload_kwargs
)
except ResourceNotFoundError as exc:
raise RuntimeError(
"An error occurred when attempting to upload to "
f"container {self.container_name}: {exc.reason}"
) from exc
return full_container_path

@sync_compatible
async def get_directory(
Expand Down Expand Up @@ -736,11 +745,12 @@ async def list_blobs(self) -> List[str]:
self.container_name,
)

async with self.credentials.get_container_client(
self.container_name
) as container_client:
blobs = container_client.list_blobs()
filenames = []
async for blob in blobs:
filenames.append(blob.name)
return filenames
async with self.credentials as credentials:
async with credentials.get_container_client(
self.container_name
) as container_client:
blobs = container_client.list_blobs()
filenames = []
async for blob in blobs:
filenames.append(blob.name)
return filenames
Loading

0 comments on commit 42ea12a

Please sign in to comment.