Implementation for cloud native multipart upload #103

Merged 19 commits on Oct 27, 2023
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -20,7 +20,7 @@ repos:
hooks:
- id: mypy
exclude: ^(tests/|examples/|docs/)
additional_dependencies: ["types-requests"]
additional_dependencies: ["types-requests", "types-retry"]
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -136,3 +136,8 @@ Changes are grouped as follows
- Dependency updates for docs & source code.

[unreleased]:

## [0.2.9] - 2023-10-26

### Added
- Support for native multipart file upload for CDF in Azure and Google.
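A minimal usage sketch for the new option. Hedged assumptions: the project, credentials, and path below are placeholders, and the `cdffs://` prefix follows the protocol registered in `__init__.py` below; the 64 MiB payload only exists to force multiple chunks.

```python
import fsspec
from cognite.client import ClientConfig
from cognite.client.credentials import Token

import cognite.cdffs  # noqa: F401  (importing the package registers the "cdffs" protocol)

# Hypothetical project/credentials; substitute real values for your CDF tenant.
client_config = ClientConfig(
    client_name="cdffs-example",
    project="my-project",
    credentials=Token("my-oauth-token"),
    base_url="https://api.cognitedata.com",
)

fs = fsspec.filesystem(
    "cdffs",
    connection_config=client_config,
    upload_strategy="azure",  # or "google"; defaults to "inmemory"
)

# Writes larger than one block are split into chunks and uploaded in parallel
# by the selected strategy; the final flush merges/commits them.
with fs.open("cdffs://uploads/large-file.bin", mode="wb") as f:
    f.write(b"\x00" * 64 * 1024 * 1024)
```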
2 changes: 1 addition & 1 deletion cognite/cdffs/__init__.py
@@ -3,7 +3,7 @@

from .spec import CdfFileSystem

__version__ = "0.2.8"
__version__ = "0.2.9"
__all__ = ["CdfFileSystem"]

fsspec.register_implementation(CdfFileSystem.protocol, CdfFileSystem)
66 changes: 66 additions & 0 deletions cognite/cdffs/az_upload_strategy.py
@@ -0,0 +1,66 @@
"""Azure native file upload strategy."""
import base64
import logging

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from .upload_strategy import UploadStrategy


class AzureUploadStrategy(UploadStrategy):
"""Azure implementation for upload strategy."""

def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
"""Initializer."""
super().__init__(metadata, cognite_client)
self.total_size = 0

def _generate_block_blob_block_id(self, index: int, block_name_prefix: str = __file__.__str__()) -> str:
block_id = f"{block_name_prefix.ljust(19, 'x')[:19]}{index:05}".encode("utf-8")

return base64.b64encode(block_id).decode("utf-8")

def upload_chunk(self, data: bytes, index: int) -> None:
"""Uploads a single block."""
url = self.params["upload_url"].split("?")
block_id = self._generate_block_blob_block_id(index=index)
upload_block_url = f"{url[0]}?blockid={block_id}&comp=block&{url[1]}"
response = self.session.put(
upload_block_url,
data=data,
headers={
"Accept": "application/xml",
"Content-Type": "application/octet-stream",
"Content-Length": str(len(data)),
"x-ms-version": "2019-12-12",
},
)
response.raise_for_status()
self.total_size += len(data) # track total object size
with self.lock:
self.indexes.append(index)
logging.info(f"Finished uploading block {index}. Took {response.elapsed.total_seconds()} sec")

def merge_chunks(self) -> int:
"""Merge all uploaded blocks into the final blob."""
upload_url = self.params["upload_url"]
commit_url = f"{upload_url}&comp=blocklist"
block_list_xml = '<?xml version="1.0" encoding="utf-8"?><BlockList>'
for index in sorted(self.indexes):
block_list_xml += f"<Latest>{self._generate_block_blob_block_id(index)}</Latest>"
block_list_xml += "</BlockList>"

response = self.session.put(
commit_url,
data=block_list_xml,
headers={
"x-ms-blob-content-type": self.params["mime_type"],
"Content-Type": "application/xml",
"Content-Length": str(len(block_list_xml)),
"x-ms-version": "2019-12-12",
},
)
response.raise_for_status()

return self.total_size
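Context for the block-id helper above: Azure's Put Block / Put Block List APIs require every block id within a blob to be a base64 string of identical length, so the prefix is padded or truncated to 19 characters before the zero-padded five-digit index is appended. A standalone sketch of the same scheme, assuming an illustrative constant prefix (the class itself defaults to the module's file path):

```python
import base64


def block_id(index: int, prefix: str = "cdffs-block-prefix") -> str:
    # Fix the prefix at exactly 19 characters so every encoded id has the same
    # length, then append the zero-padded five-digit block index.
    raw = f"{prefix.ljust(19, 'x')[:19]}{index:05}".encode("utf-8")
    return base64.b64encode(raw).decode("utf-8")


first, last = block_id(0), block_id(49999)
assert len(first) == len(last)  # equal-length ids are what Put Block List relies on
```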
53 changes: 53 additions & 0 deletions cognite/cdffs/gcp_upload_strategy.py
@@ -0,0 +1,53 @@
"""Google native file upload strategy."""
import logging
from collections import OrderedDict
from typing import Dict

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from .upload_strategy import UploadStrategy


class GoogleUploadStrategy(UploadStrategy):
"""Google implementation for upload strategy."""

def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
"""Initializer."""
super().__init__(metadata, cognite_client)
self.chunk_cache: Dict = OrderedDict() # Store chunks in the order received
self.last_written_index = -1 # The last consecutive chunk index that was written
self.last_written_byte = -1 # The last byte position that was written

def _write_chunk(self, index: int, data: bytes) -> None:
start_byte = self.last_written_byte + 1
end_byte = start_byte + len(data) - 1

headers = {"Content-Length": str(len(data)), "Content-Range": f"bytes {start_byte}-{end_byte}/*"}

response = self.session.put(self.params["upload_url"], headers=headers, data=data)
response.raise_for_status()
self.indexes.append(index)

logging.info(f"Finished uploading chunk {index}. Took {response.elapsed.total_seconds()} sec")

# Update the last written byte position
self.last_written_byte = end_byte

def upload_chunk(self, data: bytes, index: int) -> None:
"""Uploads a single chunk."""
with self.lock:
self.chunk_cache[index] = data

# Check for consecutive chunks in cache and write them
while self.last_written_index + 1 in self.chunk_cache:
next_index = self.last_written_index + 1
self._write_chunk(next_index, self.chunk_cache[next_index])
del self.chunk_cache[next_index] # Remove the written chunk from cache
self.last_written_index = next_index

logging.info(f"Received chunk {index}. Cache size: {len(self.chunk_cache)} chunks")

def merge_chunks(self) -> int:
"""Google Cloud Storage handles merging internally. So, this method is a no-op for the GCS strategy."""
return self.last_written_byte + 1
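Why the ordering matters: GCS resumable uploads expect each PUT to continue exactly where the previous one ended, so the Content-Range headers must describe contiguous byte ranges; out-of-order chunks are therefore parked in the OrderedDict until their predecessors arrive. A small sketch of the headers the strategy would emit for in-order chunks (chunk sizes here are an assumption for illustration):

```python
# Illustrative only: prints the Content-Range header for each in-order chunk;
# real chunk sizes depend on the block size used by the buffered file.
chunk_sizes = [4 * 1024 * 1024, 4 * 1024 * 1024, 1_000_000]

last_written_byte = -1
for index, size in enumerate(chunk_sizes):
    start, end = last_written_byte + 1, last_written_byte + size
    print(f"chunk {index}: Content-Range: bytes {start}-{end}/*")
    last_written_byte = end
```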
30 changes: 30 additions & 0 deletions cognite/cdffs/memory_upload_strategy.py
@@ -0,0 +1,30 @@
"""Inmemory file upload strategy."""
import threading
from typing import Dict

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from .upload_strategy import UploadStrategy


class InMemoryUploadStrategy(UploadStrategy):
"""Inmemory implementation for upload strategy."""

def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
"""Initializer."""
super().__init__(metadata, cognite_client)
self.blocks: Dict[int, bytes] = {}
self.lock = threading.Lock()
self.metadata = metadata

def upload_chunk(self, data: bytes, index: int) -> None:
"""Upload single chunk."""
with self.lock:
self.blocks[index] = data

def merge_chunks(self) -> int:
"""Merge all uploaded blocks into the final blob."""
content = b"".join([self.blocks[key] for key in sorted(self.blocks.keys())])
self.cognite_client.files.upload_bytes(content=content, **self.metadata.dump(), overwrite=True)
return len(content)
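The shared base class imported from `.upload_strategy` is not part of this diff. Below is a rough sketch of its shape, inferred only from how the three subclasses use it (a requests session, a lock, a list of uploaded indexes, upload parameters such as `upload_url`/`mime_type`, plus the metadata and client passed to the initializer); the attribute names and the way `params` gets populated are assumptions.

```python
# Sketch only; the real base class lives in cognite/cdffs/upload_strategy.py.
import threading
from abc import ABC, abstractmethod
from typing import Dict, List

import requests
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata


class UploadStrategySketch(ABC):
    def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
        self.metadata = metadata
        self.cognite_client = cognite_client
        self.session = requests.Session()   # reused HTTP session for chunk PUTs
        self.lock = threading.Lock()        # guards shared state across upload threads
        self.indexes: List[int] = []        # indexes of chunks uploaded so far
        self.params: Dict[str, str] = {}    # e.g. "upload_url" and "mime_type" from CDF

    @abstractmethod
    def upload_chunk(self, data: bytes, index: int) -> None:
        """Upload one chunk of the file."""

    @abstractmethod
    def merge_chunks(self) -> int:
        """Finalize the upload and return the total size in bytes."""
```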
117 changes: 79 additions & 38 deletions cognite/cdffs/spec.py
@@ -1,11 +1,13 @@
"""File System Specification for CDF Files."""
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
from cognite.client import ClientConfig, CogniteClient
from cognite.client.data_classes.files import FileMetadata
from cognite.client.data_classes.files import FileMetadata, FileMetadataUpdate
from cognite.client.exceptions import (
CogniteAPIError,
CogniteAuthError,
@@ -16,12 +18,24 @@
from fsspec import AbstractFileSystem
from fsspec.caching import AllBytes
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import DEFAULT_BLOCK_SIZE
from retry import retry

from .az_upload_strategy import AzureUploadStrategy
from .credentials import get_connection_config
from .file_handler import FileException, FileHandler
from .gcp_upload_strategy import GoogleUploadStrategy
from .memory_upload_strategy import InMemoryUploadStrategy
from .upload_strategy import UploadStrategy

logger = logging.getLogger(__name__)
_COMMON_EXCEPTIONS = (CogniteAuthError, CogniteConnectionError, CogniteConnectionRefused, CogniteAPIError)
_COMMON_EXCEPTIONS = (
CogniteAuthError,
CogniteConnectionError,
CogniteConnectionRefused,
CogniteAPIError,
requests.exceptions.RequestException,
)
_CACHE_SLEEP_INTERVAL = 5


@@ -48,6 +62,7 @@ def __init__(
self,
connection_config: Optional[ClientConfig] = None,
file_metadata: FileMetadata = FileMetadata(metadata={}),
upload_strategy: str = "inmemory",
**kwargs: Optional[Any],
) -> None:
"""Initialize the CdfFileSystem and creates a connection to CDF.
@@ -78,6 +93,7 @@ def __init__(
self.download_retries: bool = kwargs.get("download_retries", True) # type: ignore
self.file_cache: Dict[str, Dict[str, Any]] = {}
self.file_handler: FileHandler = FileHandler()
self.upload_strategy = upload_strategy

# Create a connection to Cdf
self.do_connect(connection_config, **kwargs)
@@ -461,7 +477,7 @@ def open(
self,
path: str,
mode: str = "rb",
block_size: str = "default",
block_size: int = DEFAULT_BLOCK_SIZE,
cache_options: Optional[Dict[Any, Any]] = None,
**kwargs: Optional[Any],
) -> "CdfFile":
@@ -612,7 +628,7 @@ def __init__(
directory: str,
external_id: str,
mode: str = "rb",
block_size: str = "default",
block_size: int = DEFAULT_BLOCK_SIZE,
cache_options: Optional[Union[Dict[Any, Any], None]] = None,
**kwargs: Optional[Any],
) -> None:
@@ -625,19 +641,41 @@ def __init__(
directory (str): Root directory for the file.
external_id (str): External Id for the file.
mode (str): Mode to work with the file.
block_size (str): Block size to read/write the file.
block_size (int): Block size to read/write the file.
cache_options (str): Additional caching options for the file.
**kwargs (Optional[Any]): Set of keyword arguments to read/write the file contents.
"""
self.index = 0
self.cognite_client: CogniteClient = cognite_client
self.root_dir: str = directory
self.external_id: str = external_id
self.all_bytes_caching: bool = "cache_type" in kwargs and kwargs["cache_type"] == "all"
self.file_metadata: FileMetadata = FileMetadata(metadata={})
self.file_metadata: FileMetadata = FileMetadata(
**{
**fs.file_metadata.dump(),
"name": Path(path).name,
"external_id": self.external_id,
"directory": self.root_dir,
}
)

# User can use a file metadata for each file when they write the files.
if isinstance(kwargs.get("file_metadata"), FileMetadata) and mode != "rb":
self.file_metadata = kwargs.pop("file_metadata")
self.file_metadata = FileMetadata(
**{
**self.file_metadata.dump(),
**kwargs.pop("file_metadata").dump(), # type: ignore
}
)

self.write_strategy: UploadStrategy

if fs.upload_strategy == "google" and mode != "rb":
self.write_strategy = GoogleUploadStrategy(self.file_metadata, self.cognite_client)
elif fs.upload_strategy == "azure" and mode != "rb":
self.write_strategy = AzureUploadStrategy(self.file_metadata, self.cognite_client)
elif mode != "rb":
self.write_strategy = InMemoryUploadStrategy(self.file_metadata, self.cognite_client)

super().__init__(
fs,
@@ -648,6 +686,7 @@ def __init__(
**kwargs,
)

@retry(exceptions=_COMMON_EXCEPTIONS, tries=2)
def _upload_chunk(self, final: bool = False) -> bool:
"""Upload file contents to CDF.

@@ -660,37 +699,39 @@ def _upload_chunk(self, final: bool = False) -> bool:
Raises:
RuntimeError: When an unexpected error occurred.
"""
try:
if final:
file_metadata = self.file_metadata.metadata or self.fs.file_metadata.metadata
file_metadata = (
dict(file_metadata, **{"size": self.buffer.getbuffer().nbytes})
if file_metadata
else {"size": self.buffer.getbuffer().nbytes}
)
response = self.cognite_client.files.upload_bytes(
content=self.buffer.getbuffer(),
name=Path(self.external_id).name,
external_id=self.external_id,
directory=self.root_dir,
source=self.file_metadata.source or self.fs.file_metadata.source,
asset_ids=self.file_metadata.asset_ids or self.fs.file_metadata.asset_ids,
data_set_id=self.file_metadata.data_set_id or self.fs.file_metadata.data_set_id,
mime_type=self.file_metadata.mime_type or self.fs.file_metadata.mime_type,
geo_location=self.file_metadata.geo_location or self.fs.file_metadata.geo_location,
metadata=file_metadata,
overwrite=True,
)

self.fs.cache_path(
self.root_dir,
self.external_id,
int(response.metadata.get("size")) if response.metadata.get("size") else -1,
)

return final
except _COMMON_EXCEPTIONS as cognite_exp:
raise RuntimeError from cognite_exp

@retry(tries=3, logger=logging.getLogger("upload_chunk"))
def strategy_submit(data: bytes, index: int) -> None:
self.write_strategy.upload_chunk(data, index)

buffer_length = len(self.buffer.getvalue())
blocks = [self.buffer.getvalue()[i : i + self.blocksize] for i in range(0, buffer_length, self.blocksize)]

logging.info(f"{len(blocks)} full blocks discovered")

# Upload blocks in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
for arg in zip(blocks, range(self.index, self.index + len(blocks))):
executor.submit(strategy_submit, arg[0], arg[1])

# Update the index
self.index += len(blocks)

# If it's the final block, then send a merge request
if final:
total_size = self.write_strategy.merge_chunks()

self.cognite_client.files.update(
item=FileMetadataUpdate(external_id=self.external_id).metadata.add({"size": total_size})
)

self.fs.cache_path(
self.root_dir,
self.external_id,
total_size,
)

return final

def _fetch_range(self, start: int, end: int) -> Any:
"""Read file contents from CDF.