diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4162f50..767b45d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ # .pre-commit-config.yaml repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.13 + rev: v0.3.0 hooks: - id: ruff - id: ruff-format diff --git a/anglerfish/__main__.py b/anglerfish/__main__.py index 72a9288..9310eaa 100644 --- a/anglerfish/__main__.py +++ b/anglerfish/__main__.py @@ -1,4 +1,7 @@ +import multiprocessing + from .anglerfish import anglerfish if __name__ == "__main__": + multiprocessing.freeze_support() anglerfish() diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py index 006c158..f7a56cf 100755 --- a/anglerfish/anglerfish.py +++ b/anglerfish/anglerfish.py @@ -3,6 +3,7 @@ import glob import gzip import logging +import multiprocessing import os import uuid from collections import Counter @@ -25,8 +26,12 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger("anglerfish") +MAX_PROCESSES = 64 # Ought to be enough for anybody + def run_demux(args): + multiprocessing.set_start_method("spawn") + if args.debug: log.setLevel(logging.DEBUG) run_uuid = str(uuid.uuid4()) @@ -51,6 +56,11 @@ def run_demux(args): ) exit() log.debug(f"Samplesheet bc_dist == {bc_dist}") + if args.threads > MAX_PROCESSES: + log.warning( + f" Setting threads to {MAX_PROCESSES} as the maximum number of processes is {MAX_PROCESSES}" + ) + args.threads = MAX_PROCESSES # Sort the adaptors by type and size adaptors_t = [(entry.adaptor.name, entry.ont_barcode) for entry in ss] @@ -103,6 +113,7 @@ def run_demux(args): flipped_i7 = False flipped_i5 = False flips = { + "original": {"i7_reversed": False, "i5_reversed": False}, "i7": {"i7_reversed": True, "i5_reversed": False}, "i5": {"i7_reversed": False, "i5_reversed": True}, "i7+i5": {"i7_reversed": True, "i5_reversed": True}, @@ -119,38 +130,58 @@ def run_demux(args): ) flipped_i7, flipped_i5 = flips[args.force_rc].values() elif 
args.lenient: # Try reverse complementing the I5 and/or i7 indices and choose the best match - no_matches, matches = cluster_matches( - adaptors_sorted[key], fragments, args.max_distance - ) flipped = {} + results = [] + pool = multiprocessing.Pool( + processes=4 if args.threads >= 4 else args.threads + ) + results = [] for flip, rev in flips.items(): - rc_no_matches, rc_matches = cluster_matches( - adaptors_sorted[key], fragments, args.max_distance, **rev + spawn = pool.apply_async( + cluster_matches, + args=( + adaptors_sorted[key], + fragments, + args.max_distance, + rev["i7_reversed"], + rev["i5_reversed"], + ), ) - flipped[flip] = (rc_matches, rc_no_matches, len(rc_matches)) - best_flip = max(flipped, key=lambda k: flipped[k][2]) + results.append((spawn, flip)) + pool.close() + pool.join() + flipped = {result[1]: result[0].get() for result in results} + + best_flip = max(flipped, key=lambda k: len(flipped[k][1])) - # There are no barcode flips with unambiguously more matches, so we abort if ( - sorted([i[2] for i in flipped.values()])[-1] - == sorted([i[2] for i in flipped.values()])[-2] + sorted([len(i[1]) for i in flipped.values()])[-1] + == sorted([len(i[1]) for i in flipped.values()])[-2] ): - log.info( - "Could not find any barcode reverse complements with unambiguously more matches" + log.warning( + " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches. Using original index orientation for all adaptors. Please study the results carefully!" 
) - elif flipped[best_flip][2] > len(matches) * args.lenient_factor: + no_matches, matches = flipped["original"] + elif ( + best_flip != "original" + and len(flipped[best_flip][1]) + > len(flipped["original"][1]) * args.lenient_factor + ): log.info( - f" Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches" + f" Lenient mode: Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches" ) - matches, no_matches, _ = flipped[best_flip] - flipped_i7, flipped_i5 = flips[best_flip].values() + no_matches, matches = flipped[best_flip] else: - log.info(f" Using original index orientation for {adaptor_name}") + log.info( + f" Lenient mode: using original index orientation for {adaptor_name}" + ) + no_matches, matches = flipped["original"] else: no_matches, matches = cluster_matches( adaptors_sorted[key], fragments, args.max_distance ) + out_pool = [] for k, v in groupby(sorted(matches, key=lambda x: x[3]), key=lambda y: y[3]): # To avoid collisions in fastq filenames, we add the ONT barcode to the sample name fq_prefix = k @@ -179,7 +210,21 @@ ) report.add_sample_stat(sample_stat) if not args.skip_demux: - write_demuxedfastq(sample_dict, fastq_path, fq_name) + out_pool.append((sample_dict, fastq_path, fq_name)) + + # Write demuxed fastq files + pool = multiprocessing.Pool(processes=args.threads) + results = [] + for out in out_pool: + log.debug(f" Writing {out[2]}") + spawn = pool.apply_async(write_demuxedfastq, args=out) + results.append((spawn, out[2])) + pool.close() + pool.join() + for result in results: + log.debug( + f" PID-{result[0].get()}: wrote {result[1]}, size {os.path.getsize(result[1])} bytes" + ) # Top unmatched indexes nomatch_count = Counter([x[3] for x in no_matches]) @@ -224,7 +269,11 @@ parser.add_argument( - "--threads", "-t",
default=4, help="Number of threads to use (default: 4)" + "--threads", + "-t", + default=4, + type=int, + help="Number of threads to use (default: 4)", ) parser.add_argument( "--skip_demux", diff --git a/anglerfish/demux/demux.py b/anglerfish/demux/demux.py index f6220ea..5abb564 100644 --- a/anglerfish/demux/demux.py +++ b/anglerfish/demux/demux.py @@ -233,8 +233,11 @@ def cluster_matches( def write_demuxedfastq(beds, fastq_in, fastq_out): """ + Intended for multiprocessing Take a set of coordinates in bed format [[seq1, start, end, ..][seq2, ..]] from over a set of fastq entries in the input files and do extraction. + + Return: PID of the process """ gz_buf = 131072 fq_files = glob.glob(fastq_in) @@ -263,4 +266,5 @@ def write_demuxedfastq(beds, fastq_in, fastq_out): outfqs += "+\n" outfqs += f"{qual[bed[1] : bed[2]]}\n" oz.stdin.write(outfqs.encode("utf-8")) - log.debug(f" Wrote {fastq_out}, size: {os.path.getsize(fastq_out)} bytes") + + return os.getpid() diff --git a/anglerfish/explore/explore.py b/anglerfish/explore/explore.py index 8f72498..fe0f846 100644 --- a/anglerfish/explore/explore.py +++ b/anglerfish/explore/explore.py @@ -134,9 +134,9 @@ def run_explore( ) match_col_df = match_col_df.astype({"match_1_len": "int32"}) - df_good_hits.loc[ - match_col_df.index, match_col_df.columns - ] = match_col_df + df_good_hits.loc[match_col_df.index, match_col_df.columns] = ( + match_col_df + ) thres = round( ( diff --git a/setup.py b/setup.py index 3102168..47d8660 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ conda install -c bioconda anglerfish """ + from pathlib import Path from setuptools import find_packages, setup @@ -17,7 +18,7 @@ this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() -version = "0.6.1" +version = "0.7.0dev0" setup( name="bio-anglerfish",