Skip to content
This repository has been archived by the owner on Nov 15, 2018. It is now read-only.

Commit

Permalink
msm_basic.formula_imager_segm refactoring and bug fixing
Browse files Browse the repository at this point in the history
* splitted compute_sf_images method into several ones
* added code for handling cases when gen_iso_peak_images returns two images for the same formula-adduct-peak
* sci test report update (fix of the previous buggy report)
* new test module for msm_basic.formula_imager_segm
  • Loading branch information
intsco committed May 9, 2016
1 parent 721c075 commit 295173a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 29 deletions.
74 changes: 48 additions & 26 deletions sm/engine/msm_basic/formula_imager_segm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
from collections import defaultdict
import pandas as pd
from itertools import izip, repeat, islice
import numpy as np
Expand Down Expand Up @@ -26,7 +27,7 @@ def _find_mz_bounds(mz_grid, workload_per_mz, n=32):
return mz_bounds


def _create_mz_buckets(mz_bounds, ppm):
def _create_mz_segments(mz_bounds, ppm):
mz_buckets = []
for i, (l, r) in enumerate(zip([0] + mz_bounds, mz_bounds + [sys.float_info.max])):
l -= l * ppm * 1e-6
Expand All @@ -41,7 +42,7 @@ def _create_mz_buckets(mz_bounds, ppm):
# for s_i, (l, r) in enumerate(mz_buckets)]


def _segment_spectra(sp, mz_buckets):
def _segment_spectrum(sp, mz_buckets):
sp_id, mzs, ints = sp
for s_i, (l, r) in enumerate(mz_buckets):
smask = (mzs >= l) & (mzs <= r)
Expand Down Expand Up @@ -70,7 +71,7 @@ def _gen_iso_images(spectra_it, sp_indexes, sf_peak_df, nrows, ncols, ppm, peaks
# a bit slower than using pure numpy arrays but much shorter
# may leak memory because of https://github.com/pydata/pandas/issues/2659 or smth else
sp_df = pd.DataFrame(_sp_df_gen(sp_list, sp_indexes),
columns=['idx', 'mz', 'ints'], dtype=np.float64).sort_values(by='mz')
columns=['idx', 'mz', 'ints']).sort_values(by='mz')
# print sp_df.info()

# -1, + 1 are needed to extend sf_peak_mz range so that it covers 100% of spectra
Expand All @@ -91,17 +92,49 @@ def _gen_iso_images(spectra_it, sp_indexes, sf_peak_df, nrows, ncols, ppm, peaks
(sf_peak_df.peak_i.iloc[i], coo_matrix((data, (row_inds, col_inds)), shape=(nrows, ncols)))


def _img_pairs_to_list(pairs):
def _img_pairs_to_list(pairs, shape):
""" list of (coord, value) pairs -> list of values """
if not pairs:
return None
length = max([i for i, img in pairs]) + 1
res = np.ndarray((length,), dtype=object)
for i, img in pairs:
res[i] = img

d = defaultdict(lambda: coo_matrix(shape))
for k, m in pairs:
_m = d[k]
d[k] = _m if _m.nnz >= m.nnz else m
distinct_pairs = d.items()

res = np.ndarray((max(d.keys()) + 1,), dtype=object)
for i, m in distinct_pairs:
res[i] = m
return res.tolist()


def find_mz_segments(spectra, sf_peak_df, ppm):
# spectra_sample = spectra.take(200)
spectra_sample = spectra.takeSample(withReplacement=False, num=200)
mz_grid, workload_per_mz = _estimate_mz_workload(spectra_sample, sf_peak_df, bins=10000)
mz_bounds = _find_mz_bounds(mz_grid, workload_per_mz, n=1024)
mz_segments = _create_mz_segments(mz_bounds, ppm=ppm)
return spectra_sample, mz_segments


def gen_iso_peak_images(sc, ds, sf_peak_df, segm_spectra, peaks_per_sp_segm, ppm):
sp_indexes_brcast = sc.broadcast(ds.norm_img_pixel_inds)
sf_peak_df_brcast = sc.broadcast(sf_peak_df) # TODO: replace broadcast variable with rdd and cogroup
nrows, ncols = ds.get_dims()
iso_peak_images = (segm_spectra.flatMap(lambda (s_i, sp_segm):
_gen_iso_images(sp_segm, sp_indexes_brcast.value, sf_peak_df_brcast.value,
nrows, ncols, ppm, peaks_per_sp_segm)))
return iso_peak_images


def gen_iso_sf_images(iso_peak_images, shape):
iso_sf_images = (iso_peak_images
.groupByKey(numPartitions=256)
.mapValues(lambda img_pairs_it: _img_pairs_to_list(list(img_pairs_it), shape)))
return iso_sf_images


# TODO: add tests
def compute_sf_images(sc, ds, sf_peak_df, ppm):
""" Compute isotopic images for all formula
Expand All @@ -111,27 +144,16 @@ def compute_sf_images(sc, ds, sf_peak_df, ppm):
: pyspark.rdd.RDD
RDD of sum formula, list[sparse matrix of intensities]
"""
nrows, ncols = ds.get_dims()
spectra_rdd = ds.get_spectra()
# spectra_rdd.cache()

spectra_sample = spectra_rdd.takeSample(withReplacement=False, num=200)
mz_grid, workload_per_mz = _estimate_mz_workload(spectra_sample, sf_peak_df, bins=10000)

mz_bounds = _find_mz_bounds(mz_grid, workload_per_mz, n=1024)
mz_buckets = _create_mz_buckets(mz_bounds, ppm=ppm)
spectra_sample, mz_segments = find_mz_segments(spectra_rdd, sf_peak_df, ppm)
segm_spectra = (spectra_rdd
.flatMap(lambda sp: _segment_spectra(sp, mz_buckets))
.groupByKey(numPartitions=len(mz_buckets)))
.flatMap(lambda sp: _segment_spectrum(sp, mz_segments))
.groupByKey(numPartitions=len(mz_segments)))

peaks_per_sp_segm = int(np.mean([t[1].shape[0] for t in spectra_sample])) / len(mz_buckets)
sp_indexes_brcast = sc.broadcast(ds.norm_img_pixel_inds)
sf_peak_df_brcast = sc.broadcast(sf_peak_df) # TODO: replace broadcast variable with rdd and cogroup
iso_peak_images = (segm_spectra.flatMap(lambda (s_i, sp_segm):
_gen_iso_images(sp_segm, sp_indexes_brcast.value, sf_peak_df_brcast.value,
nrows, ncols, ppm, peaks_per_sp_segm)))
peaks_per_sp_segm = int(np.mean([t[1].shape[0] for t in spectra_sample])) / len(mz_segments)
iso_peak_images = gen_iso_peak_images(sc, ds, sf_peak_df, segm_spectra, peaks_per_sp_segm, ppm)
iso_sf_images = gen_iso_sf_images(iso_peak_images, shape=ds.get_dims())

iso_sf_images = (iso_peak_images
.groupByKey(numPartitions=256)
.mapValues(lambda img_pairs_it: _img_pairs_to_list(list(img_pairs_it))))
return iso_sf_images

26 changes: 26 additions & 0 deletions sm/engine/tests/msm_basic/test_formula_imager_segm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from scipy.sparse import coo_matrix

from sm.engine.tests.util import spark_context
from sm.engine.msm_basic.formula_imager_segm import gen_iso_sf_images


def test_gen_iso_sf_images(spark_context):
iso_peak_images = spark_context.parallelize([((3079, '+H'), (0, coo_matrix([[1., 0., 0.]]))),
((3079, '+H'), (3, coo_matrix([[2., 1., 0.]]))),
((3079, '+H'), (3, coo_matrix([[0., 0., 10.]])))])
exp_iso_sf_imgs = [((3079, '+H'), [coo_matrix([[1., 0., 0.]]),
None,
None,
coo_matrix([[2., 1., 0.]])])]

iso_sf_imgs = gen_iso_sf_images(iso_peak_images, shape=(1, 3)).collect()

assert len(iso_sf_imgs) == len(exp_iso_sf_imgs)
for (k, l), (ek, el) in zip(iso_sf_imgs, exp_iso_sf_imgs):
assert k == ek
assert len(l) == len(el)
for m, em in zip(l, el):
if em is None:
assert m is None
else:
assert (m == em).toarray().all()
4 changes: 2 additions & 2 deletions tests/reports/spheroid_12h_search_res.csv
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ C11H10O7 +K 0.976315789474 0.0 0.953741700383
C11H11F3N2O3 +K 0.0 0.0 0.942679626657
C11H11F3N2O4 +K 0.982216494845 0.0294604948304 0.953647643013
C11H11NO2 +K 0.941666666667 0.0 0.944314212783
C11H11NO3 +H 0.960407569141 0.0 0.951904869996
C11H11NO3 +H 0.995960739902 0.0 0.951904869996
C11H11NO3 +K 0.981077694236 0.0 0.943664621254
C11H11NO4 +H 0.929777777778 0.0 0.963007681842
C11H12Cl2N2O5 +Na 0.9375 0.0 0.858224057253
Expand Down Expand Up @@ -2301,7 +2301,7 @@ C46H84NO8P +H 0.999588477366 0.958440753291 0.982990026501
C46H84NO8P +K 0.998063872255 0.253617907717 0.903428997805
C46H84NO8P +Na 0.975414781297 0.630554518247 0.915396518583
C46H84O13P2 +H 0.0 0.0 0.851762937609
C46H86NO7P +H 0.95 0.0 0.855117050438
C46H86NO7P +H 0.997692307692 0.448313830417 0.941610217342
C46H86NO7P +Na 0.0 0.0 0.855151947092
C46H86NO8P +H 0.998567335244 0.405050835985 0.869712787869
C46H86NO8P +K 0.993885819521 0.447644458556 0.900866918992
Expand Down
12 changes: 12 additions & 0 deletions tests/sci_test_search_job_spheroid_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@
from sm.engine.util import proj_root, SMConfig


# def sm_config():
# with open(join(proj_root(), 'conf/config.json')) as f:
# return json.load(f)

SMConfig.set_path(join(proj_root(), 'conf/config.json'))
sm_config = SMConfig.get_conf()

ds_name = 'sci_test_spheroid_12h'
data_dir_path = join(SMConfig.get_conf()['fs']['base_path'], ds_name)
input_dir_path = join(proj_root(), 'test/data/sci_test_search_job_spheroid_dataset')
ds_config_path = join(input_dir_path, 'config.json')

SEARCH_RES_SELECT = ("select sf, adduct, stats "
"from iso_image_metrics s "
"join formula_db sf_db on sf_db.id = s.db_id "
Expand Down
2 changes: 1 addition & 1 deletion tests/test_search_job_imzml_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def create_fill_sm_database(create_test_db, drop_test_db, sm_config):
db.close()


@patch('sm.engine.formula_img_validator.get_compute_img_metrics')
@patch('sm.engine.msm_basic.formula_img_validator.get_compute_img_metrics')
def test_search_job_imzml_example(get_compute_img_measures_mock, create_fill_sm_database, sm_config):
get_compute_img_measures_mock.return_value = lambda *args: (0.9, 0.9, 0.9)

Expand Down

0 comments on commit 295173a

Please sign in to comment.