Merge pull request #65 from remiolsen/parallel_fastq_out
Support for multiprocessed fastq output
remiolsen authored Mar 21, 2024
2 parents db19f30 + 6c0baad commit 762d826
Showing 6 changed files with 82 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,7 +1,7 @@
 # .pre-commit-config.yaml
 repos:
 - repo: https://github.com/astral-sh/ruff-pre-commit
-  rev: v0.1.13
+  rev: v0.3.0
   hooks:
   - id: ruff
   - id: ruff-format
3 changes: 3 additions & 0 deletions anglerfish/__main__.py
@@ -1,4 +1,7 @@
+import multiprocessing
+
 from .anglerfish import anglerfish
 
 if __name__ == "__main__":
+    multiprocessing.freeze_support()
     anglerfish()
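A note on this entry-point change: `multiprocessing.freeze_support()` is a no-op except in frozen Windows executables, and with the `spawn` start method (set in `anglerfish.py` below) child processes re-import `__main__`, so pool creation has to stay behind the `if __name__ == "__main__":` guard. A minimal, self-contained sketch of the pattern — the `work` function is hypothetical:

```python
import multiprocessing


def work(x: int) -> int:
    # Hypothetical worker; in this PR the real workers are
    # cluster_matches and write_demuxedfastq.
    return x * x


if __name__ == "__main__":
    # No-op unless running as a frozen Windows executable, where it
    # lets spawned children bootstrap correctly.
    multiprocessing.freeze_support()
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(work, range(4)))  # [0, 1, 4, 9]
```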
87 changes: 68 additions & 19 deletions anglerfish/anglerfish.py
@@ -3,6 +3,7 @@
 import glob
 import gzip
 import logging
+import multiprocessing
 import os
 import uuid
 from collections import Counter
@@ -25,8 +26,12 @@
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger("anglerfish")
 
+MAX_PROCESSES = 64  # Ought to be enough for anybody
+
 
 def run_demux(args):
+    multiprocessing.set_start_method("spawn")
+
     if args.debug:
         log.setLevel(logging.DEBUG)
     run_uuid = str(uuid.uuid4())
@@ -51,6 +56,11 @@ def run_demux(args):
         )
         exit()
     log.debug(f"Samplesheet bc_dist == {bc_dist}")
+    if args.threads > MAX_PROCESSES:
+        log.warning(
+            f" Setting threads to {MAX_PROCESSES} as the maximum number of processes is {MAX_PROCESSES}"
+        )
+        args.threads = MAX_PROCESSES
 
     # Sort the adaptors by type and size
     adaptors_t = [(entry.adaptor.name, entry.ont_barcode) for entry in ss]
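For orientation on these two hunks: `set_start_method("spawn")` starts each worker from a fresh interpreter rather than a fork of the parent (the only option on Windows, and the macOS default since Python 3.8), and it may only be called once per process — a second call raises `RuntimeError`. The thread count is then clamped to `MAX_PROCESSES` before any pool is sized. An illustrative sketch of the same start-method-plus-clamp pattern; the helper name is made up:

```python
import multiprocessing

MAX_PROCESSES = 64  # same cap as in the diff above


def clamp_threads(requested: int) -> int:
    # Hypothetical helper mirroring the warn-and-clamp logic above
    return min(requested, MAX_PROCESSES)


if __name__ == "__main__":
    # Setting the start method twice raises RuntimeError, so it is
    # done once, up front.
    multiprocessing.set_start_method("spawn")
    print(clamp_threads(128))  # -> 64
```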
@@ -103,6 +113,7 @@ def run_demux(args):
         flipped_i7 = False
         flipped_i5 = False
         flips = {
+            "original": {"i7_reversed": False, "i5_reversed": False},
             "i7": {"i7_reversed": True, "i5_reversed": False},
             "i5": {"i7_reversed": False, "i5_reversed": True},
             "i7+i5": {"i7_reversed": True, "i5_reversed": True},
@@ -119,38 +130,58 @@ def run_demux(args):
             )
             flipped_i7, flipped_i5 = flips[args.force_rc].values()
         elif args.lenient:  # Try reverse complementing the I5 and/or i7 indices and choose the best match
-            no_matches, matches = cluster_matches(
-                adaptors_sorted[key], fragments, args.max_distance
-            )
-            flipped = {}
+            results = []
+            pool = multiprocessing.Pool(
+                processes=4 if args.threads >= 4 else args.threads
+            )
+            results = []
             for flip, rev in flips.items():
-                rc_no_matches, rc_matches = cluster_matches(
-                    adaptors_sorted[key], fragments, args.max_distance, **rev
+                spawn = pool.apply_async(
+                    cluster_matches,
+                    args=(
+                        adaptors_sorted[key],
+                        fragments,
+                        args.max_distance,
+                        rev["i7_reversed"],
+                        rev["i5_reversed"],
+                    ),
                 )
-                flipped[flip] = (rc_matches, rc_no_matches, len(rc_matches))
-            best_flip = max(flipped, key=lambda k: flipped[k][2])
+                results.append((spawn, flip))
+            pool.close()
+            pool.join()
+            flipped = {result[1]: result[0].get() for result in results}
+
+            best_flip = max(flipped, key=lambda k: len(flipped[k][1]))
 
             # There are no barcode flips with unambiguously more matches, so we abort
             if (
-                sorted([i[2] for i in flipped.values()])[-1]
-                == sorted([i[2] for i in flipped.values()])[-2]
+                sorted([len(i[1]) for i in flipped.values()])[-1]
+                == sorted([len(i[1]) for i in flipped.values()])[-2]
             ):
-                log.info(
-                    "Could not find any barcode reverse complements with unambiguously more matches"
+                log.warning(
+                    " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches. Using original index orientation for all adaptors. Please study the results carefully!"
                 )
-            elif flipped[best_flip][2] > len(matches) * args.lenient_factor:
+                no_matches, matches = flipped["original"]
+            elif (
+                best_flip != "None"
+                and len(flipped[best_flip][1])
+                > len(flipped["original"][1]) * args.lenient_factor
+            ):
                 log.info(
-                    f" Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches"
+                    f" Lenient mode: Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches"
                 )
-                matches, no_matches, _ = flipped[best_flip]
                 flipped_i7, flipped_i5 = flips[best_flip].values()
+                no_matches, matches = flipped[best_flip]
             else:
-                log.info(f" Using original index orientation for {adaptor_name}")
+                log.info(
+                    f" Lenient mode: using original index orientation for {adaptor_name}"
+                )
+                no_matches, matches = flipped["original"]
         else:
             no_matches, matches = cluster_matches(
                 adaptors_sorted[key], fragments, args.max_distance
             )
 
+        out_pool = []
         for k, v in groupby(sorted(matches, key=lambda x: x[3]), key=lambda y: y[3]):
             # To avoid collisions in fastq filenames, we add the ONT barcode to the sample name
             fq_prefix = k
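The heart of this hunk: instead of running `cluster_matches` serially for each index orientation, the four flips (including the new `"original"` entry) are fanned out with `apply_async`, gathered back into the `flipped` dict after `close()`/`join()`, and the winner is chosen by match count. The diff shows that `cluster_matches` returns a `(no_matches, matches)` pair, hence the `len(flipped[k][1])` key. A runnable sketch of the same fan-out, with a stand-in `cluster_stub`:

```python
import multiprocessing


def cluster_stub(key, i7_reversed, i5_reversed):
    # Stand-in for cluster_matches: returns (no_matches, matches),
    # yielding more "matches" the more indices are flipped.
    n = (2 if i7_reversed else 1) * (3 if i5_reversed else 1)
    return ([], [("read", 0, 100, "sample")] * n)


if __name__ == "__main__":
    flips = {
        "original": {"i7_reversed": False, "i5_reversed": False},
        "i7": {"i7_reversed": True, "i5_reversed": False},
        "i5": {"i7_reversed": False, "i5_reversed": True},
        "i7+i5": {"i7_reversed": True, "i5_reversed": True},
    }
    pool = multiprocessing.Pool(processes=4)
    results = []
    for flip, rev in flips.items():
        spawn = pool.apply_async(
            cluster_stub, args=("key", rev["i7_reversed"], rev["i5_reversed"])
        )
        results.append((spawn, flip))
    pool.close()
    pool.join()  # .get() would also block; close+join mirrors the diff
    flipped = {flip: handle.get() for handle, flip in results}
    best_flip = max(flipped, key=lambda k: len(flipped[k][1]))
    print(best_flip)  # -> "i7+i5" with this stub
```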
@@ -179,7 +210,21 @@ def run_demux(args):
             )
             report.add_sample_stat(sample_stat)
             if not args.skip_demux:
-                write_demuxedfastq(sample_dict, fastq_path, fq_name)
+                out_pool.append((sample_dict, fastq_path, fq_name))
+
+        # Write demuxed fastq files
+        pool = multiprocessing.Pool(processes=args.threads)
+        results = []
+        for out in out_pool:
+            log.debug(f" Writing {out[2]}")
+            spawn = pool.starmap_async(write_demuxedfastq, [out])
+            results.append((spawn, out[2]))
+        pool.close()
+        pool.join()
+        for result in results:
+            log.debug(
+                f" PID-{result[0].get()}: wrote {result[1]}, size {os.path.getsize(result[1])} bytes"
+            )
 
         # Top unmatched indexes
         nomatch_count = Counter([x[3] for x in no_matches])
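The same pool pattern is reused for output: each `(sample_dict, fastq_path, fq_name)` tuple becomes one `starmap_async` job, and the worker's return value (its PID, see `demux.py` below) is logged afterwards. One subtlety worth flagging in review: `starmap_async(f, [out])` maps over a one-element list, so `.get()` returns a one-element list of PIDs rather than a bare PID, and the debug line above will render as `PID-[12345]`. A runnable sketch with a hypothetical `write_stub`:

```python
import multiprocessing
import os


def write_stub(sample_dict, fastq_path, fq_name):
    # Stand-in for write_demuxedfastq: pretend to write, report PID.
    with open(fq_name, "w") as fh:
        fh.write("@read1\nACGT\n+\n!!!!\n")
    return os.getpid()


if __name__ == "__main__":
    out_pool = [({}, "in.fastq", "a.fastq"), ({}, "in.fastq", "b.fastq")]
    pool = multiprocessing.Pool(processes=2)
    results = []
    for out in out_pool:
        spawn = pool.starmap_async(write_stub, [out])
        results.append((spawn, out[2]))
    pool.close()
    pool.join()
    for spawn, fq_name in results:
        # .get() is a list like [12345], hence the [0]
        print(f"PID-{spawn.get()[0]} wrote {fq_name}")
```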
@@ -224,7 +269,11 @@ def anglerfish():
         help="Analysis output folder (default: Current dir)",
     )
     parser.add_argument(
-        "--threads", "-t", default=4, help="Number of threads to use (default: 4)"
+        "--threads",
+        "-t",
+        default=4,
+        type=int,
+        help="Number of threads to use (default: 4)",
     )
     parser.add_argument(
         "--skip_demux",
6 changes: 5 additions & 1 deletion anglerfish/demux/demux.py
@@ -233,8 +233,11 @@ def cluster_matches(
 
 def write_demuxedfastq(beds, fastq_in, fastq_out):
     """
+    Intended for multiprocessing
     Take a set of coordinates in bed format [[seq1, start, end, ..][seq2, ..]]
     from over a set of fastq entries in the input files and do extraction.
+
+    Return: PID of the process
     """
     gz_buf = 131072
     fq_files = glob.glob(fastq_in)
@@ -263,4 +266,5 @@ def write_demuxedfastq(beds, fastq_in, fastq_out):
                 outfqs += "+\n"
                 outfqs += f"{qual[bed[1] : bed[2]]}\n"
             oz.stdin.write(outfqs.encode("utf-8"))
-    log.debug(f" Wrote {fastq_out}, size: {os.path.getsize(fastq_out)} bytes")
+
+    return os.getpid()
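For context on the function being parallelized: `write_demuxedfastq` streams records into a gzip process via `oz.stdin`, and now returns `os.getpid()` so the parent can attribute each output file to a worker in its debug log. A minimal sketch of that pipe-to-gzip output pattern, assuming a `gzip` binary on `PATH` — the real function also parses bed coordinates and reads the input fastq, which is omitted here:

```python
import os
import subprocess

gz_buf = 131072  # same buffer size as in demux.py
with open("demuxed.fastq.gz", "wb") as out_fh:
    oz = subprocess.Popen(
        ["gzip", "-c"], stdin=subprocess.PIPE, stdout=out_fh, bufsize=gz_buf
    )
    outfqs = "@read1\nACGT\n+\n!!!!\n"
    oz.stdin.write(outfqs.encode("utf-8"))
    oz.stdin.close()  # flush and let gzip finish
    oz.wait()

print(os.getpid())  # what the worker returns for the parent's log line
```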
6 changes: 3 additions & 3 deletions anglerfish/explore/explore.py
@@ -134,9 +134,9 @@ def run_explore(
         )
         match_col_df = match_col_df.astype({"match_1_len": "int32"})
 
-        df_good_hits.loc[
-            match_col_df.index, match_col_df.columns
-        ] = match_col_df
+        df_good_hits.loc[match_col_df.index, match_col_df.columns] = (
+            match_col_df
+        )
 
         thres = round(
             (
3 changes: 2 additions & 1 deletion setup.py
@@ -10,14 +10,15 @@
 conda install -c bioconda anglerfish
 """
+
 from pathlib import Path
 
 from setuptools import find_packages, setup
 
 this_directory = Path(__file__).parent
 long_description = (this_directory / "README.md").read_text()
 
-version = "0.6.1"
+version = "0.7.0dev0"
 
 setup(
     name="bio-anglerfish",
