-
Notifications
You must be signed in to change notification settings - Fork 131
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
zilto
authored and
zilto
committed
Nov 10, 2023
1 parent
90bf57d
commit 94f3bcd
Showing
7 changed files
with
446 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Purpose of this module | ||
|
||
This module implements simple operations to interact with LanceDB. | ||
|
||
# Configuration Options | ||
This module doesn't receive any configuration. | ||
|
||
## Inputs: | ||
- `schema`: To create a new table, you need to specified a pyarrow schema | ||
- `overwrite_table`: Allows you to overwrite existing table | ||
|
||
|
||
# Limitations | ||
- `push_data()` and `delete_data()` currently return the number of rows added and deleted, which requires reading the table in a Pyarrow table. This could impact performance if the table gets very large or push / delete are highly frequent. |
86 changes: 86 additions & 0 deletions
86
contrib/hamilton/contrib/user/zilto/lancedb_vdb/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import logging | ||
from typing import Any, Dict, List | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
from hamilton import contrib | ||
|
||
with contrib.catch_import_errors(__name__, __file__, logger): | ||
import pyarrow as pa | ||
import lancedb | ||
|
||
from hamilton.function_modifiers import tag | ||
|
||
|
||
def vdb_client(uri: str = "./.lancedb") -> lancedb.DBConnection: | ||
return lancedb.connect(uri=uri) | ||
|
||
|
||
@tag(side_effect="True") | ||
def table_ref( | ||
vdb_client: lancedb.DBConnection, | ||
table_name: str, | ||
schema: pa.Schema, | ||
overwrite_table: bool = False, | ||
) -> lancedb.db.LanceTable: | ||
"""Create or reference a LanceDB table | ||
:param vdb_client: LanceDB connection. | ||
:param table_name: Name of the table. | ||
:param schema: Pyarrow schema defining the table schema. | ||
:param overwrite_table: If True, overwrite existing table | ||
:return: Reference to existing or newly created table. | ||
""" | ||
|
||
try: | ||
table = vdb_client.open_table(table_name) | ||
except FileNotFoundError: | ||
mode = "overwrite" if overwrite_table else "create" | ||
table = vdb_client.create_table(name=table_name, schema=schema, mode=mode) | ||
|
||
return table | ||
|
||
|
||
@tag(side_effect="True") | ||
def reset_vdb(vdb_client: lancedb.DBConnection) -> Dict[str, List[str]]: | ||
"""Drop all existing tables. | ||
:param vdb_client: LanceDB connection. | ||
:return: dictionary containing all the dropped tables. | ||
""" | ||
tables_dropped = [] | ||
for table_name in vdb_client.table_names(): | ||
vdb_client.drop_table(table_name) | ||
tables_dropped.append(table_name) | ||
|
||
return dict(tables_dropped=tables_dropped) | ||
|
||
|
||
@tag(side_effect="True") | ||
def push_data(table_ref: lancedb.db.LanceTable, data: Any) -> Dict: | ||
"""Push new data to the specified table. | ||
:param table_ref: Reference to the LanceDB table. | ||
:param data: Data to add to the table. Ref: https://lancedb.github.io/lancedb/guides/tables/#adding-to-a-table | ||
:return: Reference to the table and number of rows added | ||
""" | ||
n_rows_before = table_ref.to_arrow().shape[0] | ||
table_ref.add(data) | ||
n_rows_after = table_ref.to_arrow().shape[0] | ||
n_rows_added = n_rows_after - n_rows_before | ||
return dict(table=table_ref, n_rows_added=n_rows_added) | ||
|
||
|
||
@tag(side_effect="True") | ||
def delete_data(table_ref: lancedb.db.LanceTable, delete_expression: str) -> Dict: | ||
"""Delete existing data using an SQL expression. | ||
:param table_ref: Reference to the LanceDB table. | ||
:param data: Expression to select data. Ref: https://lancedb.github.io/lancedb/sql/ | ||
:return: Reference to the table and number of rows deleted | ||
""" | ||
n_rows_before = table_ref.to_arrow().shape[0] | ||
table_ref.delete(delete_expression) | ||
n_rows_after = table_ref.to_arrow().shape[0] | ||
n_rows_deleted = n_rows_before - n_rows_after | ||
return dict(table=table_ref, n_rows_deleted=n_rows_deleted) |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions
3
contrib/hamilton/contrib/user/zilto/lancedb_vdb/requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
lancedb | ||
pyarrow | ||
sf-hamilton[visualization] |
Oops, something went wrong.