Skip to content

Commit

Permalink
a demo that shows how Aerospike graph and vector can work together
Browse files Browse the repository at this point in the history
  • Loading branch information
Behrad Babaee committed Dec 16, 2024
1 parent fd681ab commit 6b88aeb
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 0 deletions.
38 changes: 38 additions & 0 deletions graph/basic-graph-vector-search/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Basic graph and vector search example

A simple Python application that demonstrates Aerospike Vector and Graph Services together.

## Prerequisites

1. A Python 3.10 - 3.11 environment and familiarity with the Python programming language.
2. An Aerospike Vector Search host.
3. An Aerospike Graph Service host.

You can navigate one directory level up and refer to the README file for instructions on starting the required services.

## Set up a Python virtual environment

This is the recommended mode for building the python client.

```shell
# Create virtual environment to isolate dependencies.
python3 -m venv .venv
source .venv/bin/activate
```

## Install dependencies

```shell
python3 -m pip install -r requirements.txt
```
## Run the search demo

Run with `--help` to see the example's available configuration options.
```shell
python3 search.py --help
```

Run the example.
```shell
python3 search.py
```
4 changes: 4 additions & 0 deletions graph/basic-graph-vector-search/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
aerospike-vector-search==3.0.1
gremlinpython
async_timeout
Faker
186 changes: 186 additions & 0 deletions graph/basic-graph-vector-search/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import sys, os, argparse, timeit, random

from faker import Faker
from aerospike_vector_search import types
from aerospike_vector_search import AdminClient, Client
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import T, P, Operator
from time import perf_counter_ns

# Command-line configuration for the demo.
#
# Every numeric option declares ``type=int`` so that a value supplied on the
# command line is parsed as an integer. Without it argparse keeps the raw
# string, and the rest of the script compares and multiplies these values.
arg_parser = argparse.ArgumentParser(
    description="Aerospike Vector and Graph Search Example"
)
arg_parser.add_argument(
    "--host",
    dest="host",
    required=False,
    default="localhost",
    help="Aerospike Vector Search host.",
)
arg_parser.add_argument(
    "--port",
    dest="port",
    type=int,
    required=False,
    default=5555,
    help="Aerospike Vector Search port.",
)
arg_parser.add_argument(
    "--namespace",
    dest="namespace",
    required=False,
    default="avs-data",
    help="Aerospike namespace for vector data.",
)
arg_parser.add_argument(
    "--set",
    dest="set",
    required=False,
    default="basic-data",
    help="Aerospike set for vector data.",
)
arg_parser.add_argument(
    "--index-name",
    dest="index_name",
    required=False,
    default="basic_index",
    help="Name of the index.",
)
arg_parser.add_argument(
    "--index-namespace",
    dest="index_namespace",
    required=False,
    default="avs-index",
    help="Aerospike namespace for the vector index.",
)
arg_parser.add_argument(
    "--index-set",
    dest="index_set",
    required=False,
    default="basic-index",
    help="Aerospike set for the vector index.",
)
arg_parser.add_argument(
    "--dimensions",
    dest="dimensions",
    type=int,
    required=False,
    default=3,
    help="number of dimensions",
)
# The original (misspelled) flag and destination name are kept because other
# parts of the script read ``args.number_of_items_in_each_dimesnsion``; the
# correctly spelled flag is accepted as an alias.
arg_parser.add_argument(
    "--number-of-items-in-each-dimension",
    "--number-of-items-in-each-dimesnsion",
    dest="number_of_items_in_each_dimesnsion",
    type=int,
    required=False,
    default=10,
    help="number of items in a dimension",
)
arg_parser.add_argument(
    "--search-count",
    dest="search_count",
    type=int,
    required=False,
    default=10,
    help="number of random searches at the end",
)
# ``store_true`` combined with ``default=True`` could never yield False.
# BooleanOptionalAction keeps ``--load-balancer`` working and adds
# ``--no-load-balancer`` so the behaviour can actually be turned off.
arg_parser.add_argument(
    "--load-balancer",
    dest="load_balancer",
    action=argparse.BooleanOptionalAction,
    required=False,
    default=True,
    help="Use this if the host is a load balancer.",
)
args = arg_parser.parse_args()

def vector_space_builder(current_list, current_iteration, dimensions=None, items_per_dimension=None):
    """Return every point of a regular integer grid as lists of floats.

    Each coordinate takes the values ``0.0, 1.0, ..., items_per_dimension - 1``
    and each point has ``dimensions - current_iteration + 1`` coordinates, so
    calling with ``current_iteration=1`` yields full ``dimensions``-sized
    vectors. Points are ordered with the last coordinate varying fastest.

    Args:
        current_list: Unused; kept so existing callers keep working.
        current_iteration: 1-based starting level; see the length rule above.
        dimensions: Total number of dimensions. Defaults to ``args.dimensions``.
        items_per_dimension: Number of values per coordinate. Defaults to
            ``args.number_of_items_in_each_dimesnsion``.

    Returns:
        A list of points, each a list of floats.

    Raises:
        ValueError: If ``current_iteration`` is greater than ``dimensions``
            (the previous recursive version never terminated in that case).
    """
    import itertools

    if dimensions is None:
        dimensions = args.dimensions
    if items_per_dimension is None:
        items_per_dimension = args.number_of_items_in_each_dimesnsion

    depth = dimensions - current_iteration + 1
    if depth < 1:
        raise ValueError(
            f"current_iteration ({current_iteration}) must not exceed "
            f"dimensions ({dimensions})"
        )

    # itertools.product varies the right-most element fastest, which matches
    # the order produced by the original nested append loops.
    return [
        [float(component) for component in point]
        for point in itertools.product(range(items_per_dimension), repeat=depth)
    ]

def vertex_builder(fake, dimensions):
    """Return ``dimensions`` two-element lists of ``[index, fake.job()]``.

    ``fake.job()`` is called once per entry, in index order, starting at 0.
    """
    return [[vertex_id, fake.job()] for vertex_id in range(dimensions)]

def insert_jobs(gClient, jobs):
    """Add one ``Jobs`` vertex to the graph for every entry in ``jobs``.

    Each entry is indexed as ``entry[0]`` (used as the vertex id) and
    ``entry[1]`` (stored in the ``Job`` property). Progress messages are
    printed before and after the loop.
    """
    print("Inserting jobs!")
    for entry in jobs:
        step = gClient.add_v('Jobs')
        step = step.property(T.id, entry[0])
        step = step.property('Job', entry[1])
        # next() is what actually submits the traversal built above.
        step.next()
    print("Inserting jobs completed!")

def insert_data(gClient, vClient, vectors):
    """Upsert each vector and create a matching ``Person`` vertex with a job edge.

    For every vector the function:
      1. builds a key by joining the components with commas,
      2. upserts ``{"vector": v}`` under that key through ``vClient``,
      3. adds a ``Person`` vertex whose id is the same key, with fake
         ``name`` and ``ip`` properties,
      4. adds a ``HAS_JOB`` edge from that person to the vertex whose id is a
         random integer in ``[0, args.dimensions - 1]``.

    Reads the module-level ``args`` and ``fake`` objects. The total elapsed
    time is printed in milliseconds.

    Args:
        gClient: Gremlin traversal source used for graph writes.
        vClient: Vector search client providing ``upsert``.
        vectors: Iterable of vectors (sequences of numbers); ``len()`` is used
            for the progress message.
    """
    print(f"Inserting {len(vectors)} vertices and edges!")
    start = perf_counter_ns()
    for v in vectors:
        key = ",".join(str(x) for x in v)
        vClient.upsert(
            namespace=args.namespace,
            set_name=args.set,
            key=key,
            record_data={"vector": v},
        )
        person = (
            gClient.add_v("Person")
            .property(T.id, key)
            .property("name", fake.name())
            .property("ip", fake.ipv4_private())
            .next()
        )
        # NOTE(review): presumably ids 0..dimensions-1 are the Jobs vertices
        # inserted earlier by the script; confirm against the caller.
        job = gClient.V(random.randint(0, args.dimensions - 1)).next()
        # Use the snake_case step (add_e), consistent with add_v above; the
        # camelCase addE spelling is deprecated in gremlin_python.
        gClient.V(person).add_e("HAS_JOB").to(job).next()

    m_secs = round((perf_counter_ns() - start) / 10 ** 6, 3)
    print(f"Inserting took: {m_secs} milliseconds.")

def wait_for_index(vClient):
    """Call ``vClient.wait_for_index_completion`` and print the elapsed time.

    The namespace and index name come from the module-level ``args``; the
    duration is reported in milliseconds, rounded to three decimals.
    """
    print("Waiting for indexing to complete")
    began_ns = perf_counter_ns()
    vClient.wait_for_index_completion(
        namespace=args.namespace,
        name=args.index_name,
    )
    elapsed_ns = perf_counter_ns() - began_ns
    elapsed_ms = round(elapsed_ns / 10 ** 6, 3)
    print(f"Indexing took: {elapsed_ms} milliseconds")

def query_random(vClient, gClient):
    """Run ``args.search_count`` random vector searches and print graph matches.

    Each query vector has ``args.dimensions`` components drawn uniformly from
    ``[0, args.number_of_items_in_each_dimesnsion]``. The search is timed and
    limited to ``args.dimensions*2 + 1`` results. For every result the graph
    is queried by ``result.key.key`` for the vertex's element map and for the
    value map of the vertices reached over ``HAS_JOB``; both are printed.
    """
    print("querying")
    for _ in range(args.search_count):
        query_vector = [
            random.uniform(0, args.number_of_items_in_each_dimesnsion)
            for _ in range(args.dimensions)
        ]
        label = ','.join(str(component) for component in query_vector)

        # Only the vector search itself is timed, not the graph lookups below.
        began_ns = perf_counter_ns()
        neighbors = vClient.vector_search(
            namespace=args.namespace,
            index_name=args.index_name,
            query=query_vector,
            limit=args.dimensions*2 + 1,
        )
        elapsed_ms = round((perf_counter_ns() - began_ns) / 10 ** 6, 3)
        print(f"Querying [{label}] took: {elapsed_ms} milliseconds")

        for neighbor in neighbors:
            person = gClient.V(neighbor.key.key).element_map().to_list()
            job = gClient.V(neighbor.key.key).out("HAS_JOB").value_map().to_list()
            print(str(person) + " -> " + str(job))

# Driver script: reset the vector index and the graph, load the generated
# data, wait for indexing, then run the random searches.
print("Clearing the environment and setting up!")
with AdminClient(
    seeds=types.HostPort(host=args.host, port=args.port),
    is_loadbalancer=args.load_balancer,
) as adminClient:
    # Dropping the old index is best-effort, so any failure is ignored and
    # stderr is silenced while the call runs. The devnull handle is closed and
    # stderr is restored even if the drop raises.
    old_stderr = sys.stderr
    with open(os.devnull, "w") as devnull:
        sys.stderr = devnull
        try:
            adminClient.index_drop(namespace=args.namespace, name=args.index_name, timeout=60)
        except Exception:
            # NOTE(review): presumably the index simply does not exist on a
            # first run; confirm which exception the client raises and narrow.
            pass
        finally:
            sys.stderr = old_stderr

    try:
        adminClient.index_create(
            namespace=args.namespace,
            name=args.index_name,
            vector_field="vector",
            dimensions=args.dimensions,
            sets=args.set,
            index_storage=types.IndexStorage(namespace=args.index_namespace, set_name=args.index_set),
        )
    except Exception as e:
        print("failed creating index " + str(e))

# ``fake`` stays a module-level name because insert_data reads it as a global.
fake = Faker()
vectors = vector_space_builder([], 1)
jobs = vertex_builder(fake, args.dimensions)

# Both clients hold network resources; close them when the demo finishes or
# fails part-way through.
with Client(
    seeds=types.HostPort(host=args.host, port=args.port),
    is_loadbalancer=args.load_balancer,
) as vClient:
    graph_connection = DriverRemoteConnection('ws://localhost:8182/gremlin', 'g')
    try:
        gClient = traversal().with_remote(graph_connection)
        # Start from an empty graph so vertex ids from earlier runs do not clash.
        gClient.V().drop().iterate()
        print("Setup completed!")

        insert_jobs(gClient, jobs)
        insert_data(gClient, vClient, vectors)
        wait_for_index(vClient)
        query_random(vClient, gClient)
    finally:
        graph_connection.close()

0 comments on commit 6b88aeb

Please sign in to comment.