Support for multiprocessed fastq output #65

Merged · 6 commits · Mar 21, 2024
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,7 +1,7 @@
 # .pre-commit-config.yaml
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.1.13
+    rev: v0.3.0
     hooks:
       - id: ruff
       - id: ruff-format
3 changes: 3 additions & 0 deletions anglerfish/__main__.py
@@ -1,4 +1,7 @@
+import multiprocessing
+
 from .anglerfish import anglerfish
 
 if __name__ == "__main__":
+    multiprocessing.freeze_support()
     anglerfish()
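For context: with the `spawn` start method that this PR sets in `run_demux`, every worker re-imports `__main__`, so the `if __name__ == "__main__"` guard is what keeps child processes from recursively launching the CLI; `freeze_support()` itself is a no-op everywhere except frozen Windows executables. A minimal sketch of the same entry-point pattern (the `work` function is illustrative, not part of anglerfish):

```python
# Minimal sketch of the guarded entry point; under "spawn", workers re-import
# this module, so only code inside the guard runs exactly once.
import multiprocessing


def work(x: int) -> int:
    return x * x


if __name__ == "__main__":
    multiprocessing.freeze_support()  # no-op outside frozen Windows binaries
    multiprocessing.set_start_method("spawn")
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(work, [1, 2, 3]))  # [1, 4, 9]
```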
87 changes: 68 additions & 19 deletions anglerfish/anglerfish.py
@@ -3,6 +3,7 @@
 import glob
 import gzip
 import logging
+import multiprocessing
 import os
 import uuid
 from collections import Counter
@@ -25,8 +26,12 @@
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger("anglerfish")
 
+MAX_PROCESSES = 64  # Ought to be enough for anybody
+
 
 def run_demux(args):
+    multiprocessing.set_start_method("spawn")
+
     if args.debug:
         log.setLevel(logging.DEBUG)
     run_uuid = str(uuid.uuid4())
@@ -51,6 +56,11 @@ def run_demux(args):
         )
         exit()
     log.debug(f"Samplesheet bc_dist == {bc_dist}")
+    if args.threads > MAX_PROCESSES:
+        log.warning(
+            f" Setting threads to {MAX_PROCESSES} as the maximum number of processes is {MAX_PROCESSES}"
+        )
+        args.threads = MAX_PROCESSES
 
     # Sort the adaptors by type and size
     adaptors_t = [(entry.adaptor.name, entry.ont_barcode) for entry in ss]
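Two details in the hunk above are easy to miss: `set_start_method("spawn")` may only be called once per process (a second call raises `RuntimeError`), and the `MAX_PROCESSES` clamp rewrites `args.threads` in place before any pool is created. A small sketch of the clamp behavior, with the function name chosen here for illustration:

```python
# Sketch of the warning-and-clamp logic; logging setup mirrors the module above.
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("anglerfish")

MAX_PROCESSES = 64  # Ought to be enough for anybody


def clamp_threads(threads: int) -> int:
    if threads > MAX_PROCESSES:
        log.warning(
            f" Setting threads to {MAX_PROCESSES} as the maximum number of processes is {MAX_PROCESSES}"
        )
        threads = MAX_PROCESSES
    return threads


print(clamp_threads(128))  # 64
print(clamp_threads(8))  # 8
```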
@@ -103,6 +113,7 @@ def run_demux(args):
     flipped_i7 = False
     flipped_i5 = False
     flips = {
+        "original": {"i7_reversed": False, "i5_reversed": False},
         "i7": {"i7_reversed": True, "i5_reversed": False},
         "i5": {"i7_reversed": False, "i5_reversed": True},
         "i7+i5": {"i7_reversed": True, "i5_reversed": True},
@@ -119,38 +130,58 @@ def run_demux(args):
             )
             flipped_i7, flipped_i5 = flips[args.force_rc].values()
         elif args.lenient:  # Try reverse complementing the I5 and/or i7 indices and choose the best match
-            no_matches, matches = cluster_matches(
-                adaptors_sorted[key], fragments, args.max_distance
-            )
             flipped = {}
+            results = []
+            pool = multiprocessing.Pool(
+                processes=4 if args.threads >= 4 else args.threads
+            )
+            results = []
             for flip, rev in flips.items():
-                rc_no_matches, rc_matches = cluster_matches(
-                    adaptors_sorted[key], fragments, args.max_distance, **rev
+                spawn = pool.apply_async(
+                    cluster_matches,
+                    args=(
+                        adaptors_sorted[key],
+                        fragments,
+                        args.max_distance,
+                        rev["i7_reversed"],
+                        rev["i5_reversed"],
+                    ),
                 )
-                flipped[flip] = (rc_matches, rc_no_matches, len(rc_matches))
-            best_flip = max(flipped, key=lambda k: flipped[k][2])
+                results.append((spawn, flip))
+            pool.close()
+            pool.join()
+            flipped = {result[1]: result[0].get() for result in results}
+
+            best_flip = max(flipped, key=lambda k: len(flipped[k][1]))
 
             # There are no barcode flips with unambiguously more matches, so we abort
             if (
-                sorted([i[2] for i in flipped.values()])[-1]
-                == sorted([i[2] for i in flipped.values()])[-2]
+                sorted([len(i[1]) for i in flipped.values()])[-1]
+                == sorted([len(i[1]) for i in flipped.values()])[-2]
             ):
-                log.info(
-                    "Could not find any barcode reverse complements with unambiguously more matches"
+                log.warning(
+                    " Lenient mode: Could not find any barcode reverse complements with unambiguously more matches. Using original index orientation for all adaptors. Please study the results carefully!"
                 )
-            elif flipped[best_flip][2] > len(matches) * args.lenient_factor:
+                no_matches, matches = flipped["original"]
+            elif (
+                best_flip != "None"
+                and len(flipped[best_flip][1])
+                > len(flipped["original"][1]) * args.lenient_factor
+            ):
                 log.info(
-                    f" Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches"
+                    f" Lenient mode: Reverse complementing {best_flip} index for adaptor {adaptor_name} found at least {args.lenient_factor} times more matches"
                 )
-                matches, no_matches, _ = flipped[best_flip]
                 flipped_i7, flipped_i5 = flips[best_flip].values()
+                no_matches, matches = flipped[best_flip]
             else:
-                log.info(f" Using original index orientation for {adaptor_name}")
+                log.info(
+                    f" Lenient mode: using original index orientation for {adaptor_name}"
+                )
+                no_matches, matches = flipped["original"]
         else:
             no_matches, matches = cluster_matches(
                 adaptors_sorted[key], fragments, args.max_distance
             )
 
+        out_pool = []
         for k, v in groupby(sorted(matches, key=lambda x: x[3]), key=lambda y: y[3]):
             # To avoid collisions in fastq filenames, we add the ONT barcode to the sample name
             fq_prefix = k
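The rewritten lenient branch runs all four index orientations concurrently: each `cluster_matches` call is submitted with `apply_async`, the `AsyncResult` is stored next to its flip label, and after `close()`/`join()` the `(no_matches, matches)` tuples are collected with `.get()`. A stripped-down sketch of that fan-out/collect pattern, with a placeholder worker standing in for `cluster_matches`:

```python
# Sketch of the apply_async fan-out; count_matches is a stand-in that, like
# cluster_matches, returns a (no_matches, matches) tuple.
import multiprocessing


def count_matches(data: list[int], reverse: bool) -> tuple[list[int], list[int]]:
    matches = [x for x in data if (x % 2 == 0) != reverse]
    no_matches = [x for x in data if x not in matches]
    return no_matches, matches


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    flips = {"original": False, "flipped": True}
    data = list(range(10))
    pool = multiprocessing.Pool(processes=2)
    results = []
    for flip, reverse in flips.items():
        spawn = pool.apply_async(count_matches, args=(data, reverse))
        results.append((spawn, flip))
    pool.close()
    pool.join()
    # .get() re-raises any exception that occurred in the worker.
    flipped = {result[1]: result[0].get() for result in results}
    best_flip = max(flipped, key=lambda k: len(flipped[k][1]))
    print(best_flip, len(flipped[best_flip][1]))
```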
@@ -179,7 +210,21 @@ def run_demux(args):
             )
             report.add_sample_stat(sample_stat)
             if not args.skip_demux:
-                write_demuxedfastq(sample_dict, fastq_path, fq_name)
+                out_pool.append((sample_dict, fastq_path, fq_name))
+
+        # Write demuxed fastq files
+        pool = multiprocessing.Pool(processes=args.threads)
+        results = []
+        for out in out_pool:
+            log.debug(f" Writing {out[2]}")
+            spawn = pool.starmap_async(write_demuxedfastq, [out])
+            results.append((spawn, out[2]))
+        pool.close()
+        pool.join()
+        for result in results:
+            log.debug(
+                f" PID-{result[0].get()}: wrote {result[1]}, size {os.path.getsize(result[1])} bytes"
+            )
 
         # Top unmatched indexes
         nomatch_count = Counter([x[3] for x in no_matches])
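Rather than writing each fastq inline, the loop now collects `(sample_dict, fastq_path, fq_name)` tuples into `out_pool` and flushes them through a second pool with `starmap_async`. One subtlety worth a sketch: `starmap_async(...).get()` returns a *list* of results, one per argument tuple, so the PID comes back wrapped in a list. A reduced, self-contained version with a stand-in writer:

```python
# Sketch of the deferred-write stage; write_file stands in for
# write_demuxedfastq and likewise returns os.getpid().
import multiprocessing
import os


def write_file(records: list[str], path: str) -> int:
    with open(path, "w") as fh:
        fh.writelines(records)
    return os.getpid()


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    out_pool = [
        (["@read1\nACGT\n+\nIIII\n"], "sample1.fastq"),
        (["@read2\nTTGG\n+\nIIII\n"], "sample2.fastq"),
    ]
    pool = multiprocessing.Pool(processes=2)
    results = []
    for out in out_pool:
        spawn = pool.starmap_async(write_file, [out])
        results.append((spawn, out[1]))
    pool.close()
    pool.join()
    for result in results:
        # .get() yields a one-element list here: one result per argument tuple.
        pid = result[0].get()[0]
        print(f"PID-{pid}: wrote {result[1]}, size {os.path.getsize(result[1])} bytes")
```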
@@ -224,7 +269,11 @@ def anglerfish():
         help="Analysis output folder (default: Current dir)",
     )
     parser.add_argument(
-        "--threads", "-t", default=4, help="Number of threads to use (default: 4)"
+        "--threads",
+        "-t",
+        default=4,
+        type=int,
+        help="Number of threads to use (default: 4)",
     )
     parser.add_argument(
         "--skip_demux",
6 changes: 5 additions & 1 deletion anglerfish/demux/demux.py
@@ -233,8 +233,11 @@ def cluster_matches(
 
 def write_demuxedfastq(beds, fastq_in, fastq_out):
     """
+    Intended for multiprocessing
     Take a set of coordinates in bed format [[seq1, start, end, ..][seq2, ..]]
     from over a set of fastq entries in the input files and do extraction.
+
+    Return: PID of the process
     """
     gz_buf = 131072
     fq_files = glob.glob(fastq_in)
@@ -263,4 +266,5 @@ def write_demuxedfastq(beds, fastq_in, fastq_out):
                 outfqs += "+\n"
                 outfqs += f"{qual[bed[1] : bed[2]]}\n"
                 oz.stdin.write(outfqs.encode("utf-8"))
     log.debug(f" Wrote {fastq_out}, size: {os.path.getsize(fastq_out)} bytes")
+    return os.getpid()
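Returning `os.getpid()` gives the parent process something concrete to log against each `AsyncResult`. For readers unfamiliar with the surrounding function, here is a hedged sketch of the pipe-to-compressor write pattern it uses (assuming `gzip` is on `PATH`; the function name and records are illustrative, not the exact anglerfish code):

```python
# Sketch of piping fastq records through an external compressor, returning the
# worker PID the way write_demuxedfastq does.
import os
import subprocess


def write_gzipped(records: list[str], fastq_out: str) -> int:
    gz_buf = 131072  # same buffer size as gz_buf in demux.py
    with open(fastq_out, "wb") as out_fh:
        oz = subprocess.Popen(
            ["gzip", "-c"], stdin=subprocess.PIPE, stdout=out_fh, bufsize=gz_buf
        )
        for rec in records:
            oz.stdin.write(rec.encode("utf-8"))
        oz.stdin.close()
        oz.wait()
    return os.getpid()


print(write_gzipped(["@r1\nACGT\n+\nIIII\n"], "demo.fastq.gz"))
```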
6 changes: 3 additions & 3 deletions anglerfish/explore/explore.py
@@ -134,9 +134,9 @@ def run_explore(
     )
     match_col_df = match_col_df.astype({"match_1_len": "int32"})
 
-    df_good_hits.loc[
-        match_col_df.index, match_col_df.columns
-    ] = match_col_df
+    df_good_hits.loc[match_col_df.index, match_col_df.columns] = (
+        match_col_df
+    )
 
     thres = round(
         (
3 changes: 2 additions & 1 deletion setup.py
@@ -10,14 +10,15 @@
 
 conda install -c bioconda anglerfish
 """
+
 from pathlib import Path
 
 from setuptools import find_packages, setup
 
 this_directory = Path(__file__).parent
 long_description = (this_directory / "README.md").read_text()
 
-version = "0.6.1"
+version = "0.7.0dev0"
 
 setup(
     name="bio-anglerfish",