From 30e85cc6f093a77989358898ba322fc6417b14ce Mon Sep 17 00:00:00 2001
From: cutecutecat
Date: Fri, 17 Jan 2025 15:34:25 +0800
Subject: [PATCH] script: parallel & top

Signed-off-by: cutecutecat
---
 scripts/README.md |   2 +-
 scripts/bench.py  | 144 +++++++++++++++++++++++-----------------------
 scripts/index.py  |   6 +-
 3 files changed, 76 insertions(+), 76 deletions(-)

diff --git a/scripts/README.md b/scripts/README.md
index 9ed7172..56ca24e 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -55,7 +55,7 @@ docker run --name vchord -e POSTGRES_PASSWORD=123 -p 5432:5432 -d vchord:pg16-la
 # When using CPU to train k-means clustering
 conda install conda-forge::pgvector-python numpy pytorch::faiss-cpu conda-forge::psycopg h5py tqdm
 # or
-pip install pgvector-python numpy faiss-cpu psycopg h5py tqdm
+pip install pgvector numpy faiss-cpu psycopg h5py tqdm
 
 # When using GPU to train k-means clustering
 conda install conda-forge::pgvector-python numpy pytorch::faiss-gpu conda-forge::psycopg h5py tqdm
diff --git a/scripts/bench.py b/scripts/bench.py
index 0d7670c..4e6eca5 100644
--- a/scripts/bench.py
+++ b/scripts/bench.py
@@ -9,8 +9,6 @@ import h5py
 from pgvector.psycopg import register_vector
 
-TOP = [10]
-
 
 def build_arg_parse():
     parser = argparse.ArgumentParser()
@@ -26,6 +24,9 @@
     parser.add_argument(
         "-p", "--password", help="Database password", default="password"
     )
+    parser.add_argument(
+        "-t", "--top", help="Top K per query", type=int, choices=[10, 100], default=10
+    )
     parser.add_argument(
         "--nprob", help="argument probes for query", default=100, type=int
    )
@@ -116,90 +117,86 @@ def calculate_metrics(all_results, k, m):
 
 
 def parallel_bench(
-    name, test, answer, metric_ops, num_processes, password, nprob, epsilon
+    name, test, answer, metric_ops, num_processes, password, top, nprob, epsilon
 ):
     """Run benchmark in parallel using multiple processes"""
     m = test.shape[0]
-    for k in TOP:
-        # Split data into batches for each process
-        batch_size = m // num_processes
-        batches = []
-
-        for i in range(num_processes):
-            start_idx = i * batch_size
-            end_idx = start_idx + batch_size if i < num_processes - 1 else m
-
-            batch = (
-                test[start_idx:end_idx],
-                answer[start_idx:end_idx],
-                k,
-                metric_ops,
-                password,
-                name,
-                nprob,
-                epsilon,
-            )
-            batches.append(batch)
-
-        # Create process pool and execute batches
-        with mp.Pool(processes=num_processes) as pool:
-            batch_results = list(
-                tqdm(
-                    pool.imap(process_batch, batches),
-                    total=len(batches),
-                    desc=f"Processing k={k}",
-                )
+    # Split data into batches for each process
+    batch_size = m // num_processes
+    batches = []
+
+    for i in range(num_processes):
+        start_idx = i * batch_size
+        end_idx = start_idx + batch_size if i < num_processes - 1 else m
+
+        batch = (
+            test[start_idx:end_idx],
+            answer[start_idx:end_idx],
+            top,
+            metric_ops,
+            password,
+            name,
+            nprob,
+            epsilon,
+        )
+        batches.append(batch)
+
+    # Create process pool and execute batches
+    with mp.Pool(processes=num_processes) as pool:
+        batch_results = list(
+            tqdm(
+                pool.imap(process_batch, batches),
+                total=len(batches),
+                desc=f"Processing k={top}",
             )
+        )
 
-        # Flatten results from all batches
-        all_results = [result for batch in batch_results for result in batch]
+    # Flatten results from all batches
+    all_results = [result for batch in batch_results for result in batch]
 
-        # Calculate metrics
-        recall, qps, p50, p99 = calculate_metrics(all_results, k, m)
+    # Calculate metrics
+    recall, qps, p50, p99 = calculate_metrics(all_results, top, m)
 
-        print(f"Top: {k}")
-        print(f"  Recall: {recall:.4f}")
-        print(f"  QPS: {qps*num_processes:.2f}")
-        print(f"  P50 latency: {p50:.2f}ms")
-        print(f"  P99 latency: {p99:.2f}ms")
+    print(f"Top: {top}")
+    print(f"  Recall: {recall:.4f}")
+    print(f"  QPS: {qps*num_processes:.2f}")
+    print(f"  P50 latency: {p50:.2f}ms")
+    print(f"  P99 latency: {p99:.2f}ms")
 
 
-def sequential_bench(name, test, answer, metric_ops, conn):
+def sequential_bench(name, test, answer, metric_ops, conn, top):
     """Original sequential benchmark implementation with latency tracking"""
     m = test.shape[0]
-    for k in TOP:
-        results = []
-        pbar = tqdm(enumerate(test), total=m)
-        for i, query in pbar:
-            start = time.perf_counter()
-            result = conn.execute(
-                f"SELECT id FROM {name} ORDER BY embedding {metric_ops} %s LIMIT {k}",
-                (query,),
-            ).fetchall()
-            end = time.perf_counter()
-
-            query_time = end - start
-            hit = len(set([p[0] for p in result[:k]]) & set(answer[i][:k].tolist()))
-            results.append((hit, query_time))
-
-            # Update progress bar with running metrics
-            curr_results = results[: i + 1]
-            curr_recall, curr_qps, curr_p50, _ = calculate_metrics(
-                curr_results, k, i + 1
-            )
-            pbar.set_description(
-                f"recall: {curr_recall:.4f} QPS: {curr_qps:.2f} P50: {curr_p50:.2f}ms"
-            )
+    results = []
+    pbar = tqdm(enumerate(test), total=m)
+    for i, query in pbar:
+        start = time.perf_counter()
+        result = conn.execute(
+            f"SELECT id FROM {name} ORDER BY embedding {metric_ops} %s LIMIT {top}",
+            (query,),
+        ).fetchall()
+        end = time.perf_counter()
+
+        query_time = end - start
+        hit = len(set([p[0] for p in result[:top]]) & set(answer[i][:top].tolist()))
+        results.append((hit, query_time))
+
+        # Update progress bar with running metrics
+        curr_results = results[: i + 1]
+        curr_recall, curr_qps, curr_p50, _ = calculate_metrics(curr_results, top, i + 1)
+        pbar.set_description(
+            f"recall: {curr_recall:.4f} QPS: {curr_qps:.2f} P50: {curr_p50:.2f}ms"
+        )
 
-        # Calculate final metrics
-        recall, qps, p50, p99 = calculate_metrics(results, k, m)
+    # Calculate final metrics
+    recall, qps, p50, p99 = calculate_metrics(results, top, m)
 
-        print(f"Top: {k}")
-        print(f"  Recall: {recall:.4f}")
-        print(f"  QPS: {qps:.2f}")
-        print(f"  P50 latency: {p50:.2f}ms")
-        print(f"  P99 latency: {p99:.2f}ms")
+    print(f"Top: {top}")
+    print(f"  Recall: {recall:.4f}")
+    print(f"  QPS: {qps:.2f}")
+    print(f"  P50 latency: {p50:.2f}ms")
+    print(f"  P99 latency: {p99:.2f}ms")
 
 
 if __name__ == "__main__":
@@ -228,9 +225,10 @@ def sequential_bench(name, test, answer, metric_ops, conn):
             metric_ops,
             args.processes,
             args.password,
+            args.top,
             args.nprob,
             args.epsilon,
         )
     else:
         conn = create_connection(args.password, args.nprob, args.epsilon)
-        sequential_bench(args.name, test, answer, metric_ops, conn)
+        sequential_bench(args.name, test, answer, metric_ops, conn, args.top)
diff --git a/scripts/index.py b/scripts/index.py
index 77e3839..cb4f111 100644
--- a/scripts/index.py
+++ b/scripts/index.py
@@ -35,6 +35,7 @@ def build_arg_parse():
         "-p", "--password", help="Database password", default="password"
     )
     parser.add_argument("-d", "--dim", help="Dimension", type=int, required=True)
+    # Remember to set `max_worker_processes` at server start
     parser.add_argument(
         "-w",
         "--workers",
@@ -132,9 +133,10 @@ async def add_centroids(conn, name, centroids):
         await asyncio.sleep(0)
 
 
-async def add_embeddings(conn, name, dim, train, chunks):
+async def add_embeddings(conn, name, dim, train, chunks, workers):
     await conn.execute(f"DROP TABLE IF EXISTS {name}")
     await conn.execute(f"CREATE TABLE {name} (id integer, embedding vector({dim}))")
+    await conn.execute(f"ALTER TABLE {name} SET (parallel_workers = {workers})")
 
     n, dim = train.shape
     chunk_size = math.ceil(n / chunks)
@@ -201,7 +203,7 @@ async def main(dataset):
     metric_ops, ivf_config = get_ivf_ops_config(
         args.metric, args.workers, args.lists, args.name if args.centroids else None
     )
-    await add_embeddings(conn, args.name, args.dim, dataset["train"], args.chunks)
+    await add_embeddings(conn, args.name, args.dim, dataset["train"], args.chunks, args.workers)
 
     index_finish = asyncio.Event()
     # Need a separate connection for monitor process
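
Reviewer note: the batch split in parallel_bench hands each process
m // num_processes queries and folds the remainder into the last batch, so no
query is dropped when the count does not divide evenly. A minimal standalone
sketch of that logic, using illustrative names that do not appear in the
scripts:

    # Sketch of the batch split used in parallel_bench; names are illustrative.
    def split_ranges(num_queries: int, num_processes: int) -> list[tuple[int, int]]:
        batch_size = num_queries // num_processes
        ranges = []
        for i in range(num_processes):
            start = i * batch_size
            # The last process absorbs the remainder so every query is covered.
            end = start + batch_size if i < num_processes - 1 else num_queries
            ranges.append((start, end))
        return ranges

    assert split_ranges(10, 3) == [(0, 3), (3, 6), (6, 10)]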
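Reviewer note: ALTER TABLE ... SET (parallel_workers = ...) stores a per-table
override that PostgreSQL consults for parallel scans and parallel index builds;
it is still capped by the server-wide max_worker_processes setting, hence the
reminder added above the --workers argument. One way to confirm the option was
applied (a hypothetical snippet; "items" stands in for the real table name, and
psycopg is already a dependency of these scripts):

    # Hypothetical check that the storage parameter stuck; "items" is a placeholder.
    import psycopg

    with psycopg.connect(password="password", autocommit=True) as conn:
        row = conn.execute(
            "SELECT reloptions FROM pg_class WHERE relname = %s", ("items",)
        ).fetchone()
        print(row)  # e.g. (['parallel_workers=8'],) once add_embeddings has run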