diff --git a/sm/engine/msm_basic/formula_imager_segm.py b/sm/engine/msm_basic/formula_imager_segm.py index c9d992c..3c3a360 100644 --- a/sm/engine/msm_basic/formula_imager_segm.py +++ b/sm/engine/msm_basic/formula_imager_segm.py @@ -1,4 +1,5 @@ import sys +from collections import defaultdict import pandas as pd from itertools import izip, repeat, islice import numpy as np @@ -26,7 +27,7 @@ def _find_mz_bounds(mz_grid, workload_per_mz, n=32): return mz_bounds -def _create_mz_buckets(mz_bounds, ppm): +def _create_mz_segments(mz_bounds, ppm): mz_buckets = [] for i, (l, r) in enumerate(zip([0] + mz_bounds, mz_bounds + [sys.float_info.max])): l -= l * ppm * 1e-6 @@ -41,7 +42,7 @@ def _create_mz_buckets(mz_bounds, ppm): # for s_i, (l, r) in enumerate(mz_buckets)] -def _segment_spectra(sp, mz_buckets): +def _segment_spectrum(sp, mz_buckets): sp_id, mzs, ints = sp for s_i, (l, r) in enumerate(mz_buckets): smask = (mzs >= l) & (mzs <= r) @@ -70,7 +71,7 @@ def _gen_iso_images(spectra_it, sp_indexes, sf_peak_df, nrows, ncols, ppm, peaks # a bit slower than using pure numpy arrays but much shorter # may leak memory because of https://github.com/pydata/pandas/issues/2659 or smth else sp_df = pd.DataFrame(_sp_df_gen(sp_list, sp_indexes), - columns=['idx', 'mz', 'ints'], dtype=np.float64).sort_values(by='mz') + columns=['idx', 'mz', 'ints']).sort_values(by='mz') # print sp_df.info() # -1, + 1 are needed to extend sf_peak_mz range so that it covers 100% of spectra @@ -91,17 +92,49 @@ def _gen_iso_images(spectra_it, sp_indexes, sf_peak_df, nrows, ncols, ppm, peaks (sf_peak_df.peak_i.iloc[i], coo_matrix((data, (row_inds, col_inds)), shape=(nrows, ncols))) -def _img_pairs_to_list(pairs): +def _img_pairs_to_list(pairs, shape): """ list of (coord, value) pairs -> list of values """ if not pairs: return None - length = max([i for i, img in pairs]) + 1 - res = np.ndarray((length,), dtype=object) - for i, img in pairs: - res[i] = img + + d = defaultdict(lambda: coo_matrix(shape)) + for k, m in pairs: + _m = d[k] + d[k] = _m if _m.nnz >= m.nnz else m + distinct_pairs = d.items() + + res = np.ndarray((max(d.keys()) + 1,), dtype=object) + for i, m in distinct_pairs: + res[i] = m return res.tolist() +def find_mz_segments(spectra, sf_peak_df, ppm): + # spectra_sample = spectra.take(200) + spectra_sample = spectra.takeSample(withReplacement=False, num=200) + mz_grid, workload_per_mz = _estimate_mz_workload(spectra_sample, sf_peak_df, bins=10000) + mz_bounds = _find_mz_bounds(mz_grid, workload_per_mz, n=1024) + mz_segments = _create_mz_segments(mz_bounds, ppm=ppm) + return spectra_sample, mz_segments + + +def gen_iso_peak_images(sc, ds, sf_peak_df, segm_spectra, peaks_per_sp_segm, ppm): + sp_indexes_brcast = sc.broadcast(ds.norm_img_pixel_inds) + sf_peak_df_brcast = sc.broadcast(sf_peak_df) # TODO: replace broadcast variable with rdd and cogroup + nrows, ncols = ds.get_dims() + iso_peak_images = (segm_spectra.flatMap(lambda (s_i, sp_segm): + _gen_iso_images(sp_segm, sp_indexes_brcast.value, sf_peak_df_brcast.value, + nrows, ncols, ppm, peaks_per_sp_segm))) + return iso_peak_images + + +def gen_iso_sf_images(iso_peak_images, shape): + iso_sf_images = (iso_peak_images + .groupByKey(numPartitions=256) + .mapValues(lambda img_pairs_it: _img_pairs_to_list(list(img_pairs_it), shape))) + return iso_sf_images + + # TODO: add tests def compute_sf_images(sc, ds, sf_peak_df, ppm): """ Compute isotopic images for all formula @@ -111,27 +144,16 @@ def compute_sf_images(sc, ds, sf_peak_df, ppm): : pyspark.rdd.RDD RDD of sum formula, list[sparse matrix of intensities] """ - nrows, ncols = ds.get_dims() spectra_rdd = ds.get_spectra() - # spectra_rdd.cache() - spectra_sample = spectra_rdd.takeSample(withReplacement=False, num=200) - mz_grid, workload_per_mz = _estimate_mz_workload(spectra_sample, sf_peak_df, bins=10000) - - mz_bounds = _find_mz_bounds(mz_grid, workload_per_mz, n=1024) - mz_buckets = _create_mz_buckets(mz_bounds, ppm=ppm) + spectra_sample, mz_segments = find_mz_segments(spectra_rdd, sf_peak_df, ppm) segm_spectra = (spectra_rdd - .flatMap(lambda sp: _segment_spectra(sp, mz_buckets)) - .groupByKey(numPartitions=len(mz_buckets))) + .flatMap(lambda sp: _segment_spectrum(sp, mz_segments)) + .groupByKey(numPartitions=len(mz_segments))) - peaks_per_sp_segm = int(np.mean([t[1].shape[0] for t in spectra_sample])) / len(mz_buckets) - sp_indexes_brcast = sc.broadcast(ds.norm_img_pixel_inds) - sf_peak_df_brcast = sc.broadcast(sf_peak_df) # TODO: replace broadcast variable with rdd and cogroup - iso_peak_images = (segm_spectra.flatMap(lambda (s_i, sp_segm): - _gen_iso_images(sp_segm, sp_indexes_brcast.value, sf_peak_df_brcast.value, - nrows, ncols, ppm, peaks_per_sp_segm))) + peaks_per_sp_segm = int(np.mean([t[1].shape[0] for t in spectra_sample])) / len(mz_segments) + iso_peak_images = gen_iso_peak_images(sc, ds, sf_peak_df, segm_spectra, peaks_per_sp_segm, ppm) + iso_sf_images = gen_iso_sf_images(iso_peak_images, shape=ds.get_dims()) - iso_sf_images = (iso_peak_images - .groupByKey(numPartitions=256) - .mapValues(lambda img_pairs_it: _img_pairs_to_list(list(img_pairs_it)))) return iso_sf_images + diff --git a/sm/engine/tests/msm_basic/test_formula_imager_segm.py b/sm/engine/tests/msm_basic/test_formula_imager_segm.py new file mode 100644 index 0000000..a5767da --- /dev/null +++ b/sm/engine/tests/msm_basic/test_formula_imager_segm.py @@ -0,0 +1,26 @@ +from scipy.sparse import coo_matrix + +from sm.engine.tests.util import spark_context +from sm.engine.msm_basic.formula_imager_segm import gen_iso_sf_images + + +def test_gen_iso_sf_images(spark_context): + iso_peak_images = spark_context.parallelize([((3079, '+H'), (0, coo_matrix([[1., 0., 0.]]))), + ((3079, '+H'), (3, coo_matrix([[2., 1., 0.]]))), + ((3079, '+H'), (3, coo_matrix([[0., 0., 10.]])))]) + exp_iso_sf_imgs = [((3079, '+H'), [coo_matrix([[1., 0., 0.]]), + None, + None, + coo_matrix([[2., 1., 0.]])])] + + iso_sf_imgs = gen_iso_sf_images(iso_peak_images, shape=(1, 3)).collect() + + assert len(iso_sf_imgs) == len(exp_iso_sf_imgs) + for (k, l), (ek, el) in zip(iso_sf_imgs, exp_iso_sf_imgs): + assert k == ek + assert len(l) == len(el) + for m, em in zip(l, el): + if em is None: + assert m is None + else: + assert (m == em).toarray().all() diff --git a/tests/reports/spheroid_12h_search_res.csv b/tests/reports/spheroid_12h_search_res.csv index 0bf7717..7c97493 100644 --- a/tests/reports/spheroid_12h_search_res.csv +++ b/tests/reports/spheroid_12h_search_res.csv @@ -112,7 +112,7 @@ C11H10O7 +K 0.976315789474 0.0 0.953741700383 C11H11F3N2O3 +K 0.0 0.0 0.942679626657 C11H11F3N2O4 +K 0.982216494845 0.0294604948304 0.953647643013 C11H11NO2 +K 0.941666666667 0.0 0.944314212783 -C11H11NO3 +H 0.960407569141 0.0 0.951904869996 +C11H11NO3 +H 0.995960739902 0.0 0.951904869996 C11H11NO3 +K 0.981077694236 0.0 0.943664621254 C11H11NO4 +H 0.929777777778 0.0 0.963007681842 C11H12Cl2N2O5 +Na 0.9375 0.0 0.858224057253 @@ -2301,7 +2301,7 @@ C46H84NO8P +H 0.999588477366 0.958440753291 0.982990026501 C46H84NO8P +K 0.998063872255 0.253617907717 0.903428997805 C46H84NO8P +Na 0.975414781297 0.630554518247 0.915396518583 C46H84O13P2 +H 0.0 0.0 0.851762937609 -C46H86NO7P +H 0.95 0.0 0.855117050438 +C46H86NO7P +H 0.997692307692 0.448313830417 0.941610217342 C46H86NO7P +Na 0.0 0.0 0.855151947092 C46H86NO8P +H 0.998567335244 0.405050835985 0.869712787869 C46H86NO8P +K 0.993885819521 0.447644458556 0.900866918992 diff --git a/tests/sci_test_search_job_spheroid_dataset.py b/tests/sci_test_search_job_spheroid_dataset.py index 556b8ad..088d37b 100644 --- a/tests/sci_test_search_job_spheroid_dataset.py +++ b/tests/sci_test_search_job_spheroid_dataset.py @@ -14,6 +14,18 @@ from sm.engine.util import proj_root, SMConfig +# def sm_config(): +# with open(join(proj_root(), 'conf/config.json')) as f: +# return json.load(f) + +SMConfig.set_path(join(proj_root(), 'conf/config.json')) +sm_config = SMConfig.get_conf() + +ds_name = 'sci_test_spheroid_12h' +data_dir_path = join(SMConfig.get_conf()['fs']['base_path'], ds_name) +input_dir_path = join(proj_root(), 'test/data/sci_test_search_job_spheroid_dataset') +ds_config_path = join(input_dir_path, 'config.json') + SEARCH_RES_SELECT = ("select sf, adduct, stats " "from iso_image_metrics s " "join formula_db sf_db on sf_db.id = s.db_id " diff --git a/tests/test_search_job_imzml_example.py b/tests/test_search_job_imzml_example.py index 4dd6046..572342c 100644 --- a/tests/test_search_job_imzml_example.py +++ b/tests/test_search_job_imzml_example.py @@ -37,7 +37,7 @@ def create_fill_sm_database(create_test_db, drop_test_db, sm_config): db.close() -@patch('sm.engine.formula_img_validator.get_compute_img_metrics') +@patch('sm.engine.msm_basic.formula_img_validator.get_compute_img_metrics') def test_search_job_imzml_example(get_compute_img_measures_mock, create_fill_sm_database, sm_config): get_compute_img_measures_mock.return_value = lambda *args: (0.9, 0.9, 0.9)