Skip to content
This repository has been archived by the owner on Nov 15, 2018. It is now read-only.

Commit

Permalink
First implementation of a base search algorithm class
Browse files Browse the repository at this point in the history
* current search algorithm moved to msm_basic package
* SearchAlgorithm is a base class for implementing search algorithms
* SearchResults class api changed
* tests updates
  • Loading branch information
intsco committed May 4, 2016
1 parent 0db1c07 commit 721c075
Show file tree
Hide file tree
Showing 16 changed files with 170 additions and 99 deletions.
Empty file modified scripts/run.sh
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions sm/engine/formulas_segm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
'AND charge = %s '
'ORDER BY sf_id, adduct')

# TODO: target_decoy_add table is getting too big
THEOR_PEAKS_DECOY_ADD_SEL = (
'SELECT DISTINCT p.sf_id, decoy_add as adduct, centr_mzs, centr_ints '
'FROM theor_peaks p '
Expand Down
Empty file added sm/engine/msm_basic/__init__.py
Empty file.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ def sf_image_metrics_est_fdr(sf_metrics_df, formulas, fdr):
return sf_metrics_df.join(sf_adduct_fdr, how='inner')[['chaos', 'spatial', 'spectral', 'msm', 'fdr']]


def filter_sf_metrics(sf_metrics_df):
return sf_metrics_df[(sf_metrics_df.chaos > 0) | (sf_metrics_df.spatial > 0) | (sf_metrics_df.spectral > 0)]
# return sf_metrics_df[sf_metrics_df.msm > 0]


def filter_sf_images(sf_images, sf_metrics_df):
return sf_images.filter(lambda (sf_i, _): sf_i in sf_metrics_df.index)
# def filter_sf_metrics(sf_metrics_df):
# return sf_metrics_df[(sf_metrics_df.chaos > 0) | (sf_metrics_df.spatial > 0) | (sf_metrics_df.spectral > 0)]
# # return sf_metrics_df[sf_metrics_df.msm > 0]
#
#
# def filter_sf_images(sf_images, sf_metrics_df):
# return sf_images.filter(lambda (sf_i, _): sf_i in sf_metrics_df.index)

31 changes: 31 additions & 0 deletions sm/engine/msm_basic/msm_basic_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from sm.engine.msm_basic.formula_imager_segm import compute_sf_images
from sm.engine.msm_basic.formula_img_validator import sf_image_metrics, sf_image_metrics_est_fdr
from sm.engine.search_algorithm import SearchAlgorithm
from sm.engine.util import logger


class MSMBasicSearch(SearchAlgorithm):

def __init__(self, sc, ds, formulas, fdr, ds_config):
super(MSMBasicSearch, self).__init__(sc, ds, formulas, fdr, ds_config)
self.metrics = ['chaos', 'spatial', 'spectral']

def search(self):
logger.info('Running molecule search')
sf_images = compute_sf_images(self.sc, self.ds, self.formulas.get_sf_peak_df(),
self.ds_config['image_generation']['ppm'])
all_sf_metrics_df = self.calc_metrics(sf_images)
sf_metrics_fdr_df = self.estimate_fdr(all_sf_metrics_df)
sf_metrics_fdr_df = self.filter_sf_metrics(sf_metrics_fdr_df)
return sf_metrics_fdr_df, self.filter_sf_images(sf_images, sf_metrics_fdr_df)

def calc_metrics(self, sf_images):
all_sf_metrics_df = sf_image_metrics(sf_images, self.sc, self.formulas, self.ds, self.ds_config)
return all_sf_metrics_df

def estimate_fdr(self, all_sf_metrics_df):
sf_metrics_fdr_df = sf_image_metrics_est_fdr(all_sf_metrics_df, self.formulas, self.fdr)
return sf_metrics_fdr_df

def filter_sf_metrics(self, sf_metrics_df):
return sf_metrics_df[(sf_metrics_df.chaos > 0) | (sf_metrics_df.spatial > 0) | (sf_metrics_df.spectral > 0)]
25 changes: 25 additions & 0 deletions sm/engine/search_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

class SearchAlgorithm(object):

def __init__(self, sc, ds, formulas, fdr, ds_config):
self.sc = sc
self.ds = ds
self.formulas = formulas
self.fdr = fdr
self.ds_config = ds_config
self.metrics = []

def search(self):
pass

def calc_metrics(self, sf_images):
pass

def estimate_fdr(self, all_sf_metrics_df):
pass

def filter_sf_metrics(self, sf_metrics_df):
return sf_metrics_df[sf_metrics_df.msm > 0]

def filter_sf_images(self, sf_images, sf_metrics_df):
return sf_images.filter(lambda (sf_i, _): sf_i in sf_metrics_df.index)
69 changes: 28 additions & 41 deletions sm/engine/search_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@
.. moduleauthor:: Vitaly Kovalev <[email protected]>
"""
import json
from datetime import datetime
from os.path import join
from pprint import pformat
from datetime import datetime

from pyspark import SparkContext, SparkConf
from sm.engine.msm_basic.msm_basic_search import MSMBasicSearch
from sm.engine.dataset import Dataset
from sm.engine.db import DB
from sm.engine.fdr import FDR
from sm.engine.formula_imager_segm import compute_sf_images
from sm.engine.formula_img_validator import sf_image_metrics, sf_image_metrics_est_fdr, filter_sf_images, filter_sf_metrics
from sm.engine.formulas_segm import FormulasSegm
from sm.engine.imzml_txt_converter import ImzmlTxtConverter
from sm.engine.search_results import SearchResults
from sm.engine.theor_peaks_gen import TheorPeaksGenerator
from sm.engine.util import local_path, proj_root, SMConfig, logger
from sm.engine.work_dir import WorkDirManager

from pyspark import SparkContext, SparkConf
from sm.engine.imzml_txt_converter import ImzmlTxtConverter

DS_ID_SEL = "SELECT id FROM dataset WHERE name = %s"
DB_ID_SEL = "SELECT id FROM formula_db WHERE name = %s"
Expand Down Expand Up @@ -79,6 +78,19 @@ def _init_db(self):
self.db = DB(self.sm_config['db'])
self.sf_db_id = self.db.select_one(DB_ID_SEL, self.ds_config['database']['name'])[0]

# TODO: add tests
def store_job_meta(self):
""" Store search job metadata in the database """
logger.info('Storing job metadata')
self.ds_id = int(self.db.select_one(DS_ID_SEL, self.ds_name)[0])
self.job_id = self.ds_id
self.db.alter(DEL_JOB_SQL, self.job_id)
rows = [(self.job_id, self.sf_db_id, self.ds_id, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))]
self.db.insert(JOB_INS, rows)

rows = [(self.job_id, adduct) for adduct in self.ds_config['isotope_generation']['adducts']]
self.db.insert(ADDUCT_INS, rows)

def run(self, input_path, ds_config_path, clean=False):
""" Entry point of the engine. Molecule search is completed in several steps:
* Copying input data to the engine work dir
Expand Down Expand Up @@ -133,8 +145,17 @@ def run(self, input_path, ds_config_path, clean=False):
self.fdr.decoy_adduct_selection()
self.formulas = FormulasSegm(self.job_id, self.sf_db_id, self.ds_config, self.db)

search_results = self._search()
self._store_results(search_results)
search_alg = MSMBasicSearch(self.sc, self.ds, self.formulas, self.fdr, self.ds_config)
sf_metrics_df, sf_iso_images = search_alg.search()

search_results = SearchResults(self.sf_db_id, self.ds_id, self.job_id,
self.ds_name, self.formulas.get_sf_adduct_peaksn(),
self.db, self.sm_config, self.ds_config)
search_results.sf_metrics_df = sf_metrics_df
search_results.sf_iso_images = sf_iso_images
search_results.metrics = search_alg.metrics
search_results.nrows, search_results.ncols = self.ds.get_dims()
search_results.store()

except Exception as e:
raise
Expand All @@ -144,37 +165,3 @@ def run(self, input_path, ds_config_path, clean=False):
self.sc.stop()
if self.db:
self.db.close()

# TODO: add tests
def store_job_meta(self):
""" Store search job metadata in the database """
logger.info('Storing job metadata')
self.ds_id = int(self.db.select_one(DS_ID_SEL, self.ds_name)[0])
self.job_id = self.ds_id
self.db.alter(DEL_JOB_SQL, self.job_id)
rows = [(self.job_id, self.sf_db_id, self.ds_id, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))]
self.db.insert(JOB_INS, rows)

rows = [(self.job_id, adduct) for adduct in self.ds_config['isotope_generation']['adducts']]
self.db.insert(ADDUCT_INS, rows)

def _search(self):
logger.info('Running molecule search')
sf_images = compute_sf_images(self.sc, self.ds, self.formulas.get_sf_peak_df(),
self.ds_config['image_generation']['ppm'])
all_sf_metrics_df = sf_image_metrics(sf_images, self.sc, self.formulas, self.ds, self.ds_config)
sf_metrics_fdr_df = sf_image_metrics_est_fdr(all_sf_metrics_df, self.formulas, self.fdr)
sf_metrics_fdr_df = filter_sf_metrics(sf_metrics_fdr_df)
sf_images = filter_sf_images(sf_images, sf_metrics_fdr_df)

return SearchResults(self.sf_db_id, self.ds_id, self.job_id,
sf_metrics_fdr_df, sf_images,
self.formulas.get_sf_adduct_peaksn(),
self.db, self.sm_config)

def _store_results(self, search_results):
logger.info('Storing search results to the DB')
search_results.clear_old_results()
search_results.store_sf_img_metrics()
nrows, ncols = self.ds.get_dims()
search_results.store_sf_iso_images(nrows, ncols)
37 changes: 26 additions & 11 deletions sm/engine/search_results.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
from collections import OrderedDict

import numpy as np

from sm.engine.db import DB
from sm.engine.util import logger


METRICS_INS = 'INSERT INTO iso_image_metrics VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
SF_ISO_IMGS_INS = 'INSERT INTO iso_image VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)'
clear_iso_image_sql = 'DELETE FROM iso_image WHERE job_id = %s'
Expand All @@ -32,15 +32,21 @@ class SearchResults(object):
db: engine.db.DB
sm_config: dict
"""
def __init__(self, sf_db_id, ds_id, job_id, sf_metrics_df, sf_iso_images, sf_adduct_peaksn, db, sm_config):
def __init__(self, sf_db_id, ds_id, job_id, ds_name, sf_adduct_peaksn, db,
sm_config, ds_config):
self.sf_db_id = sf_db_id
self.ds_id = ds_id
self.job_id = job_id
self.ds_name = ds_name
self.db = db
self.sm_config = sm_config
self.ds_config = ds_config
self.sf_adduct_peaksn = sf_adduct_peaksn
self.sf_iso_images = sf_iso_images
self.sf_metrics_df = sf_metrics_df
self.sf_iso_images = None
self.sf_metrics_df = None
self.metrics = None
self.ncols = None
self.nrows = None

def clear_old_results(self):
""" Clear all previous search results for the dataset from the database """
Expand All @@ -49,22 +55,23 @@ def clear_old_results(self):
self.db.alter(clear_iso_image_metrics_sql, self.job_id)

@staticmethod
def _metrics_table_row_gen(job_id, db_id, metr_df, sf_adduct_peaksn):
for ind, s in metr_df.reset_index().iterrows():
metr_json = json.dumps(OrderedDict([('chaos', s.chaos), ('spatial', s.spatial), ('spectral', s.spectral)]))
def _metrics_table_row_gen(job_id, db_id, metr_df, sf_adduct_peaksn, metrics):
for ind, r in metr_df.reset_index().iterrows():
metr_json = json.dumps(OrderedDict([(m, r[m]) for m in metrics]))
peaks_n = sf_adduct_peaksn[ind][2]
yield (job_id, db_id, s.sf_id, s.adduct, s.msm, s.fdr, metr_json, peaks_n)
yield (job_id, db_id, r.sf_id, r.adduct, r.msm, r.fdr, metr_json, peaks_n)

def store_sf_img_metrics(self):
""" Store formula image metrics in the database """
logger.info('Storing iso image metrics')
rows = list(self._metrics_table_row_gen(self.job_id, self.sf_db_id, self.sf_metrics_df, self.sf_adduct_peaksn))
rows = list(self._metrics_table_row_gen(self.job_id, self.sf_db_id,
self.sf_metrics_df, self.sf_adduct_peaksn,
self.metrics))
# TODO: for some unknown reason in some cases may be super slow (minutes)
self.db.insert(METRICS_INS, rows)

self.db.alter(RESULTS_TABLE_VIEW_REFRESH)

def store_sf_iso_images(self, nrows, ncols):
def store_sf_iso_images(self):
""" Store formula images in the database
Args
Expand All @@ -77,6 +84,8 @@ def store_sf_iso_images(self, nrows, ncols):
job_id = self.job_id
sf_db_id = self.sf_db_id
db_config = self.sm_config['db']
nrows = self.nrows
ncols = self.ncols

def iso_img_row_gen(((sf_id, adduct), img_list)):
for peak_i, img_sparse in enumerate(img_list):
Expand All @@ -101,3 +110,9 @@ def store_iso_img_rows(row_it):

# self.sf_iso_images.flatMap(iso_img_row_gen).coalesce(32).foreachPartition(store_iso_img_rows)
self.sf_iso_images.flatMap(iso_img_row_gen).foreachPartition(store_iso_img_rows)

def store(self):
logger.info('Storing search results to the DB')
self.clear_old_results()
self.store_sf_img_metrics()
self.store_sf_iso_images()
Empty file.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import numpy as np
from sm.engine.dataset import Dataset
from sm.engine.formulas import Formulas
from mock import MagicMock
from numpy.testing import assert_array_almost_equal

from sm.engine.formula_imager import sample_spectra, compute_sf_peak_images, compute_sf_images
from sm.engine.dataset import Dataset
from sm.engine.formulas import Formulas
from sm.engine.msm_basic.formula_imager import sample_spectra, compute_sf_peak_images, compute_sf_images
from sm.engine.tests.util import spark_context


Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import numpy as np
import pandas as pd
import pytest
from sm.engine.dataset import Dataset
from sm.engine.formula_img_validator import ImgMeasures, sf_image_metrics_est_fdr, filter_sf_images
from sm.engine.formula_img_validator import sf_image_metrics, get_compute_img_metrics
from sm.engine.formulas_segm import FormulasSegm
from mock import patch, MagicMock
from numpy.testing import assert_array_almost_equal
from pandas.util.testing import assert_frame_equal
from scipy.sparse import csr_matrix

from sm.engine.dataset import Dataset
from sm.engine.fdr import FDR
from sm.engine.formulas_segm import FormulasSegm
from sm.engine.msm_basic.formula_img_validator import ImgMeasures, sf_image_metrics_est_fdr
from sm.engine.msm_basic.formula_img_validator import sf_image_metrics, get_compute_img_metrics
from sm.engine.tests.util import spark_context, ds_config


@patch('sm.engine.formula_img_validator.isotope_pattern_match', return_value=0.95)
@patch('sm.engine.formula_img_validator.isotope_image_correlation', return_value=0.8)
@patch('sm.engine.formula_img_validator.measure_of_chaos', return_value=0.99)
@patch('sm.engine.msm_basic.formula_img_validator.isotope_pattern_match', return_value=0.95)
@patch('sm.engine.msm_basic.formula_img_validator.isotope_image_correlation', return_value=0.8)
@patch('sm.engine.msm_basic.formula_img_validator.measure_of_chaos', return_value=0.99)
def test_get_compute_img_measures_pass(chaos_mock, image_corr_mock, pattern_match_mock):
img_gen_conf = {
'nlevels': 30,
Expand Down Expand Up @@ -49,7 +49,7 @@ def ds_formulas_images_mock():


def test_sf_image_metrics(spark_context, ds_formulas_images_mock, ds_config):
with patch('sm.engine.formula_img_validator.get_compute_img_metrics') as mock:
with patch('sm.engine.msm_basic.formula_img_validator.get_compute_img_metrics') as mock:
mock.return_value = lambda *args: (0.9, 0.9, 0.9)

ds_mock, formulas_mock, ref_images = ds_formulas_images_mock
Expand Down Expand Up @@ -86,21 +86,6 @@ def test_add_sf_image_est_fdr():
assert_frame_equal(res_metrics_df, exp_metrics_df)


def test_filter_sf_images(spark_context):
sf_iso_images = spark_context.parallelize([(0, [csr_matrix([[0, 100, 100], [10, 0, 3]]),
csr_matrix([[0, 50, 50], [0, 20, 0]])]),
(1, [csr_matrix([[0, 0, 0], [0, 0, 0]]),
csr_matrix([[0, 0, 0], [0, 0, 0]])])])

sf_metrics_df = (pd.DataFrame([[0, '+H', 0.9, 0.9, 0.9, 0.9**3]],
columns=['sf_id', 'adduct', 'chaos', 'spatial', 'spectral', 'msm'])
.set_index(['sf_id', 'adduct']))

flt_iso_images = filter_sf_images(sf_iso_images, sf_metrics_df)

assert dict(flt_iso_images.take(1)).keys() == dict(sf_iso_images.take(1)).keys()


@pytest.mark.parametrize("nan_value", [None, np.NaN, np.NAN, np.inf])
def test_img_measures_replace_invalid_measure_values(nan_value):
invalid_img_measures = ImgMeasures(nan_value, nan_value, nan_value)
Expand Down
21 changes: 21 additions & 0 deletions sm/engine/tests/msm_basic/test_msm_basic_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from scipy.sparse import csr_matrix
import pandas as pd

from sm.engine.msm_basic.msm_basic_search import MSMBasicSearch
from sm.engine.tests.util import spark_context


def test_filter_sf_images(spark_context):
sf_iso_images = spark_context.parallelize([(0, [csr_matrix([[0, 100, 100], [10, 0, 3]]),
csr_matrix([[0, 50, 50], [0, 20, 0]])]),
(1, [csr_matrix([[0, 0, 0], [0, 0, 0]]),
csr_matrix([[0, 0, 0], [0, 0, 0]])])])

sf_metrics_df = (pd.DataFrame([[0, '+H', 0.9, 0.9, 0.9, 0.9**3]],
columns=['sf_id', 'adduct', 'chaos', 'spatial', 'spectral', 'msm'])
.set_index(['sf_id', 'adduct']))

search_alg = MSMBasicSearch(None, None, None, None, None)
flt_iso_images = search_alg.filter_sf_images(sf_iso_images, sf_metrics_df)

assert dict(flt_iso_images.take(1)).keys() == dict(sf_iso_images.take(1)).keys()
2 changes: 1 addition & 1 deletion tests/sci_test_search_job_spheroid_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


def print_metric_hist(metric_arr, bins=10):
metric_freq, metric_interv = np.histogram(metric_arr, bins=np.linspace(0, 1, 11))
metric_freq, metric_interv = np.histogram(metric_arr, bins=np.linspace(-1, 1, 21))
metric_interv = map(lambda x: round(x, 2), metric_interv)
pprint(zip(zip(metric_interv[:-1], metric_interv[1:]), metric_freq))

Expand Down
Loading

0 comments on commit 721c075

Please sign in to comment.