From ddfda9ab3945ea7e28a59e59dd46577e00882d4a Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Wed, 18 Sep 2024 14:39:09 +0200 Subject: [PATCH 1/7] Initial version of GCS source --- .../ragbits-document-search/pyproject.toml | 5 ++ .../document_search/documents/sources.py | 48 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/packages/ragbits-document-search/pyproject.toml b/packages/ragbits-document-search/pyproject.toml index 98c29ebd..3e8820bc 100644 --- a/packages/ragbits-document-search/pyproject.toml +++ b/packages/ragbits-document-search/pyproject.toml @@ -36,6 +36,11 @@ dependencies = [ "ragbits" ] +[project.optional-dependencies] +google-cloud-storage = [ + "gcloud-aio-storage~=9.3.0" +] + [tool.uv] dev-dependencies = [ "pre-commit~=3.8.0", diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index e1fc9a4c..95c806b9 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -4,6 +4,13 @@ from pydantic import BaseModel +try: + from gcloud.aio.storage import Storage + + HAS_GCLOUD_AIO = True +except ImportError: + HAS_GCLOUD_AIO = False + class Source(BaseModel, ABC): """ @@ -54,3 +61,44 @@ async def fetch(self) -> Path: The local path to the object fetched from the source. """ return self.path + + +class GoogleCloudStorageSource(Source): + """ + An object representing a GCS file source. + """ + + source_type: Literal["google_cloud_storage_file"] = "google_cloud_storage_file" + + bucket: str + object_name: str + + path: Path + + def get_id(self) -> str: + """ + Get unique identifier of the object in the source. + + Returns: + Unique identifier. + """ + return f"bucket_name: {self.bucket}\nobject_name: {self.path}" + + async def fetch(self) -> Path: + """ + Fetch the source. + + Returns: + Tuple containing bucket name and file path. + + Raises: + ImportError: If the required 'gcloud' package is not installed for Google Cloud Storage source. + """ + + if not HAS_GCLOUD_AIO: + raise ImportError("You need to install the 'gcloud' package to use Google Cloud Storage") + + async with Storage() as client: + await client.download_to_filename(bucket=self.bucket, object_name=self.object_name, filename=self.path) + + return self.path From ac8b0007fc2a97fec6220b38af12b5b61abc8e6e Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Wed, 18 Sep 2024 16:17:23 +0200 Subject: [PATCH 2/7] Changes in the method --- .../document_search/documents/sources.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index 95c806b9..bd6a4f2d 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -73,7 +73,7 @@ class GoogleCloudStorageSource(Source): bucket: str object_name: str - path: Path + local_dir: Path = Path("tmp/ragbits/") def get_id(self) -> str: """ @@ -82,14 +82,17 @@ def get_id(self) -> str: Returns: Unique identifier. """ - return f"bucket_name: {self.bucket}\nobject_name: {self.path}" + return f"bucket_name: {self.bucket}\nobject_name: {self.object_name}" async def fetch(self) -> Path: """ - Fetch the source. + Fetch the file from Google Cloud Storage and store it locally. + + The file is downloaded to a local directory specified by `local_dir`. If the file already exists locally, + it will not be downloaded again. If the file doesn't exist locally, it will be fetched from GCS. Returns: - Tuple containing bucket name and file path. + Path: The local path to the downloaded file. Raises: ImportError: If the required 'gcloud' package is not installed for Google Cloud Storage source. @@ -98,7 +101,13 @@ async def fetch(self) -> Path: if not HAS_GCLOUD_AIO: raise ImportError("You need to install the 'gcloud' package to use Google Cloud Storage") - async with Storage() as client: - await client.download_to_filename(bucket=self.bucket, object_name=self.object_name, filename=self.path) + bucket_local_dir = self.local_dir / self.bucket - return self.path + bucket_local_dir.mkdir(parents=True, exist_ok=True) + path = bucket_local_dir / self.object_name + + if not path.is_file(): + async with Storage() as client: + await client.download_to_filename(bucket=self.bucket, object_name=self.object_name, filename=path) + + return path From 42813827e96e8e251e5dd71663113f7a34df4859 Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Mon, 23 Sep 2024 09:38:50 +0200 Subject: [PATCH 3/7] Changes after review --- .../ragbits-document-search/pyproject.toml | 2 +- .../document_search/documents/document.py | 4 ++-- .../document_search/documents/sources.py | 18 ++++++++++++------ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/packages/ragbits-document-search/pyproject.toml b/packages/ragbits-document-search/pyproject.toml index 3e8820bc..54febcef 100644 --- a/packages/ragbits-document-search/pyproject.toml +++ b/packages/ragbits-document-search/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ ] [project.optional-dependencies] -google-cloud-storage = [ +gcs = [ "gcloud-aio-storage~=9.3.0" ] diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/document.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/document.py index 8469d8eb..8b8479d5 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/document.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/document.py @@ -5,7 +5,7 @@ from pydantic import BaseModel, Field -from ragbits.document_search.documents.sources import LocalFileSource +from ragbits.document_search.documents.sources import GCSSource, LocalFileSource class DocumentType(str, Enum): @@ -21,7 +21,7 @@ class DocumentMeta(BaseModel): """ document_type: DocumentType - source: Union[LocalFileSource] = Field(..., discriminator="source_type") + source: Union[LocalFileSource, GCSSource] = Field(..., discriminator="source_type") @property def id(self) -> str: diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index bd6a4f2d..666f1c0d 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -1,3 +1,4 @@ +import os from abc import ABC, abstractmethod from pathlib import Path from typing import Literal @@ -11,6 +12,8 @@ except ImportError: HAS_GCLOUD_AIO = False +LOCAL_STORAGE_DIR_ENV = "LOCAL_STORAGE_DIR_ENV" + class Source(BaseModel, ABC): """ @@ -63,18 +66,16 @@ async def fetch(self) -> Path: return self.path -class GoogleCloudStorageSource(Source): +class GCSSource(Source): """ An object representing a GCS file source. """ - source_type: Literal["google_cloud_storage_file"] = "google_cloud_storage_file" + source_type: Literal["gcs_file"] = "gcs_file" bucket: str object_name: str - local_dir: Path = Path("tmp/ragbits/") - def get_id(self) -> str: """ Get unique identifier of the object in the source. @@ -96,12 +97,17 @@ async def fetch(self) -> Path: Raises: ImportError: If the required 'gcloud' package is not installed for Google Cloud Storage source. + ValueError: If LOCAL_STORAGE_DIR_ENV is not set. """ if not HAS_GCLOUD_AIO: - raise ImportError("You need to install the 'gcloud' package to use Google Cloud Storage") + raise ImportError("You need to install the 'gcloud-aio-storage' package to use Google Cloud Storage") + + if (local_dir_env := os.getenv(LOCAL_STORAGE_DIR_ENV)) is None: + raise ValueError(f"{LOCAL_STORAGE_DIR_ENV} environment variable is not set") - bucket_local_dir = self.local_dir / self.bucket + local_dir: Path = Path(local_dir_env) + bucket_local_dir = local_dir / self.bucket bucket_local_dir.mkdir(parents=True, exist_ok=True) path = bucket_local_dir / self.object_name From e610fb967e51c0a321f7c657de34c894efab07c8 Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Tue, 24 Sep 2024 12:38:32 +0200 Subject: [PATCH 4/7] Fixes after review --- .../document_search/documents/sources.py | 21 +++++++++++++----- .../tests/unit/test_gcs_source.py | 22 +++++++++++++++++++ 2 files changed, 37 insertions(+), 6 deletions(-) create mode 100644 packages/ragbits-document-search/tests/unit/test_gcs_source.py diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index 666f1c0d..d83520be 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -1,4 +1,6 @@ import os +import tempfile +import warnings from abc import ABC, abstractmethod from pathlib import Path from typing import Literal @@ -71,7 +73,7 @@ class GCSSource(Source): An object representing a GCS file source. """ - source_type: Literal["gcs_file"] = "gcs_file" + source_type: Literal["gcs"] = "gcs" bucket: str object_name: str @@ -91,29 +93,36 @@ async def fetch(self) -> Path: The file is downloaded to a local directory specified by `local_dir`. If the file already exists locally, it will not be downloaded again. If the file doesn't exist locally, it will be fetched from GCS. + The local directory is determined by the environment variable `LOCAL_STORAGE_DIR_ENV`. If this environment + variable is not set, a temporary directory is used. Returns: Path: The local path to the downloaded file. Raises: ImportError: If the required 'gcloud' package is not installed for Google Cloud Storage source. - ValueError: If LOCAL_STORAGE_DIR_ENV is not set. """ if not HAS_GCLOUD_AIO: raise ImportError("You need to install the 'gcloud-aio-storage' package to use Google Cloud Storage") if (local_dir_env := os.getenv(LOCAL_STORAGE_DIR_ENV)) is None: - raise ValueError(f"{LOCAL_STORAGE_DIR_ENV} environment variable is not set") + warnings.warn( + "The environment variable 'LOCAL_STORAGE_DIR_ENV' is not set. A temporary directory will be used " + "to store the file." + ) + local_dir = Path(tempfile.gettempdir()) + else: + local_dir = Path(local_dir_env) - local_dir: Path = Path(local_dir_env) bucket_local_dir = local_dir / self.bucket - bucket_local_dir.mkdir(parents=True, exist_ok=True) path = bucket_local_dir / self.object_name if not path.is_file(): async with Storage() as client: - await client.download_to_filename(bucket=self.bucket, object_name=self.object_name, filename=path) + content = await client.download(self.bucket, self.object_name) + with open(path, mode="wb+") as file_object: + file_object.write(content) return path diff --git a/packages/ragbits-document-search/tests/unit/test_gcs_source.py b/packages/ragbits-document-search/tests/unit/test_gcs_source.py new file mode 100644 index 00000000..78624a96 --- /dev/null +++ b/packages/ragbits-document-search/tests/unit/test_gcs_source.py @@ -0,0 +1,22 @@ +import os +from pathlib import Path + +import aiohttp +import pytest + +from ragbits.document_search.documents.sources import GCSSource + +TEST_FILE_PATH = Path(__file__) + +os.environ["LOCAL_STORAGE_DIR_ENV"] = TEST_FILE_PATH.parent.as_posix() + + +async def test_gcs_source_fetch(): + source = GCSSource(bucket="", object_name="test_gcs_source.py") + + path = await source.fetch() + assert path == TEST_FILE_PATH + + source = GCSSource(bucket="", object_name="not_found_file.py") + with pytest.raises(aiohttp.ClientResponseError): + await source.fetch() From 5dffc9b47980f766c948faa083792afbadc0a1d1 Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Tue, 24 Sep 2024 13:02:53 +0200 Subject: [PATCH 5/7] Fix tests --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f127fe4a..4be2e0ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.10" dependencies = [ "ragbits[litellm,local]", "ragbits-dev-kit", - "ragbits-document-search" + "ragbits-document-search[gcs]" ] [tool.uv] From 3798a542873a6616dfda900a946e746e4b1256bb Mon Sep 17 00:00:00 2001 From: "alicja.kotyla" Date: Tue, 24 Sep 2024 13:10:20 +0200 Subject: [PATCH 6/7] More fixes --- .../src/ragbits/document_search/documents/sources.py | 5 ----- .../ragbits-document-search/tests/unit/test_gcs_source.py | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index d83520be..e41f02d7 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -1,6 +1,5 @@ import os import tempfile -import warnings from abc import ABC, abstractmethod from pathlib import Path from typing import Literal @@ -107,10 +106,6 @@ async def fetch(self) -> Path: raise ImportError("You need to install the 'gcloud-aio-storage' package to use Google Cloud Storage") if (local_dir_env := os.getenv(LOCAL_STORAGE_DIR_ENV)) is None: - warnings.warn( - "The environment variable 'LOCAL_STORAGE_DIR_ENV' is not set. A temporary directory will be used " - "to store the file." - ) local_dir = Path(tempfile.gettempdir()) else: local_dir = Path(local_dir_env) diff --git a/packages/ragbits-document-search/tests/unit/test_gcs_source.py b/packages/ragbits-document-search/tests/unit/test_gcs_source.py index 78624a96..df390361 100644 --- a/packages/ragbits-document-search/tests/unit/test_gcs_source.py +++ b/packages/ragbits-document-search/tests/unit/test_gcs_source.py @@ -18,5 +18,5 @@ async def test_gcs_source_fetch(): assert path == TEST_FILE_PATH source = GCSSource(bucket="", object_name="not_found_file.py") - with pytest.raises(aiohttp.ClientResponseError): + with pytest.raises(aiohttp.ClientConnectorError): await source.fetch() From 366aade51417871cda38b4bed91cb67d1c01fd76 Mon Sep 17 00:00:00 2001 From: akotyla <79326805+akotyla@users.noreply.github.com> Date: Wed, 25 Sep 2024 08:42:11 +0200 Subject: [PATCH 7/7] Update packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mateusz HordyƄski <26008518+mhordynski@users.noreply.github.com> --- .../src/ragbits/document_search/documents/sources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py index e41f02d7..3c7240de 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/documents/sources.py @@ -84,7 +84,7 @@ def get_id(self) -> str: Returns: Unique identifier. """ - return f"bucket_name: {self.bucket}\nobject_name: {self.object_name}" + return f"gcs:gs://{self.bucket}/{self.object_name}" async def fetch(self) -> Path: """