Commit

Fix multipart cache (#104)
pavelzubarev authored Oct 30, 2023
1 parent 309457b commit 136ec7d
Showing 8 changed files with 110 additions and 75 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -141,3 +141,8 @@ Changes are grouped as follows

### Added
- Support for native multipart file upload for CDF in Azure and Google.

## [0.2.10] - 2023-11-01

### Fixed
- Fix internal cache accumulating when big files are handled with the native multipart implementation
2 changes: 1 addition & 1 deletion cognite/cdffs/__init__.py
@@ -3,7 +3,7 @@

from .spec import CdfFileSystem

__version__ = "0.2.9"
__version__ = "0.2.10"
__all__ = ["CdfFileSystem"]

fsspec.register_implementation(CdfFileSystem.protocol, CdfFileSystem)
12 changes: 10 additions & 2 deletions cognite/cdffs/az_upload_strategy.py
@@ -15,6 +15,7 @@ def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
"""Initializer."""
super().__init__(metadata, cognite_client)
self.total_size = 0
self.logger = logging.getLogger("cdffs.AzureUploadStrategy")

def _generate_block_blob_block_id(self, index: int, block_name_prefix: str = __file__.__str__()) -> str:
block_id = f"{block_name_prefix.ljust(19, 'x')[:19]}{index:05}".encode("utf-8")
@@ -37,10 +38,14 @@ def upload_chunk(self, data: bytes, index: int) -> None:
},
)
response.raise_for_status()
self.total_size += len(data) # track total object size
with self.lock:
self.total_size += len(data) # track total object size
self.indexes.append(index)
logging.info(f"Finished uploading block {index}. Took {response.elapsed.total_seconds()} sec")

self.logger.debug(
f"Finished uploading block {index}. Current file size: {self.total_size}. "
f"Took {response.elapsed.total_seconds()} sec"
)

def merge_chunks(self) -> int:
"""Merge all uploaded blocks into the final blob."""
@@ -62,5 +67,8 @@ def merge_chunks(self) -> int:
},
)
response.raise_for_status()
self.logger.debug(
f"Merged blocks. Total file size: {self.total_size}. " f"Took {response.elapsed.total_seconds()} sec"
)

return self.total_size
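
Note on the locking change above: `_upload_chunk` in `spec.py` hands blocks to a `ThreadPoolExecutor`, so several `upload_chunk` calls can run at once and `total_size`/`indexes` are shared mutable state. A minimal, self-contained sketch of the pattern (class and variable names here are illustrative, not part of the library):

    import threading
    from concurrent.futures import ThreadPoolExecutor
    from typing import List


    class ChunkAccounting:
        """Illustrative only: shared upload counters guarded by a lock."""

        def __init__(self) -> None:
            self.lock = threading.Lock()
            self.total_size = 0
            self.indexes: List[int] = []

        def upload_chunk(self, data: bytes, index: int) -> None:
            # ... the real strategy PUTs the block to blob storage here ...
            with self.lock:  # guard shared state against concurrent worker threads
                self.total_size += len(data)
                self.indexes.append(index)


    accounting = ChunkAccounting()
    chunks = [b"x" * 1024 for _ in range(20)]
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, chunk in enumerate(chunks):
            executor.submit(accounting.upload_chunk, chunk, i)
    assert accounting.total_size == sum(len(c) for c in chunks)
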
10 changes: 8 additions & 2 deletions cognite/cdffs/file_handler.py
@@ -32,7 +32,10 @@ def __init__(self) -> None:
self.session = requests.Session()

def download_file(
self, download_url: str, start_byte: Union[int, None] = None, end_byte: Union[int, None] = None
self,
download_url: str,
start_byte: Union[int, None] = None,
end_byte: Union[int, None] = None,
) -> bytes:
"""Download the file from a cloud storage using the download URL & offsets provided.
@@ -65,7 +68,10 @@ def add_or_update_url(self, external_id: str, download_url: str) -> None:
external_id (str): External Id for the file.
download_url (str): Download URL for the file.
"""
self._url_container[external_id] = {"url": download_url, "expiry_time": time.time()}
self._url_container[external_id] = {
"url": download_url,
"expiry_time": time.time(),
}

def get_url(self, external_id: str) -> Any:
"""Get download url from a cache if they are valid.
12 changes: 9 additions & 3 deletions cognite/cdffs/gcp_upload_strategy.py
@@ -18,18 +18,24 @@ def __init__(self, metadata: FileMetadata, cognite_client: CogniteClient):
self.chunk_cache: Dict = OrderedDict() # Store chunks in the order received
self.last_written_index = -1 # The last consecutive chunk index that was written
self.last_written_byte = -1 # The last byte position that was written
self.logger = logging.getLogger("cdffs.GoogleUploadStrategy")

def _write_chunk(self, index: int, data: bytes) -> None:
start_byte = self.last_written_byte + 1
end_byte = start_byte + len(data) - 1

headers = {"Content-Length": str(len(data)), "Content-Range": f"bytes {start_byte}-{end_byte}/*"}
headers = {
"Content-Length": str(len(data)),
"Content-Range": f"bytes {start_byte}-{end_byte}/*",
}

response = self.session.put(self.params["upload_url"], headers=headers, data=data)
response.raise_for_status()
self.indexes.append(index)

logging.info(f"Finished uploading chunk {index}. Took {response.elapsed.total_seconds()} sec")
self.logger.debug(
f"Finished uploading chunk {index}=[{start_byte}:{end_byte}]. Took {response.elapsed.total_seconds()} sec"
)

# Update the last written byte position
self.last_written_byte = end_byte
@@ -46,7 +52,7 @@ def upload_chunk(self, data: bytes, index: int) -> None:
del self.chunk_cache[next_index] # Remove the written chunk from cache
self.last_written_index = next_index

logging.info(f"Received chunk {index}. Cache size: {len(self.chunk_cache)} chunks")
self.logger.debug(f"Received chunk {index}. Cache size: {len(self.chunk_cache)} chunks")

def merge_chunks(self) -> int:
"""Google Cloud Storage handles merging internally. So, this method is a no-op for the GCS strategy."""
76 changes: 60 additions & 16 deletions cognite/cdffs/spec.py
@@ -2,6 +2,7 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

@@ -19,7 +20,7 @@
from fsspec.caching import AllBytes
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import DEFAULT_BLOCK_SIZE
from retry import retry
from tenacity import after_log, retry, stop_after_attempt, wait_fixed

from .az_upload_strategy import AzureUploadStrategy
from .credentials import get_connection_config
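
The import change above swaps the `retry` package for `tenacity`, whose decorator takes explicit stop/wait/after policies, matching the decorators used further down in this diff. A minimal usage sketch (the flaky function is made up for illustration):

    import logging

    from tenacity import after_log, retry, stop_after_attempt, wait_fixed

    logging.basicConfig(level=logging.INFO)
    attempts = {"count": 0}


    @retry(
        reraise=False,               # on exhaustion raise tenacity.RetryError, not the last exception
        stop=stop_after_attempt(5),  # give up after five attempts
        wait=wait_fixed(0.5),        # sleep 0.5 s between attempts
        after=after_log(logging.getLogger("cdffs"), logging.INFO),  # log each failed attempt
    )
    def flaky_upload() -> str:
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("transient failure")
        return "ok"


    print(flaky_upload())  # succeeds on the third attempt
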
@@ -167,7 +168,7 @@ def split_path(self, path: str, validate_suffix: bool = True) -> Tuple[str, str,
elif self._suffix_exists(path):
external_id_prefix = [x for x in Path(path).parts if Path(x).suffix][0]
root_dir = path[: path.find(external_id_prefix)].strip("/")
external_id = path[path.find(external_id_prefix) :]
external_id = path[path.find(external_id_prefix) :] # noqa

elif Path(path).parts and not validate_suffix:
external_id_prefix = ""
@@ -187,7 +188,11 @@ def cache_path(self, root_dir: str, external_id: str, file_size: int) -> None:
file_size (int): File size (in bytes).
"""
inp_path = Path(root_dir, external_id)
file_meta = {"type": "file", "name": str(inp_path).lstrip("/"), "size": file_size}
file_meta = {
"type": "file",
"name": str(inp_path).lstrip("/"),
"size": file_size,
}
parent_path = str(inp_path.parent).lstrip("/")

if parent_path not in self.dircache:
@@ -272,15 +277,21 @@ def _ls(self, root_dir: str, external_id_prefix: str, limit: int = -1) -> None:
if not file_met.external_id:
# Files are expected to have a valid external id.
continue
inp_path = Path(file_met.directory if file_met.directory else "/", file_met.external_id)
inp_path = Path(
file_met.directory if file_met.directory else "/",
file_met.external_id,
)
file_name = str(inp_path).lstrip("/")
file_size = int(file_met.metadata.get("size", -1)) if file_met.metadata else -1
file_meta = {"type": "file", "name": file_name, "size": file_size}

# Add directory information.
parent_path = str(inp_path.parent).lstrip("/")
if parent_path not in directories:
directories[parent_path] = {"type": "directory", "name": parent_path.lstrip("/")}
directories[parent_path] = {
"type": "directory",
"name": parent_path.lstrip("/"),
}

if file_name not in _file_write_cache:
if parent_path not in self.dircache:
@@ -508,7 +519,10 @@ def open(
)

def read_file(
self, external_id: str, start_byte: Union[int, None] = None, end_byte: Union[int, None] = None
self,
external_id: str,
start_byte: Union[int, None] = None,
end_byte: Union[int, None] = None,
) -> Any:
"""Open and read the contents of a file.
@@ -547,7 +561,11 @@ def read_file(
raise FileNotFoundError from cognite_exp

def cat(
self, path: Union[str, list], recursive: bool = False, on_error: str = "raise", **kwargs: Optional[Any]
self,
path: Union[str, list],
recursive: bool = False,
on_error: str = "raise",
**kwargs: Optional[Any],
) -> Union[bytes, Any, Dict[str, bytes]]:
"""Open and read the contents of a file(s).
@@ -669,13 +687,18 @@ def __init__(
)

self.write_strategy: UploadStrategy
if mode == "wb":
self.buffer = BytesIO()
self.offset = None
self.buffered = False

if fs.upload_strategy == "google" and mode != "rb":
self.write_strategy = GoogleUploadStrategy(self.file_metadata, self.cognite_client)
elif fs.upload_strategy == "azure" and mode != "rb":
self.write_strategy = AzureUploadStrategy(self.file_metadata, self.cognite_client)
elif mode != "rb":
self.write_strategy = InMemoryUploadStrategy(self.file_metadata, self.cognite_client)
self.buffered = True # InMemoryUpload requires all data to be cached until last chunk comes

super().__init__(
fs,
@@ -686,7 +709,6 @@ def __init__(
**kwargs,
)

@retry(exceptions=_COMMON_EXCEPTIONS, tries=2)
def _upload_chunk(self, final: bool = False) -> bool:
"""Upload file contents to CDF.
@@ -700,14 +722,19 @@ def strategy_submit(data: bytes, index: int) -> None:
RuntimeError: When an unexpected error occurred.
"""

@retry(tries=3, logger=logging.getLogger("upload_chunk"))
@retry(
reraise=False,
stop=stop_after_attempt(5),
wait=wait_fixed(0.5),
after=after_log(logging.getLogger("cdffs"), logging.INFO),
)
def strategy_submit(data: bytes, index: int) -> None:
self.write_strategy.upload_chunk(data, index)

buffer_length = len(self.buffer.getvalue())
blocks = [self.buffer.getvalue()[i : i + self.blocksize] for i in range(0, buffer_length, self.blocksize)]

logging.info(f"{len(blocks)} full blocks discovered")
blocks = [
self.buffer.getvalue()[i : i + self.blocksize] for i in range(0, buffer_length, self.blocksize) # noqa
]

# Upload blocks in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
@@ -719,19 +746,36 @@ def strategy_submit(data: bytes, index: int) -> None:

# If it's the final block, then send a merge request
if final:
total_size = self.write_strategy.merge_chunks()

self.cognite_client.files.update(
item=FileMetadataUpdate(external_id=self.external_id).metadata.add({"size": total_size})
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(0.5),
after=after_log(logging.getLogger("cdffs"), logging.INFO),
)
def safe_merge() -> int:
return self.write_strategy.merge_chunks()

total_size = safe_merge()

@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(0.5),
after=after_log(logging.getLogger("cdffs"), logging.INFO),
)
def safe_file_update(_size: int) -> None:
self.cognite_client.files.update(
item=FileMetadataUpdate(external_id=self.external_id).metadata.add({"size": _size})
)

safe_file_update(total_size)

self.fs.cache_path(
self.root_dir,
self.external_id,
total_size,
)

return final
return final if self.buffered else True # tell fsspec to cache buffer or not

def _fetch_range(self, start: int, end: int) -> Any:
"""Read file contents from CDF.
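
The cache fix named in the changelog comes down to the new return value of `_upload_chunk` above. In fsspec's `AbstractBufferedFile`, `flush()` resets the internal `BytesIO` buffer only when `_upload_chunk` returns something other than `False`; returning `final` therefore kept every flushed block in memory for the whole upload when the Azure/Google strategies were used, while the in-memory strategy genuinely needs that buffering until the last chunk. A schematic sketch of the contract as understood here (simplified; fsspec's own implementation is the authoritative reference):

    import io
    from typing import List


    class BufferedWriterSketch:
        """Schematic model of the assumed flush()/_upload_chunk() contract."""

        def __init__(self, blocksize: int, buffered: bool) -> None:
            self.blocksize = blocksize
            self.buffered = buffered         # True only for the single-shot in-memory strategy
            self.buffer = io.BytesIO()
            self.uploaded: List[bytes] = []  # stands in for chunks handed to the upload strategy

        def _upload_chunk(self, final: bool = False) -> bool:
            data = self.buffer.getvalue()
            self.uploaded.extend(data[i:i + self.blocksize] for i in range(0, len(data), self.blocksize))
            # Old behaviour: `return final` -> False on intermediate flushes -> buffer kept and re-chunked.
            # New behaviour: streaming strategies return True so flush() below can drop the buffer.
            return final if self.buffered else True

        def write(self, data: bytes) -> None:
            self.buffer.write(data)
            if self.buffer.tell() >= self.blocksize:
                self.flush()

        def flush(self, force: bool = False) -> None:
            if self._upload_chunk(final=force) is not False:
                self.buffer = io.BytesIO()  # discard flushed bytes instead of accumulating them


    writer = BufferedWriterSketch(blocksize=4, buffered=False)
    writer.write(b"abcd")
    writer.write(b"efgh")
    writer.flush(force=True)
    assert writer.uploaded == [b"abcd", b"efgh"]
    assert writer.buffer.getvalue() == b""  # nothing accumulated between flushes
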
64 changes: 15 additions & 49 deletions poetry.lock
