From 08d3a905d2cfe88ecb42e0bb0d85802afbf66047 Mon Sep 17 00:00:00 2001
From: Aleksandra Galitsyna
Date: Mon, 16 Oct 2023 19:06:12 -0400
Subject: [PATCH] Deduplication reporting update

---
 pairtools/cli/dedup.py | 18 ++++++++++
 pairtools/lib/dedup.py | 79 +++++++++++++++++++++++++++++++++++++++---
 2 files changed, 92 insertions(+), 5 deletions(-)

diff --git a/pairtools/cli/dedup.py b/pairtools/cli/dedup.py
index 94a5c64c..17117ec6 100644
--- a/pairtools/cli/dedup.py
+++ b/pairtools/cli/dedup.py
@@ -236,6 +236,16 @@
     help="Output stats in yaml format instead of table. [output stats format option]",
 )
 
+# Add a column with the number of duplicates
+@click.option(
+    "--count-dups/--no-count-dups",
+    is_flag=True,
+    default=False,
+    help="Add a column to the output pairs with the number of duplicates. "
+    "Compatible with sklearn and scipy backends only. "
+    "Not counted by default. [output dedup pairs format option]",
+)
+
 # Filtering options for reporting stats:
 @click.option(
     "--filter",
@@ -388,6 +398,11 @@ def dedup_py(
     send_header_to_dedup = send_header_to in ["both", "dedup"]
     send_header_to_dup = send_header_to in ["both", "dups"]
 
+    count_dups = kwargs.get("count_dups", False)
+    if backend == "cython" and count_dups:
+        logger.warning("Not counting the number of duplicates with the cython backend.")
+        count_dups = False
+
     instream = fileio.auto_open(
         pairs_path,
         mode="r",
@@ -486,6 +501,8 @@ def dedup_py(
             "Pairs file appears not to be sorted, dedup might produce wrong results."
         )
     header = headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
+    if count_dups:
+        header = headerops.append_columns(header, ["n_dups"])
     if send_header_to_dedup:
         outstream.writelines((l + "\n" for l in header))
     if send_header_to_dup and outstream_dups and (outstream_dups != outstream):
@@ -555,6 +572,7 @@ def dedup_py(
             out_stat=out_stat,
             backend=backend,
             n_proc=n_proc,
+            count_dups=count_dups,
         )
     else:
         raise ValueError("Unknown backend")

diff --git a/pairtools/lib/dedup.py b/pairtools/lib/dedup.py
index 92e99deb..38c26595 100644
--- a/pairtools/lib/dedup.py
+++ b/pairtools/lib/dedup.py
@@ -40,7 +40,58 @@ def streaming_dedup(
     out_stat,
     backend,
     n_proc,
+    count_dups=False,
 ):
+    """
+    Deduplication with the sklearn or scipy KD-tree backend.
+
+    Parameters
+    ----------
+    in_stream : file
+        Input stream of pairs
+    colnames : list of str
+        Names of the columns in the input file
+    chunksize : int
+        How many lines to read at a time
+    carryover : int
+        How many lines to keep in memory to match with the next chunk
+    method : str
+        'sum' or 'max' - whether the distance r is the sum of the distances on the
+        two ends of a pair, or the maximal of the two distances
+    mark_dups : bool
+        If True, will add "DD" as the pair_type
+    max_mismatch : int
+        Allowed distance between two pairs to call them duplicates
+    extra_col_pairs : list of tuples
+        List of extra column pairs that need to match between two reads for them to
+        be considered duplicates (e.g. useful if alleles are annotated)
+    unmapped_chrom : str
+        Which character denotes unmapped reads in the chrom1/chrom2 fields
+    outstream : file
+        Output stream of deduplicated pairs
+    outstream_dups : file
+        Output stream of duplicates (optionally with added parent_id, see the keep_parent_id option)
+    outstream_unmapped : file
+        Output stream of unmapped pairs
+    keep_parent_id : bool
+        If True, the read ID of the read that was not labelled as a duplicate in a
+        group of duplicates is recorded for each read marked as a duplicate.
+        Only possible with non-cython backends.
+    out_stat : PairCounter
+        Output statistics
+    backend : str
+        'scipy' or 'sklearn'; both work effectively the same
+    n_proc : int
+        How many cores to use, by default 1.
+        Only works for the 'sklearn' backend
+    count_dups : bool
+        If True, count the number of duplicates for each unique pair and report it as an extra column.
+        Compatible with sklearn and scipy backends only.
+
+    Returns
+    -------
+    None
+    """
 
     deduped_chunks = _dedup_stream(
         in_stream=in_stream,
@@ -54,6 +105,7 @@
         keep_parent_id=keep_parent_id,
         backend=backend,
         n_proc=n_proc,
+        count_dups=count_dups,
     )
 
     t0 = time.time()
@@ -118,6 +170,7 @@ def _dedup_stream(
     keep_parent_id,
     backend,
     n_proc,
+    count_dups=False,
 ):
     # Stream the input dataframe:
     dfs = pd.read_table(
@@ -140,6 +193,7 @@ def _dedup_stream(
             extra_col_pairs=extra_col_pairs,
             backend=backend,
             n_proc=n_proc,
+            count_dups=count_dups,
         )
         df_marked = df_marked.loc[prev_i:, :].reset_index(drop=True)
         mask_duplicated = df_marked["duplicate"]
@@ -165,8 +219,10 @@ def _dedup_chunk(
     backend,
     unmapped_chrom="!",
     n_proc=1,
+    count_dups=False,
 ):
-    """Mark duplicates in a dataframe of pairs
+    """
+    Mark duplicates in a dataframe of pairs
 
     Parameters
     ----------
@@ -193,6 +249,8 @@ def _dedup_chunk(
     n_proc : int, optional
         How many cores to use, by default 1
         Only works for 'sklearn' backend
+    count_dups : bool, optional
+        If True, count the number of duplicates for each unique pair and report it as an extra column
 
     Returns
     -------
@@ -220,6 +278,14 @@ def _dedup_chunk(
     # Set up columns to store the dedup info:
     df.loc[:, "clusterid"] = np.nan
     df.loc[:, "duplicate"] = False
+    if count_dups:
+        # When counting duplicates, the input is always a mix of already counted pairs
+        # and a fresh chunk, so reset the counter to 1 only where it is missing:
+        if "n_duplicates" not in df.columns:
+            df.loc[:, "n_duplicates"] = 1
+        else:
+            df.loc[:, "n_duplicates"] = df["n_duplicates"].fillna(1)
+        # Note that "n_duplicates" is in fact the total number of reads in a cluster, including the retained one
 
     # Split mapped and unmapped reads:
     mask_unmapped = (df["chrom1"] == unmapped_chrom) | (df["chrom2"] == unmapped_chrom)
@@ -264,7 +330,7 @@ def _dedup_chunk(
         a_mat = coo_matrix((np.ones_like(a0), (a0, a1)), shape=(N_mapped, N_mapped))
 
         # Set up inferred clusterIDs:
-        df_mapped.loc[:, "clusterid"] = connected_components(a_mat, directed=False)[1]
+        df_mapped.loc[:, "clusterid"] = connected_components(a_mat, directed=False)[1].astype(np.uint)
 
         mask_dups = df_mapped["clusterid"].duplicated()
         df_mapped.loc[mask_dups, "duplicate"] = True
@@ -278,11 +344,14 @@ def _dedup_chunk(
     # Reconstruct original dataframe with removed duplicated reads:
     df = pd.concat([df_unmapped, df_mapped]).reset_index(drop=True)
 
+    if count_dups:
+        df["clusterid"] = df["clusterid"].fillna(-1)
+        df.loc[:, "n_duplicates"] = df.groupby("clusterid", dropna=False)["n_duplicates"].transform("sum")
+
     df = df.set_index(index_colname)  # Set up the original index
     df = df.drop(
         ["clusterid"], axis=1
-    )  # Remove the information that we don't need anymore:
-
+    )  # Remove the information that we don't need anymore
     return df
 
 
@@ -307,7 +376,7 @@ def streaming_dedup_cython(
     out_stat,
     mark_dups,
     keep_parent_id=False,
-    readid_ind=0,
+    readid_ind=0
 ):
     """
     Cython-powered deduplication with online algorithm based on indexed list.
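
Note for reviewers (not part of the patch): below is a minimal, self-contained sketch of the duplicate-counting step this patch adds to _dedup_chunk. Reads within max_mismatch of each other are linked, clusters are labelled with connected components, and per-read counters initialized to 1 are summed within each cluster to give "n_duplicates". The toy coordinates, the max_mismatch value, and the use of Euclidean distance here are illustrative assumptions only, not taken from the patch.

    import numpy as np
    import pandas as pd
    from scipy.sparse import coo_matrix
    from scipy.sparse.csgraph import connected_components
    from scipy.spatial import cKDTree

    # Illustrative values (assumptions, not from the patch):
    max_mismatch = 3
    df = pd.DataFrame({
        "pos1": [100, 101, 500],
        "pos2": [200, 202, 700],
    })

    # Link every pair of reads whose coordinates are within max_mismatch,
    # as the scipy backend does with a KD-tree radius query:
    tree = cKDTree(df[["pos1", "pos2"]].values)
    neighbours = tree.query_pairs(r=max_mismatch, output_type="ndarray")

    # Label duplicate clusters via connected components, as in _dedup_chunk:
    n = len(df)
    mat = coo_matrix(
        (np.ones(len(neighbours)), (neighbours[:, 0], neighbours[:, 1])),
        shape=(n, n),
    )
    df["clusterid"] = connected_components(mat, directed=False)[1]

    # Every read starts with a counter of 1; the per-cluster sum is the total
    # number of reads in the cluster, reported as "n_duplicates":
    df["n_duplicates"] = 1
    df["n_duplicates"] = df.groupby("clusterid")["n_duplicates"].transform("sum")

    print(df)
    # Reads 0 and 1 fall into one cluster (n_duplicates == 2); read 2 is unique.

With the patch applied, the feature is switched on with something like `pairtools dedup --count-dups --backend scipy input.pairs` (invocation shown for illustration); the count is reported as an extra column declared in the output header.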