From 47f650314abcd01335f686bc900b0b480625e9fd Mon Sep 17 00:00:00 2001 From: Wolfgang Frisch Date: Wed, 18 Dec 2024 10:26:17 +0100 Subject: [PATCH] index: fix multiprocessing regression Parallelization was broken accidentally a while ago when we switched from git subprocesses to libgit2. Previously git subprocesses could run in parallel within a thread pool, but libgit2 implements locking, preventing parallel execution. This commit: - introduces a process pool with an individual GitRepo object for each worker - adds a --max-workers command-line argument --- index.py | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/index.py b/index.py index 84ca70a..87de8bc 100755 --- a/index.py +++ b/index.py @@ -3,8 +3,9 @@ import argparse import collections import concurrent.futures -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor import hashlib +import multiprocessing import sqlite3 import sys @@ -60,6 +61,8 @@ def libpath(lib): choices=["sparse", "full"], default="sparse", help="index mode (default: sparse)") parser.add_argument("-v", "--verbose", action="store_true") +parser.add_argument("--max-workers", type=int, + default=multiprocessing.cpu_count()) args = parser.parse_args() if args.library: @@ -90,7 +93,8 @@ def libpath(lib): cur = con.executescript(SCHEMA) -def get_sourceinfos(git, lib_name, commitinfo): +def get_sourceinfos(lib_name, commitinfo): + global git result = [] commit_hash, commit_time, paths, _ = commitinfo commit_desc = git.describe(commit_hash) @@ -114,21 +118,26 @@ def get_sourceinfos(git, lib_name, commitinfo): # end='\r') return result -def get_all_sourceinfos(git, lib_name, commitinfos): - result = [] - for ci in commitinfos: - result += get_sourceinfos(git, lib_name, ci) - return result -def get_all_sourceinfos_parallel(git, lib_name, commitinfos): +def process_init(repo_path): + global git + git = GitRepo(repo_path) + + 
+def get_all_sourceinfos_parallel(repo_path, lib_name, commitinfos, + max_workers): result = [] - with ThreadPoolExecutor() as executor: - futures = [executor.submit(get_sourceinfos, git, lib_name, ci) for ci in commitinfos] + with ProcessPoolExecutor(max_workers=max_workers, + initializer=process_init, + initargs=(repo_path,)) as executor: + futures = [executor.submit(get_sourceinfos, lib_name, ci) for ci in + commitinfos] for future in concurrent.futures.as_completed(futures): result += future.result() return result -def index_full(): + +def index_full(max_workers): for lib in libraries: print(f"Indexing library: {lib.name}") sys.stdout.flush() @@ -139,7 +148,9 @@ def index_full(): for ci in commitinfos: num_files += len(ci.paths) print(f"- found {num_files} files in {len(commitinfos)} commits") - sourceinfos = get_all_sourceinfos_parallel(git, lib.name, commitinfos) + sourceinfos = get_all_sourceinfos_parallel(libpath(lib), lib.name, + commitinfos, + max_workers=max_workers) cur = con.cursor() cur.execute('DELETE FROM files WHERE library = ?', (lib.name,)) for info in sourceinfos: @@ -149,7 +160,7 @@ def index_full(): sys.stdout.flush() -def index_sparse(): +def index_sparse(max_workers): for lib in libraries: print(f"Indexing library: {lib.name}") sys.stdout.flush() @@ -159,7 +170,9 @@ def index_sparse(): commitinfos = git.all_commits_with_metadata(path=p) print(f"- found {len(commitinfos)} versions of {p}") sys.stdout.flush() - sourceinfos += get_all_sourceinfos_parallel(git, lib.name, commitinfos) + sourceinfos += get_all_sourceinfos_parallel(libpath(lib), lib.name, + commitinfos, + max_workers=max_workers) print(f"- total {len(sourceinfos)} files") cur = con.cursor() cur.execute('DELETE FROM files WHERE library = ?', (lib.name,)) @@ -213,9 +226,9 @@ def prune(): if not args.prune_only: if args.mode == 'sparse': - index_sparse() + index_sparse(args.max_workers) elif args.mode == 'full': - index_full() + index_full(args.max_workers) if not args.no_prune: 
print() prune()