From 65777b36afb57406d3619422256dcae24116eb1a Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Tue, 23 Jan 2024 17:04:08 +0100 Subject: [PATCH 1/6] Support for multiprocessed fastq output --- anglerfish/anglerfish.py | 26 ++++++++++++++++++++++++-- anglerfish/demux/demux.py | 6 +++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py index 006c158..4638ef6 100755 --- a/anglerfish/anglerfish.py +++ b/anglerfish/anglerfish.py @@ -3,6 +3,7 @@ import glob import gzip import logging +import multiprocessing import os import uuid from collections import Counter @@ -27,6 +28,8 @@ def run_demux(args): + multiprocessing.set_start_method("spawn") + if args.debug: log.setLevel(logging.DEBUG) run_uuid = str(uuid.uuid4()) @@ -151,6 +154,7 @@ def run_demux(args): adaptors_sorted[key], fragments, args.max_distance ) + out_pool = [] for k, v in groupby(sorted(matches, key=lambda x: x[3]), key=lambda y: y[3]): # To avoid collisions in fastq filenames, we add the ONT barcode to the sample name fq_prefix = k @@ -179,7 +183,21 @@ def run_demux(args): ) report.add_sample_stat(sample_stat) if not args.skip_demux: - write_demuxedfastq(sample_dict, fastq_path, fq_name) + out_pool.append((sample_dict, fastq_path, fq_name)) + + # Write demuxed fastq files + pool = multiprocessing.Pool(processes=args.threads) + results = [] + for out in out_pool: + log.debug(f" Writing {out[2]}") + spawn = pool.starmap_async(write_demuxedfastq, [out]) + results.append((spawn, out[2])) + pool.close() + pool.join() + for result in results: + log.debug( + f" PID-{result[0].get()}: wrote {result[1]}, size {os.path.getsize(result[1])} bytes" + ) # Top unmatched indexes nomatch_count = Counter([x[3] for x in no_matches]) @@ -224,7 +242,11 @@ def anglerfish(): help="Analysis output folder (default: Current dir)", ) parser.add_argument( - "--threads", "-t", default=4, help="Number of threads to use (default: 4)" + "--threads", + "-t", + default=4, + type=int, + help="Number of threads to use (default: 4)", ) parser.add_argument( "--skip_demux", diff --git a/anglerfish/demux/demux.py b/anglerfish/demux/demux.py index f6220ea..5abb564 100644 --- a/anglerfish/demux/demux.py +++ b/anglerfish/demux/demux.py @@ -233,8 +233,11 @@ def cluster_matches( def write_demuxedfastq(beds, fastq_in, fastq_out): """ + Intended for multiprocessing Take a set of coordinates in bed format [[seq1, start, end, ..][seq2, ..]] from over a set of fastq entries in the input files and do extraction. + + Return: PID of the process """ gz_buf = 131072 fq_files = glob.glob(fastq_in) @@ -263,4 +266,5 @@ def write_demuxedfastq(beds, fastq_in, fastq_out): outfqs += "+\n" outfqs += f"{qual[bed[1] : bed[2]]}\n" oz.stdin.write(outfqs.encode("utf-8")) - log.debug(f" Wrote {fastq_out}, size: {os.path.getsize(fastq_out)} bytes") + + return os.getpid() From e119808fbce679f16381e1e77dc60f8d8230098f Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Wed, 6 Mar 2024 11:02:51 +0100 Subject: [PATCH 2/6] Bump to dev version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3102168..71d6968 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() -version = "0.6.1" +version = "0.7.0dev0" setup( name="bio-anglerfish", From 5bbc42aa1fc9db6018593e27b8e4b95046d882d2 Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Wed, 6 Mar 2024 15:20:15 +0100 Subject: [PATCH 3/6] Re-ruff to version 0.3.0 --- .pre-commit-config.yaml | 2 +- anglerfish/explore/explore.py | 6 +++--- setup.py | 1 + 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4162f50..767b45d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ # .pre-commit-config.yaml repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.13 + rev: v0.3.0 hooks: - id: ruff - id: ruff-format diff --git a/anglerfish/explore/explore.py b/anglerfish/explore/explore.py index 8f72498..fe0f846 100644 --- a/anglerfish/explore/explore.py +++ b/anglerfish/explore/explore.py @@ -134,9 +134,9 @@ def run_explore( ) match_col_df = match_col_df.astype({"match_1_len": "int32"}) - df_good_hits.loc[ - match_col_df.index, match_col_df.columns - ] = match_col_df + df_good_hits.loc[match_col_df.index, match_col_df.columns] = ( + match_col_df + ) thres = round( ( diff --git a/setup.py b/setup.py index 71d6968..47d8660 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ conda install -c bioconda anglerfish """ + from pathlib import Path from setuptools import find_packages, setup From 9fe7cfcdc416fca8657921afaf7f3f7bfd23ada1 Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Wed, 13 Mar 2024 10:52:51 +0100 Subject: [PATCH 4/6] Working version of parallel lenient --- anglerfish/__main__.py | 3 +++ anglerfish/anglerfish.py | 50 +++++++++++++++++++++++++++------------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/anglerfish/__main__.py b/anglerfish/__main__.py index 72a9288..9310eaa 100644 --- a/anglerfish/__main__.py +++ b/anglerfish/__main__.py @@ -1,4 +1,7 @@ +import multiprocessing + from .anglerfish import anglerfish if __name__ == "__main__": + multiprocessing.freeze_support() anglerfish() diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py index 4638ef6..404283f 100755 --- a/anglerfish/anglerfish.py +++ b/anglerfish/anglerfish.py @@ -106,6 +106,7 @@ def run_demux(args): flipped_i7 = False flipped_i5 = False flips = { + "original": {"i7_reversed": False, "i5_reversed": False}, "i7": {"i7_reversed": True, "i5_reversed": False}, "i5": {"i7_reversed": False, "i5_reversed": True}, "i7+i5": {"i7_reversed": True, "i5_reversed": True}, @@ -122,33 +123,50 @@ def run_demux(args): ) flipped_i7, flipped_i5 = flips[args.force_rc].values() elif args.lenient: # Try reverse complementing the I5 and/or i7 indices and choose the best match - no_matches, matches = cluster_matches( - adaptors_sorted[key], fragments, args.max_distance - ) flipped = {} + results = [] + pool = multiprocessing.Pool( + processes=4 if args.threads >= 4 else args.threads + ) + results = [] for flip, rev in flips.items(): - rc_no_matches, rc_matches = cluster_matches( - adaptors_sorted[key], fragments, args.max_distance, **rev + spawn = pool.apply_async( + cluster_matches, + args=( + adaptors_sorted[key], + fragments, + args.max_distance, + rev["i7_reversed"], + rev["i5_reversed"], + ), ) - flipped[flip] = (rc_matches, rc_no_matches, len(rc_matches)) - best_flip = max(flipped, key=lambda k: flipped[k][2]) + results.append((spawn, flip)) + pool.close() + pool.join() + flipped = {result[1]: result[0].get() for result in results} + + best_flip = max(flipped, key=lambda k: len(flipped[k][1])) - # There are no barcode flips with unambiguously more matches, so we abort if ( - sorted([i[2] for i in flipped.values()])[-1] - == sorted([i[2] for i in flipped.values()])[-2] + sorted([len(i[1]) for i in flipped.values()])[-1] + == sorted([len(i[1]) for i in flipped.values()])[-2] ): log.info( - "Could not find any barcode reverse complements with unambiguously more matches" + " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches" ) - elif flipped[best_flip][2] > len(matches) * args.lenient_factor: + elif ( + best_flip != "None" + and len(flipped[best_flip][1]) + > len(flipped["original"][1]) * args.lenient_factor + ): log.info( - f" Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches" + f" Lenient mode: Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches" ) - matches, no_matches, _ = flipped[best_flip] - flipped_i7, flipped_i5 = flips[best_flip].values() + no_matches, matches = flipped[best_flip] else: - log.info(f" Using original index orientation for {adaptor_name}") + log.info( + f" Lenient mode: using original index orientation for {adaptor_name}" + ) else: no_matches, matches = cluster_matches( adaptors_sorted[key], fragments, args.max_distance From 40825d41861a3a29a4bdbf794fefedda703fbec0 Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Wed, 13 Mar 2024 11:26:37 +0100 Subject: [PATCH 5/6] Make lenient mode more lenient and add more explaination for ambiguous outcome --- anglerfish/anglerfish.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py index 404283f..b9259b3 100755 --- a/anglerfish/anglerfish.py +++ b/anglerfish/anglerfish.py @@ -26,6 +26,8 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger("anglerfish") +MAX_PROCESSES = 256 # Ought to be enough for anybody + def run_demux(args): multiprocessing.set_start_method("spawn") @@ -54,6 +56,11 @@ def run_demux(args): ) exit() log.debug(f"Samplesheet bc_dist == {bc_dist}") + if args.threads > MAX_PROCESSES: + log.warning( + f" Setting threads to {MAX_PROCESSES} as the maximum number of processes is {MAX_PROCESSES}" + ) + args.threads = MAX_PROCESSES # Sort the adaptors by type and size adaptors_t = [(entry.adaptor.name, entry.ont_barcode) for entry in ss] @@ -151,9 +158,10 @@ def run_demux(args): sorted([len(i[1]) for i in flipped.values()])[-1] == sorted([len(i[1]) for i in flipped.values()])[-2] ): - log.info( - " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches" + log.warning( + " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches. Using original index orientation for all adaptors. Please study the results carefully!" ) + no_matches, matches = flipped["original"] elif ( best_flip != "None" and len(flipped[best_flip][1]) @@ -167,6 +175,7 @@ def run_demux(args): log.info( f" Lenient mode: using original index orientation for {adaptor_name}" ) + no_matches, matches = flipped["original"] else: no_matches, matches = cluster_matches( adaptors_sorted[key], fragments, args.max_distance From 6c0baad53588d7b3d6c60c8dbbf80ebf84d08ff6 Mon Sep 17 00:00:00 2001 From: Remi-Andre Olsen Date: Wed, 13 Mar 2024 13:41:05 +0100 Subject: [PATCH 6/6] Reduce max procs --- anglerfish/anglerfish.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py index b9259b3..f7a56cf 100755 --- a/anglerfish/anglerfish.py +++ b/anglerfish/anglerfish.py @@ -26,7 +26,7 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger("anglerfish") -MAX_PROCESSES = 256 # Ought to be enough for anybody +MAX_PROCESSES = 64 # Ought to be enough for anybody def run_demux(args):