Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New parallelization #108

Merged
merged 39 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
229727a
initial work
lmseidler Jan 14, 2025
0069a6e
new parallelization preliminary done
lmseidler Jan 15, 2025
0534777
small fix in tests
lmseidler Jan 15, 2025
ef08fe5
commented unnecessary code
lmseidler Jan 15, 2025
872fd2e
updated construction sites
lmseidler Jan 15, 2025
e2b0a51
hopefully fixed parallelization
lmseidler Jan 16, 2025
3383173
small test fix
lmseidler Jan 16, 2025
6e87a19
initial work
lmseidler Jan 14, 2025
832f0ed
new parallelization preliminary done
lmseidler Jan 15, 2025
f4c02a8
small fix in tests
lmseidler Jan 15, 2025
42c3c41
commented unnecessary code
lmseidler Jan 15, 2025
b49b3f8
updated construction sites
lmseidler Jan 15, 2025
c27dcae
hopefully fixed parallelization
lmseidler Jan 16, 2025
5183a62
small test fix
lmseidler Jan 16, 2025
42fcfca
Merge pull request #1 from lmseidler/parallel
lmseidler Jan 16, 2025
a65637f
only print when verbosity > 0, otherwise nothing is printed (bad)
lmseidler Jan 16, 2025
256159c
pre-commit fixes
lmseidler Jan 16, 2025
f62ef53
added ncores to config + implemented setting ncores for external prog…
lmseidler Jan 17, 2025
c92cedc
fixed tests
lmseidler Jan 17, 2025
7ab2873
added check for number of cores available vs needed
lmseidler Jan 17, 2025
8494ba9
pre-commit
lmseidler Jan 17, 2025
c8df9b1
test fix
lmseidler Jan 17, 2025
47b9acc
Merge branch 'main' into main
lmseidler Jan 17, 2025
57b6001
fixed tm implementation
lmseidler Jan 17, 2025
c0a7855
updated main.py
lmseidler Jan 17, 2025
493b92c
tqdm progress bar
lmseidler Jan 17, 2025
7da10e4
updated dependencies
lmseidler Jan 17, 2025
6230638
mypy types import
lmseidler Jan 17, 2025
6d44b51
moved warnings in correct bracket
lmseidler Jan 17, 2025
4bdff32
updated default config toml
lmseidler Jan 17, 2025
47dd514
added final output of molecules and timing
lmseidler Jan 17, 2025
bdb0c6d
fixed time
lmseidler Jan 17, 2025
c2ac8c9
better time info
lmseidler Jan 17, 2025
5073821
print formatting
lmseidler Jan 17, 2025
c30e162
shift block setup to parallel.py
marcelmbn Jan 19, 2025
22d7f80
avoid UnboundLocalError
marcelmbn Jan 19, 2025
679d094
some code formatting and printout adjustments
marcelmbn Jan 19, 2025
d8b3e46
shifted CHANGELOG entry to correct position
marcelmbn Jan 19, 2025
6ecc996
update CODEOWNERS file
marcelmbn Jan 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ repos:
- id: mypy
additional_dependencies: [types-toml]
default_language_version:
python: python3.12
python: python3.13
lmseidler marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- `GXTBConfig` class for the g-xTB method, supporting SCF cycles check
- updated the parallelization to work over the number of molecules

### Fixed
- version string is now correctly formatted and printed
Expand Down
273 changes: 212 additions & 61 deletions src/mindlessgen/generator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,42 @@
from __future__ import annotations

from collections.abc import Callable
from concurrent.futures import Future, as_completed
from pathlib import Path
import multiprocessing as mp
from threading import Event
from queue import Queue, Empty
import warnings
from dataclasses import dataclass


from ..molecules import generate_random_molecule, Molecule
from ..qm import XTB, get_xtb_path, QMMethod, ORCA, get_orca_path, GXTB, get_gxtb_path
from ..molecules import iterative_optimization, postprocess_mol
from ..prog import ConfigManager
from ..prog import ConfigManager, setup_managers, ResourceMonitor
from ..prog.config import MINCORES_PLACEHOLDER

from ..__version__ import __version__

MINDLESS_MOLECULES_FILE = "mindless.molecules"


def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]:
@dataclass
class Block:
num_molecules: int
ncores: int


def printer_thread(msg_queue: Queue[str], stop_event: Event):
while not stop_event.is_set() or not msg_queue.empty():
try:
msg = msg_queue.get(timeout=0.1)
print(msg)
except Empty:
continue
lmseidler marked this conversation as resolved.
Show resolved Hide resolved


def generator(config: ConfigManager) -> tuple[list[Molecule], int]:
"""
Generate a molecule.
"""
Expand All @@ -38,7 +59,7 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]:

if config.general.print_config:
print(config)
return None, 0
return [], 0

# Import and set up required engines
refine_engine: QMMethod = setup_engines(
Expand Down Expand Up @@ -72,91 +93,177 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]:
if Path(MINDLESS_MOLECULES_FILE).is_file():
if config.general.verbosity > 0:
print(f"\n--- Appending to existing file '{MINDLESS_MOLECULES_FILE}'. ---")

exitcode = 0
optimized_molecules: list[Molecule] = []
for molcount in range(config.general.num_molecules):
# print a decent header for each molecule iteration
if config.general.verbosity > 0:
print(f"\n{'='*80}")
print(
f"{'='*22} Generating molecule {molcount + 1:<4} of "
+ f"{config.general.num_molecules:<4} {'='*24}"
)
print(f"{'='*80}")
manager = mp.Manager()
stop_event = manager.Event()
cycles = range(config.general.max_cycles)
backup_verbosity: int | None = None
if num_cores > 1 and config.general.verbosity > 0:
backup_verbosity = (
config.general.verbosity
) # Save verbosity level for later
config.general.verbosity = 0 # Disable verbosity if parallel

if config.general.verbosity == 0:
print("Cycle... ", end="", flush=True)
with mp.Pool(processes=num_cores) as pool:
results = pool.starmap(
single_molecule_generator,
[
(config, refine_engine, postprocess_engine, cycle, stop_event)
for cycle in cycles
],
)
if config.general.verbosity == 0:
print("")

# Restore verbosity level if it was changed
if backup_verbosity is not None:
config.general.verbosity = backup_verbosity

# Filter out None values and return the first successful molecule
optimized_molecule: Molecule | None = None
for i, result in enumerate(results):
if result is not None:
cycles_needed = i + 1
optimized_molecule = result
break

# Initialize parallel blocks here
blocks = setup_blocks(num_cores, config.general.num_molecules)
blocks.sort(key=lambda x: x.ncores)

backup_verbosity: int | None = None
if len(blocks) > 1 and config.general.verbosity > 0:
backup_verbosity = config.general.verbosity # Save verbosity level for later
config.general.verbosity = 0 # Disable verbosity if parallel
# NOTE: basically no messages will be printed if generation is run in parallel

# Set up parallel blocks environment
with setup_managers(num_cores // MINCORES_PLACEHOLDER, num_cores) as (
executor,
manager,
resources,
):
# Prepare a message queue and printer thread
# msg_queue = Queue()
# stop_event = manager.Event()
# printer = Thread(target=printer_thread, args=(msg_queue, stop_event))
# printer.start()
lmseidler marked this conversation as resolved.
Show resolved Hide resolved

# The following creates a queue of futures which occupy a certain number of cores each
# as defined by each block
# Each future represents the generation of one molecule
# NOTE: proceeding this way assures that each molecule gets a static number of cores
# a dynamic setting would also be thinkable and straightforward to implement
tasks: list[Future[Molecule | None]] = []
for block in blocks:
for _ in range(block.num_molecules):
tasks.append(
executor.submit(
single_molecule_generator,
len(tasks),
config,
resources,
refine_engine,
postprocess_engine,
block.ncores,
)
)

# Collect results of all tries to create a molecule
results: list[Molecule | None] = [task.result() for task in as_completed(tasks)]

# Stop the printer thread if necessary
# stop_event.set()
# printer.join()

# Restore verbosity level if it was changed
if backup_verbosity is not None:
config.general.verbosity = backup_verbosity

for molcount, optimized_molecule in enumerate(results):
if optimized_molecule is None:
# TODO: molcount might not align with the number of the molecule that actually failed, look into this
warnings.warn(
"Molecule generation including optimization (and postprocessing) "
+ f"failed for all cycles for molecule {molcount + 1}."
)
exitcode = 1
continue
if config.general.verbosity > 0:
print(f"Optimized mindless molecule found in {cycles_needed} cycles.")
print(optimized_molecule)

# if config.general.verbosity > 0:
# print(f"Optimized mindless molecule found in {cycles_needed} cycles.")
# print(optimized_molecule)

lmseidler marked this conversation as resolved.
Show resolved Hide resolved
if config.general.write_xyz:
optimized_molecule.write_xyz_to_file()
if config.general.verbosity > 0:
print(f"Written molecule file 'mlm_{optimized_molecule.name}.xyz'.\n")
with open("mindless.molecules", "a", encoding="utf8") as f:
f.write(f"mlm_{optimized_molecule.name}\n")

optimized_molecules.append(optimized_molecule)

return optimized_molecules, exitcode


def single_molecule_generator(
molcount: int,
config: ConfigManager,
resources: ResourceMonitor,
refine_engine: QMMethod,
postprocess_engine: QMMethod | None,
cycle: int,
stop_event,
ncores: int,
) -> Molecule | None:
"""
Generate a single molecule.
Generate a single molecule (from start to finish).
"""

# Wait for enough cores (cores freed automatically upon leaving managed context)
with resources.occupy_cores(ncores):
# print a decent header for each molecule iteration
if config.general.verbosity > 0:
print(f"\n{'='*80}")
print(
f"{'='*22} Generating molecule {molcount + 1:<4} of "
+ f"{config.general.num_molecules:<4} {'='*24}"
)
print(f"{'='*80}")

with setup_managers(ncores, ncores) as (executor, manager, resources_local):
stop_event = manager.Event()
# Launch worker processes to find molecule
# if config.general.verbosity == 0:
# print("Cycle... ", end="", flush=True)
cycles = range(config.general.max_cycles)
tasks: list[Future[Molecule | None]] = []
for cycle in cycles:
tasks.append(
executor.submit(
single_molecule_step,
config,
resources_local,
refine_engine,
postprocess_engine,
cycle,
stop_event,
)
)

# Finally, add a future to set the stop_event if all jobs are completed
# parallel_local.executor.submit(
# lambda: stop_event.set() if wait(tasks) else None
# )
#
# stop_event.wait()

lmseidler marked this conversation as resolved.
Show resolved Hide resolved
results = [task.result() for task in as_completed(tasks)]

# if config.general.verbosity == 0:
# print("")

lmseidler marked this conversation as resolved.
Show resolved Hide resolved
optimized_molecule: Molecule | None = None
for i, result in enumerate(results):
if result is not None:
cycles_needed = i + 1
optimized_molecule = result
break

if config.general.verbosity > 0:
print(f"Optimized mindless molecule found in {cycles_needed} cycles.")
print(optimized_molecule)

return optimized_molecule


def single_molecule_step(
config: ConfigManager,
resources_local: ResourceMonitor,
refine_engine: QMMethod,
postprocess_engine: QMMethod | None,
cycle: int,
stop_event: Event,
) -> Molecule | None:
"""Execute one step in a single molecule generation"""

if stop_event.is_set():
return None # Exit early if a molecule has already been found

if config.general.verbosity == 0:
# print the cycle in one line, not starting a new line
print("✔", end="", flush=True)
elif config.general.verbosity > 0:
# if config.general.verbosity == 0:
# # print the cycle in one line, not starting a new line
# print("✔", end="", flush=True)
if config.general.verbosity > 0:
print(f"Cycle {cycle + 1}:")

# _____ _
# / ____| | |
# | | __ ___ _ __ ___ _ __ __ _| |_ ___ _ __
Expand Down Expand Up @@ -194,10 +301,11 @@ def single_molecule_generator(
# | |
# |_|
optimized_molecule = iterative_optimization(
mol=mol,
engine=refine_engine,
config_generate=config.generate,
config_refine=config.refine,
mol,
refine_engine,
config.generate,
config.refine,
resources_local,
verbosity=config.general.verbosity,
)
except RuntimeError as e:
Expand All @@ -216,7 +324,8 @@ def single_molecule_generator(
optimized_molecule,
postprocess_engine, # type: ignore
config.postprocess,
config.general.verbosity,
resources_local,
verbosity=config.general.verbosity,
)
except RuntimeError as e:
if config.general.verbosity > 0:
Expand Down Expand Up @@ -300,3 +409,45 @@ def setup_engines(
return GXTB(path, cfg.gxtb)
else:
raise NotImplementedError("Engine not implemented.")


def setup_blocks(ncores: int, num_molecules: int) -> list[Block]:
blocks: list[Block] = []

# Maximum and minimum number of parallel processes possible
maxcores = ncores
mincores = MINCORES_PLACEHOLDER
maxprocs = max(1, ncores // mincores)
minprocs = max(1, ncores // maxcores)

# Distribute number of molecules among blocks
# First (if possible) create the maximum number of parallel blocks (maxprocs) and distribute as many molecules as possible
molecules_left = num_molecules
if molecules_left >= maxprocs:
p = maxprocs
molecules_per_block = molecules_left // p
for _ in range(p):
blocks.append(Block(molecules_per_block, ncores // p))
molecules_left -= molecules_per_block * p

# While there are more than minprocs (1) molecules left find the optimal number of parallel blocks
# Again distribute as many molecules per block as possible
while molecules_left >= minprocs:
p = max(
[
j
for j in range(minprocs, maxprocs)
if ncores % j == 0 and j <= molecules_left
]
)
molecules_per_block = molecules_left // p
for _ in range(p):
blocks.append(Block(molecules_per_block, ncores // p))
molecules_left -= molecules_per_block * p

# NOTE: using minprocs = 1 this is probably never true
if molecules_left > 0:
blocks.append(Block(molecules_left, maxcores))
molecules_left -= molecules_left

return blocks
Loading
Loading