Skip to content

Commit

Permalink
increased code coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
soymintc committed Jan 27, 2025
1 parent 985cd5a commit cff5d5b
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 122 deletions.
118 changes: 1 addition & 117 deletions hairloom/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,48 +79,6 @@ def extract_read_data(bam:pysam.AlignmentFile, contig:str, start=None, end=None,
return df


def pull_breakpoints_from_reads_in_sv_regions(bam, tra, get_read_table=False, min_n_breakpoint=2, margin=10) -> list:
    """Extract and append ``BreakpointChain`` objects from a bam file and a table of SVs

    Args:
        bam (pysam.AlignmentFile): BAM file
        tra (pandas.DataFrame): Table of SVs; must contain columns
            ``chromosome_1``, ``position_1``, ``chromosome_2``, ``position_2``
        get_read_table (bool, optional): Also return table of read alignment stats. Defaults to False.
        min_n_breakpoint (int, optional): Minimum number of breakpoints required to be saved. Useful in selecting complex rearrangements if the number is high. Defaults to 2.
        margin (int, optional): Margin (bp) from breakpoints to fetch reads. Defaults to 10.

    Returns:
        list[BreakpointChain]: A list of BreakpointChain objects.
        When ``get_read_table`` is True, returns ``(bundle, read_info)`` where
        ``read_info`` is a DataFrame of per-read split-alignment records.
    """
    bundle = []
    # qnames whose breakpoints were already collected; prevents double-counting
    # reads that span more than one SV region in `tra`
    saved_qnames = set()
    # canonical human chromosomes, with and without the 'chr' prefix
    canonical_chroms = [str(c) for c in range(1, 22+1)] + ['X', 'Y']
    canonical_chroms += ['chr'+c for c in canonical_chroms]
    read_info = pd.DataFrame()
    for _, row in tra.iterrows():
        chrom1 = row['chromosome_1']
        pos1 = row['position_1']
        chrom2 = row['chromosome_2']
        pos2 = row['position_2']
        # fetch is 0-based half-open, positions are 1-based: widen by `margin`
        reads1 = bam.fetch(chrom1, pos1-margin-1, pos1+margin)
        reads2 = bam.fetch(chrom2, pos2-margin-1, pos2+margin)
        for reads in [reads1, reads2]:
            alignments = extract_split_alignments(reads)
            df = make_split_read_table(alignments)
            df = df[~df['qname'].isin(saved_qnames)] # don't double-count breakpoints already saved
            # read_info keeps decoy rows; only the breakpoint bundle filters them
            read_info = pd.concat([read_info, df])
            df = df[df['chrom'].isin(canonical_chroms)] # remove segments in decoys
            _qnames = set(df['qname'].unique().tolist())
            saved_qnames.update(_qnames)
            _bundle = make_bundle(df)
            for brks in _bundle:
                if len(brks) >= min_n_breakpoint:
                    bundle.append(brks)
    if get_read_table:
        return bundle, read_info
    return bundle

def make_bundle(reads):
"""Make a list of ``BreakpointChain`` based on alignment table
Expand Down Expand Up @@ -332,78 +290,4 @@ def find_presence_of_matching_sv(sv1, sv2, margin=50):
match = _match
else:
match |= _match
return match

def make_tumor_sv_table(bundle:list, sv=None, margin=10, get_support=True) -> pd.DataFrame:
    """Make SV table from list of ``BreakpointChain``

    Args:
        bundle (list): List of ``BreakpointChain``
        sv (pandas.DataFrame, optional): Table of source SVs as reference for `in_source` flag. Defaults to None.
        margin (int, optional): Margin (bp) for merging clustered breakpoints. Defaults to 10.
        get_support (bool, optional): Merge breakpoints with same coordinates and add count as `support`. Defaults to True.

    Returns:
        pandas.DataFrame: SV table from bundle [, with `in_source` labels] [, collapsed by coordinate with support counts]
    """
    # Remap near-identical breakpoint coordinates (within `margin`) onto the
    # highest-support coordinate so that equivalent SVs collapse together.
    breakpoint_support = get_breakpoint_support_from_bundle(bundle)
    coord_map, coord_map_log = map_similar_coordinate_to_higher_rank(bundle, breakpoint_support, margin=margin)
    bundle = fix_lower_support_coordinates(bundle, coord_map)
    sv_cols = ['chromosome_1', 'position_1', 'strand_1', 'chromosome_2', 'position_2', 'strand_2', 'type']
    data = []
    for brks in bundle:
        for tra in brks.tras:
            svtype = get_svtype(tra)
            chrom1, chrom2 = tra.brk1.chrom, tra.brk2.chrom
            pos1, pos2 = tra.brk1.pos, tra.brk2.pos
            ori1, ori2 = tra.brk1.ori, tra.brk2.ori
            data.append([chrom1, pos1, ori1, chrom2, pos2, ori2, svtype])
    tumor_sv = pd.DataFrame(data, columns=sv_cols)
    tumor_sv = normalize_sv_table(tumor_sv)
    if get_support:
        # sv_cols covers every column, so this counts identical SV rows
        # (was duplicated as a second identical `ix_cols` list)
        tumor_sv = tumor_sv.groupby(sv_cols).value_counts().reset_index()
        tumor_sv.rename(columns={'count':'support'}, inplace=True)
    # isinstance instead of `type(sv) == pd.DataFrame` (also accepts subclasses)
    if isinstance(sv, pd.DataFrame):
        # NOTE(review): matching margin is fixed at 50 bp, independent of the
        # clustering `margin` parameter above — presumably intentional; confirm.
        tumor_sv['in_source'] = find_presence_of_matching_sv(tumor_sv, sv, margin=50)
    return tumor_sv

def get_normalized_sv(tra:BreakpointPair) -> tuple:
    """
    Sorts (normalizes) a ``BreakpointPair`` based on chromosome and position order.

    This function ensures a consistent ordering of breakpoints in a ``BreakpointPair``
    by sorting them based on chromosome precedence and genomic position.

    Args:
        tra (``BreakpointPair``): A pair of breakpoints to normalize. Each breakpoint
            in the pair is expected to have the attributes `chrom`, `pos`, and `ori`.

    Returns:
        tuple: A flattened tuple of the normalized breakpoint coordinates in the format:
            (chrom1, pos1, ori1, chrom2, pos2, ori2).

    Notes:
        - Chromosomes are sorted based on their order in ``Breakpoint.chroms``.
        - If the chromosomes are the same, the breakpoints are sorted by position.
        - The orientation (`ori`) remains associated with its respective breakpoint.

    Example:
        >>> from your_module import Breakpoint, BreakpointPair, get_normalized_sv
        >>> brk1 = Breakpoint("chr2", 100, "+")
        >>> brk2 = Breakpoint("chr1", 200, "-")
        >>> pair = BreakpointPair(brk1, brk2)
        >>> get_normalized_sv(pair)
        ('chr1', 200, '-', 'chr2', 100, '+')
    """
    chrom1, pos1, ori1 = tra.brk1.chrom, tra.brk1.pos, tra.brk1.ori
    chrom2, pos2, ori2 = tra.brk2.chrom, tra.brk2.pos, tra.brk2.ori
    # BUG FIX: original compared chrom1's rank against itself
    # (`index(chrom1) < index(chrom1)`), which is always False, so
    # inter-chromosomal pairs were never swapped. Swap when chrom1 ranks
    # after chrom2, matching the docstring example.
    if Breakpoint.chroms.index(chrom1) > Breakpoint.chroms.index(chrom2):
        chrom1, pos1, ori1, chrom2, pos2, ori2 = chrom2, pos2, ori2, chrom1, pos1, ori1
    elif chrom1 == chrom2:
        if pos1 > pos2:
            chrom1, pos1, ori1, chrom2, pos2, ori2 = chrom2, pos2, ori2, chrom1, pos1, ori1
    return (chrom1, pos1, ori1, chrom2, pos2, ori2)

return match
149 changes: 146 additions & 3 deletions tests/test_collect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import pytest
from collections import Counter
from unittest.mock import MagicMock
import importlib.resources as pkg_resources

import pysam
Expand All @@ -11,7 +12,7 @@

from hairloom.collect import (map_similar_coordinate_to_higher_rank, fix_lower_support_coordinates,
get_breakpoint_support_from_bundle, normalize_sv_table, pull_sv_supporting_reads_from_bundle, find_presence_of_matching_sv,
extract_read_data)
extract_read_data, make_bundle, get_svtype)

@pytest.fixture
def bundle():
Expand All @@ -32,6 +33,86 @@ def bundle():
bundle = [BreakpointChain(brk_chain1), BreakpointChain(brk_chain2), BreakpointChain(brk_chain3)]
return bundle

@pytest.fixture
def mock_bam():
    """Mocked ``pysam.AlignmentFile`` whose ``fetch`` yields two fake reads."""
    read_specs = [
        dict(reference_name="chr1", pos=100, cigarstring="100M",
             is_reverse=False, qname="read1", tag="NM:i:1"),
        dict(reference_name="chr1", pos=200, cigarstring="50M50S",
             is_reverse=True, qname="read2", tag="NM:i:2"),
    ]

    def _fetch(contig=None, start=None, end=None):
        # Build the mocked reads lazily per call, ignoring the coordinates.
        for spec in read_specs:
            yield MagicMock(
                reference_name=spec["reference_name"],
                pos=spec["pos"],
                cigarstring=spec["cigarstring"],
                is_reverse=spec["is_reverse"],
                qname=spec["qname"],
                get_tag=MagicMock(return_value=spec["tag"]),
            )

    bam = MagicMock()
    bam.fetch.side_effect = _fetch
    return bam

@pytest.fixture
def mock_enumerate_breakpoints(monkeypatch):
    """Patch ``hairloom.collect.enumerate_breakpoints`` with a fixed stub.

    The stub ignores its input table and always returns a two-breakpoint
    ``BreakpointChain`` (chr1:100:+ and chr1:200:-), so tests that build
    bundles do not depend on real alignment parsing.
    """
    def mock_function(df):
        breakpoints = [
            Breakpoint("chr1", 100, "+"),
            Breakpoint("chr1", 200, "-"),
        ]
        return BreakpointChain(breakpoints)

    # Patch the original `enumerate_breakpoints` function
    monkeypatch.setattr("hairloom.collect.enumerate_breakpoints", mock_function)
    return mock_function

# def test_extract_read_data_with_start_and_end(mock_bam):
# # Mock reads with secondaries
# mock_reads = [
# MagicMock(reference_name="chr1", pos=100, cigarstring="100M", is_reverse=False, qname="read1"),
# MagicMock(reference_name="chr1", pos=200, cigarstring="50M50S", is_reverse=True, qname="read2")
# ]
# mock_bam.fetch.return_value = iter(mock_reads)

# # Call the function
# df = extract_read_data(mock_bam, contig="chr1", start=100, end=200, max_reads=10)

# # Validate the DataFrame
# assert isinstance(df, pd.DataFrame)
# assert len(df) == len(mock_reads)

# def test_extract_read_data_no_start_or_end(mock_bam):
# mock_bam.fetch.return_value = iter([])
# df = extract_read_data(mock_bam, contig="chr1", start=None, end=None)
# assert df.empty

def test_make_bundle_empty():
    """An empty read table produces an empty bundle."""
    empty_reads = pd.DataFrame(columns=["qname", "alignment"])
    result = make_bundle(empty_reads)
    assert result == []

def test_make_bundle_with_data(mock_enumerate_breakpoints):
    """make_bundle creates one chain per distinct qname (two here).

    The original test set ``mock_enumerate_breakpoints.return_value``, but the
    fixture returns a plain function (not a Mock), so that assignment was a
    no-op; the patched ``enumerate_breakpoints`` already returns a fixed
    ``BreakpointChain``. The ineffective line is removed.
    """
    reads = pd.DataFrame({
        "qname": ["read1", "read1", "read2"],
        "alignment": ["data1", "data2", "data3"],
    })
    bundle = make_bundle(reads)
    assert len(bundle) == 2

def test_get_breakpoint_support_from_bundle(bundle):
expected_support = Counter({
"chr1:1000:+": 1,
Expand All @@ -46,14 +127,29 @@ def test_get_breakpoint_support_from_bundle(bundle):
assert result == expected_support

def test_map_similar_coordinate_to_higher_rank(bundle):
    """Nearby low-support coordinates remap to the highest-support coordinate.

    Fix: removed a leftover debug ``print`` that cluttered test output.
    """
    margin = 10
    breakpoint_support = get_breakpoint_support_from_bundle(bundle)
    coord_map, coord_map_log = map_similar_coordinate_to_higher_rank(bundle, breakpoint_support, margin=margin)
    # chr1:1000:+ merges into the better-supported chr1:1001:+ within margin
    assert coord_map['chr1:1000:+'] == 'chr1:1001:+', coord_map
    # coordinates with no better-supported neighbour map to themselves
    assert coord_map['chr1:1000:-'] == 'chr1:1000:-', coord_map
    assert coord_map['chr1:1011:+'] == 'chr1:1001:+', coord_map
    assert coord_map['chr1:1012:+'] == 'chr1:1012:+', coord_map

# def test_map_similar_coordinate_to_higher_rank():
# bundle = [
# [Breakpoint("chr1", 100, "+"), Breakpoint("chr1", 110, "+")],
# [Breakpoint("chr1", 200, "-")]
# ]
# breakpoint_support = Counter({
# "chr1:100:+": 5,
# "chr1:110:+": 3,
# "chr1:200:-": 1
# })
# coord_map, coord_map_log = map_similar_coordinate_to_higher_rank(bundle, breakpoint_support, margin=10)
# assert coord_map["chr1:110:+"] == "chr1:100:+"
# assert coord_map["chr1:200:-"] == "chr1:200:-"

def test_fix_lower_support_coordinates(bundle):
breakpoint_support = get_breakpoint_support_from_bundle(bundle)
coord_map, coord_map_log = map_similar_coordinate_to_higher_rank(bundle, breakpoint_support, margin=10)
Expand Down Expand Up @@ -149,3 +245,50 @@ def test_extract_read_data():
bam = pysam.AlignmentFile(bam_path)
df = extract_read_data(bam, contig='PBEF1NeoTransposon', start=1477, end=1478)
assert np.all(df.to_numpy().astype(str) == expected.astype(str))

def test_get_svtype_translocation():
    """Breakpoints on different chromosomes classify as TRA."""
    pair = BreakpointPair(Breakpoint("chr1", 100, "+"),
                          Breakpoint("chr2", 200, "-"))
    assert get_svtype(pair) == "TRA"

def test_get_svtype_inversion():
    """Same chromosome with matching orientations on both ends → INV."""
    plus_pair = BreakpointPair(Breakpoint("chr1", 100, "+"),
                               Breakpoint("chr1", 200, "+"))
    minus_pair = BreakpointPair(Breakpoint("chr1", 200, "-"),
                                Breakpoint("chr1", 100, "-"))
    for pair in (plus_pair, minus_pair):
        assert get_svtype(pair) == "INV"

def test_get_svtype_deletion():
    """'+/-' orientation on one chromosome → DEL, whichever end comes first."""
    left = Breakpoint("chr1", 100, "+")
    right = Breakpoint("chr1", 200, "-")
    assert get_svtype(BreakpointPair(left, right)) == "DEL"
    assert get_svtype(BreakpointPair(right, left)) == "DEL"

def test_get_svtype_duplication():
    """'-/+' orientation on one chromosome → DUP, whichever end comes first."""
    left = Breakpoint("chr1", 100, "-")
    right = Breakpoint("chr1", 200, "+")
    assert get_svtype(BreakpointPair(left, right)) == "DUP"
    assert get_svtype(BreakpointPair(right, left)) == "DUP"

def test_get_svtype_invalid_orientation():
    """An unsupported orientation character makes get_svtype raise ValueError."""
    bad_pair = BreakpointPair(Breakpoint("chr1", 100, "+"),
                              Breakpoint("chr1", 200, "*"))  # '*' is not a valid strand
    with pytest.raises(ValueError):
        get_svtype(bad_pair)
44 changes: 42 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pandas as pd
from pysam import AlignedSegment

import pytest

from hairloom.datatypes import BreakpointChain, Breakpoint
from hairloom.utils import enumerate_breakpoints, get_secondaries, make_split_read_table, make_seg_table, make_brk_table, make_tra_table
from hairloom.utils import (enumerate_breakpoints, get_secondaries, make_split_read_table, make_seg_table, make_brk_table, make_tra_table,
is_breakpoints_not_sorted, reverse_complement)


class MockAlignment:
Expand Down Expand Up @@ -199,4 +202,41 @@ def test_make_tra_table():
"ori2": ["-"],
"support": [1]
})
pd.testing.assert_frame_equal(result, expected)
pd.testing.assert_frame_equal(result, expected)


def test_is_breakpoints_not_sorted():
    """Exercise both chromosome-order and same-chromosome position branches."""
    chrom_order = ["chr1", "chr2", "chr3", "chrX", "chrY"]
    cases = [
        (("chr2", 100, "chr1", 200), True),   # chromosomes out of order
        (("chr1", 200, "chr2", 100), False),  # chromosomes in order
        (("chr1", 300, "chr1", 200), True),   # same chrom, positions out of order
        (("chr1", 100, "chr1", 200), False),  # same chrom, positions in order
        (("chr1", 200, "chr1", 200), False),  # identical breakpoints
    ]
    for (chrom1, pos1, chrom2, pos2), expected in cases:
        assert is_breakpoints_not_sorted(chrom1, pos1, chrom2, pos2, chrom_order) is expected


def test_reverse_complement():
    """Spot-check reverse_complement on normal, N-containing, and edge inputs."""
    cases = {
        "ATCG": "CGAT",
        "ATGCN": ".GCAT",    # 'N' complements to '.'
        "": "",
        "ATGCAT": "ATGCAT",  # reverse-complement palindrome
    }
    for seq, expected in cases.items():
        assert reverse_complement(seq) == expected
    # lowercase bases are not in the complement table and must raise
    with pytest.raises(KeyError):
        reverse_complement("atcg")

0 comments on commit cff5d5b

Please sign in to comment.