Commit

missing files
cobycloud committed Dec 24, 2024
1 parent 4df40a6 commit 040dfb3
Showing 9 changed files with 410 additions and 0 deletions.
135 changes: 135 additions & 0 deletions pkgs/base/swarmauri_base/vector_stores/VectorStoreBase.py
@@ -0,0 +1,135 @@
import json
from abc import abstractmethod
from typing import List, Optional, Literal
from pydantic import Field, PrivateAttr
from swarmauri_core.ComponentBase import ComponentBase, ResourceTypes
from swarmauri_standard.documents.Document import Document
from swarmauri_core.vector_stores.IVectorStore import IVectorStore


class VectorStoreBase(IVectorStore, ComponentBase):
"""
Abstract base class for document stores, implementing the IVectorStore interface.
This class provides a standard API for adding, updating, getting, and deleting documents in a vector store.
The specifics of storing (e.g., in a database, in-memory, or file system) are to be implemented by concrete subclasses.
"""

documents: List[Document] = []
_embedder = PrivateAttr()
_distance = PrivateAttr()
resource: Optional[str] = Field(default=ResourceTypes.VECTOR_STORE.value)
type: Literal["VectorStoreBase"] = "VectorStoreBase"

@property
def embedder(self):
return self._embedder

    @abstractmethod
    def add_document(self, document: Document) -> None:
        """
        Add a single document to the document store.

        Parameters:
        - document (Document): The document to be added to the store.
        """
        pass

    @abstractmethod
    def add_documents(self, documents: List[Document]) -> None:
        """
        Add multiple documents to the document store in a batch operation.

        Parameters:
        - documents (List[Document]): A list of documents to be added to the store.
        """
        pass

    @abstractmethod
    def get_document(self, id: str) -> Optional[Document]:
        """
        Retrieve a single document by its identifier.

        Parameters:
        - id (str): The unique identifier of the document to retrieve.

        Returns:
        - Optional[Document]: The requested document if found; otherwise, None.
        """
        pass

    @abstractmethod
    def get_all_documents(self) -> List[Document]:
        """
        Retrieve all documents stored in the document store.

        Returns:
        - List[Document]: A list of all documents in the store.
        """
        pass

    @abstractmethod
    def update_document(self, id: str, updated_document: Document) -> None:
        """
        Update a document in the document store.

        Parameters:
        - id (str): The unique identifier of the document to update.
        - updated_document (Document): The updated document instance.
        """
        pass

    @abstractmethod
    def delete_document(self, id: str) -> None:
        """
        Delete a document from the document store by its identifier.

        Parameters:
        - id (str): The unique identifier of the document to delete.
        """
        pass

    def clear_documents(self) -> None:
        """
        Delete all documents from the vector store.
        """
        self.documents = []

    def document_count(self) -> int:
        """
        Return the number of documents currently in the store.
        """
        return len(self.documents)

    def document_dumps(self) -> str:
        """
        Serialize all documents in the store to a JSON string.
        """
        return json.dumps([each.to_dict() for each in self.documents])

    def document_dump(self, file_path: str) -> None:
        """
        Write all documents in the store to a JSON file at the given path.
        """
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(
                [each.to_dict() for each in self.documents],
                f,
                ensure_ascii=False,
                indent=4,
            )

    def document_loads(self, json_data: str) -> None:
        """
        Replace the store's documents with those parsed from a JSON string.
        The document class is resolved by name from the "type" field of each record.
        """
        self.documents = [
            globals()[each["type"]].from_dict(each) for each in json.loads(json_data)
        ]

    def document_load(self, file_path: str) -> None:
        """
        Replace the store's documents with those read from a JSON file at the given path.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            self.documents = [
                globals()[each["type"]].from_dict(each) for each in json.load(f)
            ]
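
For reference, a minimal in-memory subclass of this base could look like the sketch below. It is illustrative only and not part of this commit: the class name is hypothetical, the import paths are inferred from the file locations in this changeset, and it assumes each Document exposes an `id` attribute, as the get/update/delete signatures above imply.

# Hypothetical sketch, not part of this commit: a minimal in-memory store
# built on VectorStoreBase. Assumes Document instances carry an `id` attribute.
from typing import List, Literal, Optional

from swarmauri_base.vector_stores.VectorStoreBase import VectorStoreBase
from swarmauri_standard.documents.Document import Document


class InMemoryVectorStore(VectorStoreBase):
    type: Literal["InMemoryVectorStore"] = "InMemoryVectorStore"

    def add_document(self, document: Document) -> None:
        self.documents.append(document)

    def add_documents(self, documents: List[Document]) -> None:
        self.documents.extend(documents)

    def get_document(self, id: str) -> Optional[Document]:
        return next((doc for doc in self.documents if doc.id == id), None)

    def get_all_documents(self) -> List[Document]:
        return list(self.documents)

    def update_document(self, id: str, updated_document: Document) -> None:
        self.documents = [
            updated_document if doc.id == id else doc for doc in self.documents
        ]

    def delete_document(self, id: str) -> None:
        self.documents = [doc for doc in self.documents if doc.id != id]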
23 changes: 23 additions & 0 deletions pkgs/base/swarmauri_base/vector_stores/VectorStoreCloudMixin.py
@@ -0,0 +1,23 @@
from typing import Optional
from pydantic import Field

from swarmauri_core.vector_stores.ICloudVectorStore import ICloudVectorStore


class VectorStoreCloudMixin(ICloudVectorStore):
"""
Mixin class for cloud-based vector stores.
"""

api_key: str
collection_name: str
url: Optional[str] = Field(
None, description="URL of the cloud-based store to connect to"
)

vector_size: Optional[int] = Field(
None, description="Size of the vectors used in the store"
)
client: Optional[object] = Field(
None, description="Client object for interacting with the cloud-based store"
)
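
As a rough illustration of how this mixin is intended to be combined with VectorStoreBase, a cloud-backed store might be declared as in the sketch below. The class name and the connect() method are assumptions for the example, not part of this commit; a real subclass would also implement the abstract document methods from VectorStoreBase.

# Hypothetical sketch, not part of this commit: combining VectorStoreBase with
# the cloud mixin. The connect() method and class name are illustrative only.
from typing import Literal

from swarmauri_base.vector_stores.VectorStoreBase import VectorStoreBase
from swarmauri_base.vector_stores.VectorStoreCloudMixin import VectorStoreCloudMixin


class ExampleCloudVectorStore(VectorStoreCloudMixin, VectorStoreBase):
    type: Literal["ExampleCloudVectorStore"] = "ExampleCloudVectorStore"

    def connect(self) -> None:
        # A real implementation would build an SDK client for the remote service
        # from self.url and self.api_key, store it on self.client, and ensure
        # self.collection_name exists with self.vector_size dimensions.
        raise NotImplementedError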
@@ -0,0 +1,33 @@
from typing import Optional
from pydantic import Field

from swarmauri_core.vector_stores.IPersistentVectorStore import IPersistentVectorStore


class VectorStorePersistentMixin(IPersistentVectorStore):
    """
    Mixin class for persistent vector stores.
    """

    collection_name: str

    collection: Optional[object] = Field(
        None,
        description="Collection object for interacting with the persistent store",
    )

    path: Optional[str] = Field(
        None, description="Path of the persistent store to connect to"
    )

    vector_size: Optional[int] = Field(
        None, description="Size of the vectors used in the store"
    )

    client: Optional[object] = Field(
        None,
        description="Client object for interacting with the persistent store",
    )

    vectorizer: Optional[object] = Field(
        None, description="Vectorizer object for converting documents to vectors"
    )
21 changes: 21 additions & 0 deletions pkgs/base/swarmauri_base/vector_stores/VectorStoreRetrieveMixin.py
@@ -0,0 +1,21 @@
from abc import abstractmethod
from typing import List
from pydantic import BaseModel
from swarmauri_standard.documents.Document import Document
from swarmauri_core.vector_stores.IVectorStoreRetrieve import IVectorStoreRetrieve


class VectorStoreRetrieveMixin(IVectorStoreRetrieve, BaseModel):
    @abstractmethod
    def retrieve(self, query: str, top_k: int = 5) -> List[Document]:
        """
        Retrieve the top_k most relevant documents based on the given query.

        Args:
            query (str): The query string used for document retrieval.
            top_k (int): The number of top relevant documents to retrieve.

        Returns:
            List[Document]: A list of the top_k most relevant documents.
        """
        pass
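
To make the contract concrete, the sketch below shows one way a concrete store could implement retrieve() by brute-force cosine similarity. It is illustrative only: the embedder's infer_vector() interface and the documents' precomputed `embedding` attribute are assumptions for the example, not something defined in this commit.

# Hypothetical sketch, not part of this commit: a brute-force retrieve() that a
# concrete store could provide. Assumes self._embedder.infer_vector(text).value
# returns a list of floats and each stored document has a matching `embedding`.
import math


def _cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(self, query: str, top_k: int = 5):
    # Would be defined as a method on a class mixing in VectorStoreRetrieveMixin.
    query_vector = self._embedder.infer_vector(query).value
    scored = [
        (doc, _cosine_similarity(query_vector, doc.embedding.value))
        for doc in self.documents
        if getattr(doc, "embedding", None) is not None
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in scored[:top_k]]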
163 changes: 163 additions & 0 deletions pkgs/base/swarmauri_base/vector_stores/VectorStoreSaveLoadMixin.py
@@ -0,0 +1,163 @@
from typing import List
import os
from pydantic import BaseModel
import json
import glob
import importlib
from swarmauri_core.vector_stores.IVectorStoreSaveLoad import IVectorStoreSaveLoad
from swarmauri_standard.documents.Document import Document

class VectorStoreSaveLoadMixin(IVectorStoreSaveLoad, BaseModel):
"""
Base class for vector stores with built-in support for saving and loading
the vectorizer's model and the documents.
"""


def save_store(self, directory_path: str) -> None:
"""
Saves both the vectorizer's model and the documents.
"""
# Ensure the directory exists
if not os.path.exists(directory_path):
os.makedirs(directory_path)

# Save the vectorizer model
model_path = os.path.join(directory_path, "embedding_model")
self._vectorizer.save_model(model_path)

# Save documents
documents_path = os.path.join(directory_path, "documents.json")
with open(documents_path, 'w', encoding='utf-8') as f:
json.dump([each.to_dict() for each in self.documents],
f,
ensure_ascii=False,
indent=4)


def load_store(self, directory_path: str) -> None:
"""
Loads both the vectorizer's model and the documents.
"""
# Load the vectorizer model
model_path = os.path.join(directory_path, "embedding_model")
        self._vectorizer.load_model(model_path)

# Load documents
documents_path = os.path.join(directory_path, "documents.json")
with open(documents_path, 'r', encoding='utf-8') as f:
self.documents = [self._load_document(each) for each in json.load(f)]

def _load_document(self, data):
document_type = data.pop("type")
if document_type:
module = importlib.import_module(f"swarmauri.documents.concrete.{document_type}")
document_class = getattr(module, document_type)
document = document_class.from_dict(data)
return document
else:
raise ValueError("Unknown document type")

def save_parts(self, directory_path: str, chunk_size: int = 10485760) -> None:
"""
Splits the file into parts if it's too large and saves those parts individually.
"""
file_number = 1
model_path = os.path.join(directory_path, "embedding_model")
parts_directory = os.path.join(directory_path, "parts")

if not os.path.exists(parts_directory):
os.makedirs(parts_directory)



with open(f"{model_path}/model.safetensors", 'rb') as f:
chunk = f.read(chunk_size)
while chunk:
with open(f"{parts_directory}/model.safetensors.part{file_number:03}", 'wb') as chunk_file:
chunk_file.write(chunk)
file_number += 1
chunk = f.read(chunk_size)

        # Split the documents into parts and save them
        self._split_json_file(directory_path, chunk_size=chunk_size)


    def _split_json_file(self, directory_path: str, max_records=100, chunk_size: int = 10485760):
        """
        Split the combined documents.json into smaller JSON part files, flushing a
        batch whenever it reaches max_records records or roughly chunk_size bytes.
        """
        # Read the combined documents file written by save_store
        combined_documents_file_path = os.path.join(directory_path, "documents.json")

        with open(combined_documents_file_path, 'r') as file:
            data = json.load(file)

        # Create the documents parts folder if it does not exist
        documents_dir = os.path.join(directory_path, "documents")
        if not os.path.exists(documents_dir):
            os.makedirs(documents_dir)

current_batch = []
file_index = 1
current_size = 0

for record in data:
current_batch.append(record)
current_size = len(json.dumps(current_batch).encode('utf-8'))

# Check if current batch meets the splitting criteria
if len(current_batch) >= max_records or current_size >= chunk_size:
# Write current batch to a new file
output_file = f'document_part_{file_index}.json'
output_file = os.path.join(documents_dir, output_file)
with open(output_file, 'w') as outfile:
json.dump(current_batch, outfile)

# Prepare for the next batch
current_batch = []
current_size = 0
file_index += 1

# Check if there's any remaining data to be written
if current_batch:
output_file = f'document_part_{file_index}.json'
output_file = os.path.join(documents_dir, output_file)
with open(output_file, 'w') as outfile:
json.dump(current_batch, outfile)

def load_parts(self, directory_path: str, file_pattern: str = '*.part*') -> None:
"""
Combines file parts from a directory back into a single file and loads it.
"""
model_path = os.path.join(directory_path, "embedding_model")
parts_directory = os.path.join(directory_path, "parts")
output_file_path = os.path.join(model_path, "model.safetensors")

parts = sorted(glob.glob(os.path.join(parts_directory, file_pattern)))
with open(output_file_path, 'wb') as output_file:
for part in parts:
with open(part, 'rb') as file_part:
output_file.write(file_part.read())

        # Load the combined model now
        self._vectorizer.load_model(model_path)

# Load document files
self._load_documents(directory_path)


def _load_documents(self, directory_path: str) -> None:
"""
Loads the documents from parts stored in the given directory.
"""
part_paths = glob.glob(os.path.join(directory_path, "documents/*.json"))
for part_path in part_paths:
with open(part_path, "r") as f:
part_documents = json.load(f)
for document_data in part_documents:
document_type = document_data.pop("type")
document_module = importlib.import_module(f"swarmauri.documents.concrete.{document_type}")
document_class = getattr(document_module, document_type)
document = document_class.from_dict(document_data)
self.documents.append(document)
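
For orientation, a save/load round trip with this mixin might look like the sketch below. HypotheticalStore stands in for any concrete store that uses this mixin and owns a `_vectorizer`; it is not defined in this commit, and the directory layout shown in the comments is read off the paths used in the code above.

# Hypothetical sketch, not part of this commit. `HypotheticalStore` and `documents`
# are stand-ins for a concrete store class and a list of Document instances.
store = HypotheticalStore()
store.add_documents(documents)

# save_store() writes:
#   ./my_store/embedding_model/...   (via the vectorizer's save_model)
#   ./my_store/documents.json        (all documents, serialized with to_dict)
store.save_store("./my_store")

# save_parts() additionally writes:
#   ./my_store/parts/model.safetensors.partNNN   (10 MiB chunks by default)
#   ./my_store/documents/document_part_N.json    (batches of up to 100 records)
store.save_parts("./my_store")

# A fresh instance can be rehydrated from either representation.
restored = HypotheticalStore()
restored.load_store("./my_store")   # or restored.load_parts("./my_store")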
@@ -0,0 +1,7 @@
from typing import Optional, Literal
from pydantic import Field
from swarmauri_core.ComponentBase import ComponentBase, ResourceTypes

class VisionVectorStoreBase(ComponentBase):
resource: Optional[str] = Field(default=ResourceTypes.EMBEDDING.value, frozen=True)
type: Literal['VisionVectorStoreBase'] = 'VisionVectorStoreBase'
Empty file.