Skip to content

Commit

Permalink
Merge pull request #13 from wfrisch/fix_multiprocessing_regression
Browse files Browse the repository at this point in the history
index: fix multiprocessing regression
  • Loading branch information
wfrisch authored Dec 18, 2024
2 parents 9aec6bb + 47f6503 commit 80cd1e3
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import argparse
import collections
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import hashlib
import multiprocessing
import sqlite3
import sys

Expand Down Expand Up @@ -60,6 +61,8 @@ def libpath(lib):
choices=["sparse", "full"], default="sparse",
help="index mode (default: sparse)")
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("--max-workers", type=int,
default=multiprocessing.cpu_count())
args = parser.parse_args()

if args.library:
Expand Down Expand Up @@ -90,7 +93,8 @@ def libpath(lib):
cur = con.executescript(SCHEMA)


def get_sourceinfos(git, lib_name, commitinfo):
def get_sourceinfos(lib_name, commitinfo):
global git
result = []
commit_hash, commit_time, paths, _ = commitinfo
commit_desc = git.describe(commit_hash)
Expand All @@ -114,21 +118,26 @@ def get_sourceinfos(git, lib_name, commitinfo):
# end='\r')
return result

def get_all_sourceinfos(git, lib_name, commitinfos):
result = []
for ci in commitinfos:
result += get_sourceinfos(git, lib_name, ci)
return result

def get_all_sourceinfos_parallel(git, lib_name, commitinfos):
def process_init(repo_path):
global git
git = GitRepo(repo_path)


def get_all_sourceinfos_parallel(repo_path, lib_name, commitinfos,
max_workers):
result = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(get_sourceinfos, git, lib_name, ci) for ci in commitinfos]
with ProcessPoolExecutor(max_workers=max_workers,
initializer=process_init,
initargs=(repo_path,)) as executor:
futures = [executor.submit(get_sourceinfos, lib_name, ci) for ci in
commitinfos]
for future in concurrent.futures.as_completed(futures):
result += future.result()
return result

def index_full():

def index_full(max_workers):
for lib in libraries:
print(f"Indexing library: {lib.name}")
sys.stdout.flush()
Expand All @@ -139,7 +148,9 @@ def index_full():
for ci in commitinfos:
num_files += len(ci.paths)
print(f"- found {num_files} files in {len(commitinfos)} commits")
sourceinfos = get_all_sourceinfos_parallel(git, lib.name, commitinfos)
sourceinfos = get_all_sourceinfos_parallel(libpath(lib), lib.name,
commitinfos,
max_workers=max_workers)
cur = con.cursor()
cur.execute('DELETE FROM files WHERE library = ?', (lib.name,))
for info in sourceinfos:
Expand All @@ -149,7 +160,7 @@ def index_full():
sys.stdout.flush()


def index_sparse():
def index_sparse(max_workers):
for lib in libraries:
print(f"Indexing library: {lib.name}")
sys.stdout.flush()
Expand All @@ -159,7 +170,9 @@ def index_sparse():
commitinfos = git.all_commits_with_metadata(path=p)
print(f"- found {len(commitinfos)} versions of {p}")
sys.stdout.flush()
sourceinfos += get_all_sourceinfos_parallel(git, lib.name, commitinfos)
sourceinfos += get_all_sourceinfos_parallel(libpath(lib), lib.name,
commitinfos,
max_workers=max_workers)
print(f"- total {len(sourceinfos)} files")
cur = con.cursor()
cur.execute('DELETE FROM files WHERE library = ?', (lib.name,))
Expand Down Expand Up @@ -213,9 +226,9 @@ def prune():

if not args.prune_only:
if args.mode == 'sparse':
index_sparse()
index_sparse(args.max_workers)
elif args.mode == 'full':
index_full()
index_full(args.max_workers)
if not args.no_prune:
print()
prune()
Expand Down

0 comments on commit 80cd1e3

Please sign in to comment.