script: parallel & top
Signed-off-by: cutecutecat <[email protected]>
cutecutecat committed Jan 26, 2025
1 parent a6efa98 commit 30e85cc
Showing 3 changed files with 76 additions and 76 deletions.
scripts/README.md: 2 changes (1 addition, 1 deletion)

@@ -55,7 +55,7 @@ docker run --name vchord -e POSTGRES_PASSWORD=123 -p 5432:5432 -d vchord:pg16-la
 # When using CPU to train k-means clustering
 conda install conda-forge::pgvector-python numpy pytorch::faiss-cpu conda-forge::psycopg h5py tqdm
 # or
-pip install pgvector-python numpy faiss-cpu psycopg h5py tqdm
+pip install pgvector numpy faiss-cpu psycopg h5py tqdm
 
 # When using GPU to train k-means clustering
 conda install conda-forge::pgvector-python numpy pytorch::faiss-gpu conda-forge::psycopg h5py tqdm
scripts/bench.py: 144 changes (71 additions, 73 deletions)

@@ -9,8 +9,6 @@
 import h5py
 from pgvector.psycopg import register_vector
 
-TOP = [10]
-
 
 def build_arg_parse():
     parser = argparse.ArgumentParser()
@@ -26,6 +24,9 @@ def build_arg_parse():
     parser.add_argument(
         "-p", "--password", help="Database password", default="password"
     )
+    parser.add_argument(
+        "-t", "--top", help="Top-k neighbors to retrieve per query", type=int, choices=[10, 100], default=10
+    )
     parser.add_argument(
         "--nprob", help="argument probes for query", default=100, type=int
     )
@@ -116,90 +117,86 @@ def calculate_metrics(all_results, k, m):


 def parallel_bench(
-    name, test, answer, metric_ops, num_processes, password, nprob, epsilon
+    name, test, answer, metric_ops, num_processes, password, top, nprob, epsilon
 ):
     """Run benchmark in parallel using multiple processes"""
     m = test.shape[0]
 
-    for k in TOP:
-        # Split data into batches for each process
-        batch_size = m // num_processes
-        batches = []
-
-        for i in range(num_processes):
-            start_idx = i * batch_size
-            end_idx = start_idx + batch_size if i < num_processes - 1 else m
-
-            batch = (
-                test[start_idx:end_idx],
-                answer[start_idx:end_idx],
-                k,
-                metric_ops,
-                password,
-                name,
-                nprob,
-                epsilon,
-            )
-            batches.append(batch)
-
-        # Create process pool and execute batches
-        with mp.Pool(processes=num_processes) as pool:
-            batch_results = list(
-                tqdm(
-                    pool.imap(process_batch, batches),
-                    total=len(batches),
-                    desc=f"Processing k={k}",
-                )
-            )
-
-        # Flatten results from all batches
-        all_results = [result for batch in batch_results for result in batch]
-
-        # Calculate metrics
-        recall, qps, p50, p99 = calculate_metrics(all_results, k, m)
-
-        print(f"Top: {k}")
-        print(f"  Recall: {recall:.4f}")
-        print(f"  QPS: {qps*num_processes:.2f}")
-        print(f"  P50 latency: {p50:.2f}ms")
-        print(f"  P99 latency: {p99:.2f}ms")
+    # Split data into batches for each process
+    batch_size = m // num_processes
+    batches = []
+
+    for i in range(num_processes):
+        start_idx = i * batch_size
+        end_idx = start_idx + batch_size if i < num_processes - 1 else m
+
+        batch = (
+            test[start_idx:end_idx],
+            answer[start_idx:end_idx],
+            top,
+            metric_ops,
+            password,
+            name,
+            nprob,
+            epsilon,
+        )
+        batches.append(batch)
+
+    # Create process pool and execute batches
+    with mp.Pool(processes=num_processes) as pool:
+        batch_results = list(
+            tqdm(
+                pool.imap(process_batch, batches),
+                total=len(batches),
+                desc=f"Processing k={top}",
+            )
+        )
+
+    # Flatten results from all batches
+    all_results = [result for batch in batch_results for result in batch]
+
+    # Calculate metrics
+    recall, qps, p50, p99 = calculate_metrics(all_results, top, m)
+
+    print(f"Top: {top}")
+    print(f"  Recall: {recall:.4f}")
+    print(f"  QPS: {qps*num_processes:.2f}")
+    print(f"  P50 latency: {p50:.2f}ms")
+    print(f"  P99 latency: {p99:.2f}ms")


-def sequential_bench(name, test, answer, metric_ops, conn):
+def sequential_bench(name, test, answer, metric_ops, conn, top):
     """Original sequential benchmark implementation with latency tracking"""
     m = test.shape[0]
-    for k in TOP:
-        results = []
-        pbar = tqdm(enumerate(test), total=m)
-        for i, query in pbar:
-            start = time.perf_counter()
-            result = conn.execute(
-                f"SELECT id FROM {name} ORDER BY embedding {metric_ops} %s LIMIT {k}",
-                (query,),
-            ).fetchall()
-            end = time.perf_counter()
-
-            query_time = end - start
-            hit = len(set([p[0] for p in result[:k]]) & set(answer[i][:k].tolist()))
-            results.append((hit, query_time))
-
-            # Update progress bar with running metrics
-            curr_results = results[: i + 1]
-            curr_recall, curr_qps, curr_p50, _ = calculate_metrics(
-                curr_results, k, i + 1
-            )
-            pbar.set_description(
-                f"recall: {curr_recall:.4f} QPS: {curr_qps:.2f} P50: {curr_p50:.2f}ms"
-            )
-
-        # Calculate final metrics
-        recall, qps, p50, p99 = calculate_metrics(results, k, m)
-
-        print(f"Top: {k}")
-        print(f"  Recall: {recall:.4f}")
-        print(f"  QPS: {qps:.2f}")
-        print(f"  P50 latency: {p50:.2f}ms")
-        print(f"  P99 latency: {p99:.2f}ms")
+    results = []
+    pbar = tqdm(enumerate(test), total=m)
+    for i, query in pbar:
+        start = time.perf_counter()
+        result = conn.execute(
+            f"SELECT id FROM {name} ORDER BY embedding {metric_ops} %s LIMIT {top}",
+            (query,),
+        ).fetchall()
+        end = time.perf_counter()
+
+        query_time = end - start
+        hit = len(set([p[0] for p in result[:top]]) & set(answer[i][:top].tolist()))
+        results.append((hit, query_time))
+
+        # Update progress bar with running metrics
+        curr_results = results[: i + 1]
+        curr_recall, curr_qps, curr_p50, _ = calculate_metrics(curr_results, top, i + 1)
+        pbar.set_description(
+            f"recall: {curr_recall:.4f} QPS: {curr_qps:.2f} P50: {curr_p50:.2f}ms"
+        )
+
+    # Calculate final metrics
+    recall, qps, p50, p99 = calculate_metrics(results, top, m)
+
+    print(f"Top: {top}")
+    print(f"  Recall: {recall:.4f}")
+    print(f"  QPS: {qps:.2f}")
+    print(f"  P50 latency: {p50:.2f}ms")
+    print(f"  P99 latency: {p99:.2f}ms")


if __name__ == "__main__":
@@ -228,9 +225,10 @@ def sequential_bench(name, test, answer, metric_ops, conn):
             metric_ops,
             args.processes,
             args.password,
+            args.top,
             args.nprob,
             args.epsilon,
         )
     else:
         conn = create_connection(args.password, args.nprob, args.epsilon)
-        sequential_bench(args.name, test, answer, metric_ops, conn)
+        sequential_bench(args.name, test, answer, metric_ops, conn, args.top)
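
Note: the parallel path above hands each batch tuple to process_batch via pool.imap, but that worker function is outside the hunks shown in this diff. A minimal sketch of what such a worker could look like is given below; it assumes the tuple is unpacked in the order built in parallel_bench, that each process opens its own connection through the script's create_connection helper (connections cannot be shared across processes), and that it returns the same (hit, latency) pairs that calculate_metrics consumes. The repository's actual implementation may differ in detail.

def process_batch(batch):
    # Unpack in the same order the tuple is assembled in parallel_bench.
    test, answer, top, metric_ops, password, name, nprob, epsilon = batch
    # Per-process connection, configured with the same nprob/epsilon settings.
    conn = create_connection(password, nprob, epsilon)
    results = []
    for i, query in enumerate(test):
        start = time.perf_counter()
        rows = conn.execute(
            f"SELECT id FROM {name} ORDER BY embedding {metric_ops} %s LIMIT {top}",
            (query,),
        ).fetchall()
        elapsed = time.perf_counter() - start
        # Count how many of the true top-k neighbors were returned.
        hit = len({r[0] for r in rows} & set(answer[i][:top].tolist()))
        results.append((hit, elapsed))
    return results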
scripts/index.py: 6 changes (4 additions, 2 deletions)

@@ -35,6 +35,7 @@ def build_arg_parse():
"-p", "--password", help="Database password", default="password"
)
parser.add_argument("-d", "--dim", help="Dimension", type=int, required=True)
# Remember to set `max_worker_processes` at server start
parser.add_argument(
"-w",
"--workers",
@@ -132,9 +133,10 @@ async def add_centroids(conn, name, centroids):
         await asyncio.sleep(0)
 
 
-async def add_embeddings(conn, name, dim, train, chunks):
+async def add_embeddings(conn, name, dim, train, chunks, workers):
     await conn.execute(f"DROP TABLE IF EXISTS {name}")
     await conn.execute(f"CREATE TABLE {name} (id integer, embedding vector({dim}))")
+    await conn.execute(f"ALTER TABLE {name} SET (parallel_workers = {workers})")
 
     n, dim = train.shape
     chunk_size = math.ceil(n / chunks)
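
Note: the added ALTER TABLE ... SET (parallel_workers = N) sets a per-table storage parameter that tells the planner how many workers to request for parallel scans of this table. Those workers are only granted if the server's worker budget allows it: max_worker_processes is fixed at server start (which is what the argparse comment above reminds you of), and max_parallel_workers caps how many of those processes can serve parallel queries. A small, hypothetical helper (not part of index.py) to compare the two settings against the table's option might look like:

import psycopg

def check_parallel_budget(conninfo: str, table: str) -> None:
    with psycopg.connect(conninfo) as conn:
        # Server-wide budgets; max_worker_processes requires a restart to change.
        max_worker_processes = conn.execute("SHOW max_worker_processes").fetchone()[0]
        max_parallel_workers = conn.execute("SHOW max_parallel_workers").fetchone()[0]
        # Per-table request written by ALTER TABLE ... SET (parallel_workers = N),
        # stored in pg_class.reloptions, e.g. ['parallel_workers=8'].
        reloptions = conn.execute(
            "SELECT reloptions FROM pg_class WHERE relname = %s", (table,)
        ).fetchone()[0]
    print(f"max_worker_processes={max_worker_processes}, "
          f"max_parallel_workers={max_parallel_workers}, reloptions={reloptions}")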
@@ -201,7 +203,7 @@ async def main(dataset):
     metric_ops, ivf_config = get_ivf_ops_config(
         args.metric, args.workers, args.lists, args.name if args.centroids else None
     )
-    await add_embeddings(conn, args.name, args.dim, dataset["train"], args.chunks)
+    await add_embeddings(conn, args.name, args.dim, dataset["train"], args.chunks, args.workers)
 
     index_finish = asyncio.Event()
     # Need a separate connection for monitor process
