diff --git a/pkgs/base/swarmauri_base/vector_stores/VectorStoreBase.py b/pkgs/base/swarmauri_base/vector_stores/VectorStoreBase.py
new file mode 100644
index 00000000..4a94bdf1
--- /dev/null
+++ b/pkgs/base/swarmauri_base/vector_stores/VectorStoreBase.py
@@ -0,0 +1,135 @@
+import json
+from abc import abstractmethod
+from typing import List, Optional, Literal
+from pydantic import Field, PrivateAttr
+from swarmauri_core.ComponentBase import ComponentBase, ResourceTypes
+from swarmauri_standard.documents.Document import Document
+from swarmauri_core.vector_stores.IVectorStore import IVectorStore
+
+
+class VectorStoreBase(IVectorStore, ComponentBase):
+    """
+    Abstract base class for document stores, implementing the IVectorStore interface.
+
+    This class provides a standard API for adding, updating, getting, and deleting documents in a vector store.
+    The specifics of storage (e.g., in a database, in memory, or on the file system) are left to concrete subclasses.
+    """
+
+    documents: List[Document] = []
+    _embedder = PrivateAttr()
+    _distance = PrivateAttr()
+    resource: Optional[str] = Field(default=ResourceTypes.VECTOR_STORE.value)
+    type: Literal["VectorStoreBase"] = "VectorStoreBase"
+
+    @property
+    def embedder(self):
+        return self._embedder
+
+    @abstractmethod
+    def add_document(self, document: Document) -> None:
+        """
+        Add a single document to the document store.
+
+        Parameters:
+        - document (Document): The document to be added to the store.
+        """
+        pass
+
+    @abstractmethod
+    def add_documents(self, documents: List[Document]) -> None:
+        """
+        Add multiple documents to the document store in a batch operation.
+
+        Parameters:
+        - documents (List[Document]): A list of documents to be added to the store.
+        """
+        pass
+
+    @abstractmethod
+    def get_document(self, id: str) -> Optional[Document]:
+        """
+        Retrieve a single document by its identifier.
+
+        Parameters:
+        - id (str): The unique identifier of the document to retrieve.
+
+        Returns:
+        - Optional[Document]: The requested document if found; otherwise, None.
+        """
+        pass
+
+    @abstractmethod
+    def get_all_documents(self) -> List[Document]:
+        """
+        Retrieve all documents stored in the document store.
+
+        Returns:
+        - List[Document]: A list of all documents in the store.
+        """
+        pass
+
+    @abstractmethod
+    def update_document(self, id: str, updated_document: Document) -> None:
+        """
+        Update a document in the document store.
+
+        Parameters:
+        - id (str): The unique identifier of the document to update.
+        - updated_document (Document): The updated document instance.
+        """
+        pass
+
+    @abstractmethod
+    def delete_document(self, id: str) -> None:
+        """
+        Delete a document from the document store by its identifier.
+
+        Parameters:
+        - id (str): The unique identifier of the document to delete.
+ """ + pass + + def clear_documents(self) -> None: + """ + Deletes all documents from the vector store + + """ + self.documents = [] + + def document_count(self): + return len(self.documents) + + def document_dumps(self) -> str: + """ + Placeholder + """ + return json.dumps([each.to_dict() for each in self.documents]) + + def document_dump(self, file_path: str) -> None: + """ + Placeholder + """ + with open(file_path, "w", encoding="utf-8") as f: + json.dump( + [each.to_dict() for each in self.documents], + f, + ensure_ascii=False, + indent=4, + ) + + def document_loads(self, json_data: str) -> None: + """ + Placeholder + """ + self.documents = [ + globals()[each["type"]].from_dict(each) for each in json.loads(json_data) + ] + + def document_load(self, file_path: str) -> None: + """ + Placeholder + """ + with open(file_path, "r", encoding="utf-8") as f: + self.documents = [ + globals()[each["type"]].from_dict(each) for each in json.load(file_path) + ] \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/VectorStoreCloudMixin.py b/pkgs/base/swarmauri_base/vector_stores/VectorStoreCloudMixin.py new file mode 100644 index 00000000..353caaa7 --- /dev/null +++ b/pkgs/base/swarmauri_base/vector_stores/VectorStoreCloudMixin.py @@ -0,0 +1,23 @@ +from typing import Optional +from pydantic import Field + +from swarmauri_core.vector_stores.ICloudVectorStore import ICloudVectorStore + + +class VectorStoreCloudMixin(ICloudVectorStore): + """ + Mixin class for cloud-based vector stores. + """ + + api_key: str + collection_name: str + url: Optional[str] = Field( + None, description="URL of the cloud-based store to connect to" + ) + + vector_size: Optional[int] = Field( + None, description="Size of the vectors used in the store" + ) + client: Optional[object] = Field( + None, description="Client object for interacting with the cloud-based store" + ) \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/VectorStorePersistentMixin.py b/pkgs/base/swarmauri_base/vector_stores/VectorStorePersistentMixin.py new file mode 100644 index 00000000..4d2b8d74 --- /dev/null +++ b/pkgs/base/swarmauri_base/vector_stores/VectorStorePersistentMixin.py @@ -0,0 +1,33 @@ +from typing import Optional +from pydantic import Field + +from swarmauri_core.vector_stores.IPersistentVectorStore import IPersistentVectorStore + + +class VectorStorePersistentMixin(IPersistentVectorStore): + """ + Mixin class for persistent-based vector stores. 
+ """ + + collection_name: str + + collection: Optional[object] = Field( + None, + description="Collection object for interacting with the persistent-based store", + ) + + path: Optional[str] = Field( + None, description="URL of the persistent-based store to connect to" + ) + + vector_size: Optional[int] = Field( + None, description="Size of the vectors used in the store" + ) + client: Optional[object] = Field( + None, + description="Client object for interacting with the persistent-based store", + ) + + vectorizer: Optional[object] = Field( + None, description="Vectorizer object for converting documents to vectors" + ) \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/VectorStoreRetrieveMixin.py b/pkgs/base/swarmauri_base/vector_stores/VectorStoreRetrieveMixin.py new file mode 100644 index 00000000..621f6f51 --- /dev/null +++ b/pkgs/base/swarmauri_base/vector_stores/VectorStoreRetrieveMixin.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod +from typing import List +from pydantic import BaseModel +from swarmauri_standard.documents.Document import Document +from swarmauri_core.vector_stores.IVectorStoreRetrieve import IVectorStoreRetrieve + + +class VectorStoreRetrieveMixin(IVectorStoreRetrieve, BaseModel): + @abstractmethod + def retrieve(self, query: str, top_k: int = 5) -> List[Document]: + """ + Retrieve the top_k most relevant documents based on the given query. + + Args: + query (str): The query string used for document retrieval. + top_k (int): The number of top relevant documents to retrieve. + + Returns: + List[IDocument]: A list of the top_k most relevant documents. + """ + pass \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/VectorStoreSaveLoadMixin.py b/pkgs/base/swarmauri_base/vector_stores/VectorStoreSaveLoadMixin.py new file mode 100644 index 00000000..f129dd75 --- /dev/null +++ b/pkgs/base/swarmauri_base/vector_stores/VectorStoreSaveLoadMixin.py @@ -0,0 +1,163 @@ +from typing import List +import os +from pydantic import BaseModel +import json +import glob +import importlib +from swarmauri_core.vector_stores.IVectorStoreSaveLoad import IVectorStoreSaveLoad +from swarmauri_standard.documents.Document import Document + +class VectorStoreSaveLoadMixin(IVectorStoreSaveLoad, BaseModel): + """ + Base class for vector stores with built-in support for saving and loading + the vectorizer's model and the documents. + """ + + + def save_store(self, directory_path: str) -> None: + """ + Saves both the vectorizer's model and the documents. + """ + # Ensure the directory exists + if not os.path.exists(directory_path): + os.makedirs(directory_path) + + # Save the vectorizer model + model_path = os.path.join(directory_path, "embedding_model") + self._vectorizer.save_model(model_path) + + # Save documents + documents_path = os.path.join(directory_path, "documents.json") + with open(documents_path, 'w', encoding='utf-8') as f: + json.dump([each.to_dict() for each in self.documents], + f, + ensure_ascii=False, + indent=4) + + + def load_store(self, directory_path: str) -> None: + """ + Loads both the vectorizer's model and the documents. 
+ """ + # Load the vectorizer model + model_path = os.path.join(directory_path, "embedding_model") + self.vectorizer.load_model(model_path) + + # Load documents + documents_path = os.path.join(directory_path, "documents.json") + with open(documents_path, 'r', encoding='utf-8') as f: + self.documents = [self._load_document(each) for each in json.load(f)] + + def _load_document(self, data): + document_type = data.pop("type") + if document_type: + module = importlib.import_module(f"swarmauri.documents.concrete.{document_type}") + document_class = getattr(module, document_type) + document = document_class.from_dict(data) + return document + else: + raise ValueError("Unknown document type") + + def save_parts(self, directory_path: str, chunk_size: int = 10485760) -> None: + """ + Splits the file into parts if it's too large and saves those parts individually. + """ + file_number = 1 + model_path = os.path.join(directory_path, "embedding_model") + parts_directory = os.path.join(directory_path, "parts") + + if not os.path.exists(parts_directory): + os.makedirs(parts_directory) + + + + with open(f"{model_path}/model.safetensors", 'rb') as f: + chunk = f.read(chunk_size) + while chunk: + with open(f"{parts_directory}/model.safetensors.part{file_number:03}", 'wb') as chunk_file: + chunk_file.write(chunk) + file_number += 1 + chunk = f.read(chunk_size) + + # Split the documents into parts and save them + documents_dir = os.path.join(directory_path, "documents") + + self._split_json_file(directory_path, chunk_size=chunk_size) + + + def _split_json_file(self, directory_path: str, max_records=100, chunk_size: int = 10485760): + # Read the input JSON file + combined_documents_file_path = os.path.join(directory_path, "documents.json") + + # load the master JSON file + with open(combined_documents_file_path , 'r') as file: + data = json.load(file) + + # Set and Create documents parts folder if it does not exist + documents_dir = os.path.join(directory_path, "documents") + if not os.path.exists(documents_dir): + os.makedirs(documents_dir) + current_batch = [] + file_index = 1 + current_size = 0 + + for record in data: + current_batch.append(record) + current_size = len(json.dumps(current_batch).encode('utf-8')) + + # Check if current batch meets the splitting criteria + if len(current_batch) >= max_records or current_size >= chunk_size: + # Write current batch to a new file + output_file = f'document_part_{file_index}.json' + output_file = os.path.join(documents_dir, output_file) + with open(output_file, 'w') as outfile: + json.dump(current_batch, outfile) + + # Prepare for the next batch + current_batch = [] + current_size = 0 + file_index += 1 + + # Check if there's any remaining data to be written + if current_batch: + output_file = f'document_part_{file_index}.json' + output_file = os.path.join(documents_dir, output_file) + with open(output_file, 'w') as outfile: + json.dump(current_batch, outfile) + + def load_parts(self, directory_path: str, file_pattern: str = '*.part*') -> None: + """ + Combines file parts from a directory back into a single file and loads it. 
+ """ + model_path = os.path.join(directory_path, "embedding_model") + parts_directory = os.path.join(directory_path, "parts") + output_file_path = os.path.join(model_path, "model.safetensors") + + parts = sorted(glob.glob(os.path.join(parts_directory, file_pattern))) + with open(output_file_path, 'wb') as output_file: + for part in parts: + with open(part, 'rb') as file_part: + output_file.write(file_part.read()) + + # Load the combined_model now + model_path = os.path.join(directory_path, "embedding_model") + self._vectorizer.load_model(model_path) + + # Load document files + self._load_documents(directory_path) + + + def _load_documents(self, directory_path: str) -> None: + """ + Loads the documents from parts stored in the given directory. + """ + part_paths = glob.glob(os.path.join(directory_path, "documents/*.json")) + for part_path in part_paths: + with open(part_path, "r") as f: + part_documents = json.load(f) + for document_data in part_documents: + document_type = document_data.pop("type") + document_module = importlib.import_module(f"swarmauri.documents.concrete.{document_type}") + document_class = getattr(document_module, document_type) + document = document_class.from_dict(document_data) + self.documents.append(document) \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/VisionVectorStoreBase.py b/pkgs/base/swarmauri_base/vector_stores/VisionVectorStoreBase.py new file mode 100644 index 00000000..49c854ea --- /dev/null +++ b/pkgs/base/swarmauri_base/vector_stores/VisionVectorStoreBase.py @@ -0,0 +1,7 @@ +from typing import Optional, Literal +from pydantic import Field +from swarmauri_core.ComponentBase import ComponentBase, ResourceTypes + +class VisionVectorStoreBase(ComponentBase): + resource: Optional[str] = Field(default=ResourceTypes.EMBEDDING.value, frozen=True) + type: Literal['VisionVectorStoreBase'] = 'VisionVectorStoreBase' \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vector_stores/__init__.py b/pkgs/base/swarmauri_base/vector_stores/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkgs/base/swarmauri_base/vectors/VectorBase.py b/pkgs/base/swarmauri_base/vectors/VectorBase.py new file mode 100644 index 00000000..e1aac936 --- /dev/null +++ b/pkgs/base/swarmauri_base/vectors/VectorBase.py @@ -0,0 +1,28 @@ +from abc import ABC, abstractmethod +from typing import List, Optional, Literal +import json +import numpy as np +from pydantic import Field +from swarmauri_core.vectors.IVector import IVector +from swarmauri_core.ComponentBase import ComponentBase, ResourceTypes + +class VectorBase(IVector, ComponentBase): + value: List[float] + resource: Optional[str] = Field(default=ResourceTypes.VECTOR.value, frozen=True) + type: Literal['VectorBase'] = 'VectorBase' + + def to_numpy(self) -> np.ndarray: + """ + Converts the vector into a numpy array. + + Returns: + np.ndarray: The numpy array representation of the vector. + """ + return np.array(self.value) + + @property + def shape(self): + return self.to_numpy().shape + + def __len__(self): + return len(self.value) \ No newline at end of file diff --git a/pkgs/base/swarmauri_base/vectors/__init__.py b/pkgs/base/swarmauri_base/vectors/__init__.py new file mode 100644 index 00000000..e69de29b