From 2553832a5c252fe8be43978be4b42d27a8b00846 Mon Sep 17 00:00:00 2001 From: NatureGeorge <414731811@qq.com> Date: Fri, 30 Oct 2020 23:41:33 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=B7=E2=80=8D=E2=99=82=EF=B8=8FImplemen?= =?UTF-8?q?t=20cachetools=20to=20fix=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pdb_profiling/processors/pdbe/record.py | 234 +++++++++++++++++------- pdb_profiling/utils.py | 2 +- setup.py | 3 +- 3 files changed, 167 insertions(+), 72 deletions(-) diff --git a/pdb_profiling/processors/pdbe/record.py b/pdb_profiling/processors/pdbe/record.py index 8964fcd..e540141 100644 --- a/pdb_profiling/processors/pdbe/record.py +++ b/pdb_profiling/processors/pdbe/record.py @@ -42,6 +42,7 @@ from pdb_profiling.warnings import PISAErrorWarning, MultiWrittenWarning from textdistance import sorensen from warnings import warn +from cachetools import LRUCache API_SET = {api for apiset in API_SET for api in apiset[1]} @@ -73,9 +74,7 @@ def __call__(self, that): class Base(object): folder = None - - def __init__(self): - self.tasks = dict() + tasks = LRUCache(maxsize=1024) def get_id(self): pass @@ -83,8 +82,9 @@ def get_id(self): def __repr__(self): return f"<{self.__class__.__name__} {self.get_id()}>" - def register_task(self, key: Hashable, task: Unfuture): - self.tasks[key] = task + @classmethod + def register_task(cls, key: Hashable, task: Unfuture): + cls.tasks[key] = task def set_neo4j_connection(self, api): pass @@ -129,7 +129,7 @@ def check_folder(cls): def fetch_from_web_api(self, api_suffix: str, then_func: Optional[Callable[[Unfuture], Unfuture]] = None, json: bool = False, mask_id: str = None) -> Unfuture: assert api_suffix in API_SET, f"Invlaid API SUFFIX! Valid set:\n{API_SET}" - task = self.tasks.get((api_suffix, then_func, json, mask_id), None) + task = self.tasks.get((repr(self), api_suffix, then_func, json, mask_id), None) if task is not None: return task @@ -143,7 +143,7 @@ def fetch_from_web_api(self, api_suffix: str, then_func: Optional[Callable[[Unfu task = ProcessPDBe.single_retrieve(**args) if then_func is not None: task = task.then(then_func) - self.register_task((api_suffix, then_func, json, mask_id), task) + self.register_task((repr(self), api_suffix, then_func, json, mask_id), task) return task @classmethod @@ -171,6 +171,8 @@ async def to_dataframe_with_kwargs(cls, path, **kwargs): class PDB(Base): + tasks = LRUCache(maxsize=1024) + protein_sequence_pat = re_compile(r'([A-Z]{1}|\([A-Z0-9]+\))') nucleotide_sequence_pat = re_compile(r'([AUCG]{1}|\(DA\)|\(DT\)|\(DC\)|\(DG\)|\(UNK\))') StatsProteinEntitySeq = namedtuple( @@ -237,7 +239,6 @@ def status(self): def __init__(self, pdb_id: str): self.check_folder() self.set_id(pdb_id) - self.tasks = dict() self.pdb_ob = self self.properties_inited = False @@ -250,7 +251,7 @@ def get_id(self): def fetch_from_modelServer_api(self, api_suffix: str, method: str = 'post', data_collection=None, params=None, then_func: Optional[Callable[[Unfuture], Unfuture]] = None) -> Unfuture: assert api_suffix in PDBeModelServer.api_set, f"Invlaid API SUFFIX! Valid set:\n{PDBeModelServer.api_set}" - task = self.tasks.get((PDBeModelServer.root, api_suffix, method, data_collection, params, then_func), None) + task = self.tasks.get((repr(self), PDBeModelServer.root, api_suffix, method, data_collection, params, then_func), None) if task is not None: return task task = PDBeModelServer.single_retrieve( @@ -263,12 +264,12 @@ def fetch_from_modelServer_api(self, api_suffix: str, method: str = 'post', data params=params) if then_func is not None: task = task.then(then_func) - self.register_task((PDBeModelServer.root, api_suffix, method, data_collection, params, then_func), task) + self.register_task((repr(self), PDBeModelServer.root, api_suffix, method, data_collection, params, then_func), task) return task def fetch_from_PDBArchive(self, api_suffix: str, then_func: Optional[Callable[[Unfuture], Unfuture]] = None, **kwargs) -> Unfuture: assert api_suffix in PDBArchive.api_set, f"Invlaid API SUFFIX! Valid set:\n{PDBArchive.api_set}" - task = self.tasks.get((PDBArchive.root, api_suffix, then_func), None) + task = self.tasks.get((repr(self), PDBArchive.root, api_suffix, then_func), None) if task is not None: return task task = PDBArchive.single_retrieve( @@ -279,7 +280,7 @@ def fetch_from_PDBArchive(self, api_suffix: str, then_func: Optional[Callable[[U **kwargs) if then_func is not None: task = task.then(then_func) - self.register_task((PDBArchive.root, api_suffix, then_func), task) + self.register_task((repr(self), PDBArchive.root, api_suffix, then_func), task) return task @classmethod @@ -335,8 +336,9 @@ def to_rank(rank_dict, assembly_id, struct_asym_id): assg_cols = ('_pdbx_struct_assembly_gen.asym_id_list', '_pdbx_struct_assembly_gen.oper_expression', '_pdbx_struct_assembly_gen.assembly_id') - oper_cols = ('_pdbx_struct_oper_list.id', - '_pdbx_struct_oper_list.symmetry_operation') + oper_cols = ('_pdbx_struct_oper_list.id', + '_pdbx_struct_oper_list.symmetry_operation', + '_pdbx_struct_oper_list.name') async with aiofiles_open(await path, 'rt') as file_io: handle = await file_io.read() handle = (i+'\n' for i in handle.split('\n')) @@ -363,11 +365,16 @@ def to_rank(rank_dict, assembly_id, struct_asym_id): assg_df['asym_id_rank'] = assg_df.apply(lambda x: to_rank( rank_dict, x['assembly_id'], x['struct_asym_id']), axis=1) - oper_dict = dict(zip(*[mmcif_dict[col] for col in oper_cols])) + oper_dict_1 = dict(zip(*[mmcif_dict[col] for col in oper_cols[:2]])) assg_df['symmetry_operation'] = assg_df.oper_expression.apply( - lambda x: [oper_dict[i] for i in json.loads(x)]) + lambda x: [oper_dict_1[i] for i in json.loads(x)]) assg_df.symmetry_operation = assg_df.symmetry_operation.apply( lambda x: json.dumps(x).decode('utf-8')) + oper_dict_2 = dict(zip(*[mmcif_dict[col] for col in (oper_cols[0], oper_cols[2])])) + assg_df['symmetry_id'] = assg_df.oper_expression.apply( + lambda x: [oper_dict_2[i] for i in json.loads(x)]) + assg_df.symmetry_id = assg_df.symmetry_id.apply( + lambda x: json.dumps(x).decode('utf-8')) assg_df.assembly_id = assg_df.assembly_id.astype(int) return assg_df @@ -375,7 +382,7 @@ def to_rank(rank_dict, assembly_id, struct_asym_id): @unsync async def pipe_assg_data_collection(self) -> str: '''demo_dict = {"atom_site": [{"label_asym_id": "A", "label_seq_id": 23}]}''' - res_df = await self.pdb_ob.fetch_from_web_api('api/pdb/entry/residue_listing/', self.to_dataframe) + res_df = await self.pdb_ob.fetch_from_web_api('api/pdb/entry/residue_listing/', Base.to_dataframe) # res_dict = iter_first(res_df, lambda row: row.observed_ratio > 0) # assert res_dict is not None, f"{self.pdb_ob}: Unexpected Cases, without observed residues?!" res_dict = res_df.loc[res_df.observed_ratio.gt(0).idxmax()].to_dict() @@ -413,12 +420,12 @@ async def set_res2eec_df(self, merge_with_molecules_info:bool=True): * 6a8r * 6e32 ''' - res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', self.to_dataframe) + res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', Base.to_dataframe) eec_df = res_df[['pdb_id', 'entity_id', 'chain_id', 'struct_asym_id']].drop_duplicates().reset_index(drop=True) # eec_df['struct_asym_id_in_assembly'] = eec_df.struct_asym_id if merge_with_molecules_info: - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) self.res2eec_df = eec_df.merge(mol_df, how='left') else: self.res2eec_df = eec_df @@ -460,8 +467,8 @@ def to_assembly_id(pdb_id, assemblys): for assembly_id in assemblys: yield f"{pdb_id}/{assembly_id}" - ass_eec_df = await self.fetch_from_web_api('api/pdb/entry/assembly/', self.to_dataframe) - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + ass_eec_df = await self.fetch_from_web_api('api/pdb/entry/assembly/', Base.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) mol_df = mol_df[~mol_df.molecule_type.isin(('water', 'bound', 'carbohydrate_polymer'))] ass_eec_df = ass_eec_df[ass_eec_df.details.notnull() & ass_eec_df.entity_id.isin(mol_df.entity_id)] check_chain_num = ass_eec_df.groupby('assembly_id').in_chains.apply(lambda x: sum(i.count(',')+1 for i in x)) @@ -562,7 +569,7 @@ def dumpsOperators(ops: Iterable[Iterable[str]], sep1:str=',', sep2:str='&') -> async def stats_protein_entity_seq(self): store = await self.sqlite_api.StatsProteinEntitySeq.objects.filter(pdb_id=self.pdb_id).all() if not store: - muta_df = await self.to_dataframe_with_kwargs( + muta_df = await Base.to_dataframe_with_kwargs( self.fetch_from_web_api('api/pdb/entry/mutated_AA_or_NA/'), usecols=lambda x: not x.startswith('author') and not x in ('struct_asym_id', 'chain_id')) if muta_df is not None: @@ -604,7 +611,7 @@ def regist_info(record): muta_dict.get(record['entity_id'], tuple()))) # assert len(seq)-len(std_res) == len(non_res) - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) mol_df.apply(regist_info, axis=1) if store: await self.sqlite_api.async_insert(self.sqlite_api.StatsProteinEntitySeq, store) @@ -643,7 +650,7 @@ def regist_info(record): to_interval(unk_res), len(unk_res))) - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) mol_df.apply(regist_info, axis=1) if store: await self.sqlite_api.async_insert(self.sqlite_api.StatsNucleotideEntitySeq, store) @@ -657,7 +664,7 @@ def regist_info(record): ].append( (json.loads(record['start'])['residue_number'], json.loads(record['end'])['residue_number'])) - c_df = await self.fetch_from_web_api('api/pdb/entry/polymer_coverage/', self.to_dataframe) + c_df = await self.fetch_from_web_api('api/pdb/entry/polymer_coverage/', Base.to_dataframe) c_df.apply(regist_info, axis=1) return [self.StatsChainSeq(*key, value, range_len(value)) for key, value in store.items()] @@ -673,8 +680,8 @@ def regist_info(record): store[(record['pdb_id'], record['entity_id'], record['chain_id'], record['struct_asym_id']) ].append((record['residue_number'], record['observed_ratio'])) - res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', self.to_dataframe) - # mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', Base.to_dataframe) + # mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) # res_df = res_df.merge(mol_df[mol_df.molecule_type.isin(('polypeptide(L)', 'polypeptide(D)'))][['entity_id']]) res_df[res_df.observed_ratio.gt(0)].apply(lambda x: regist_info(x), axis=1) @@ -683,7 +690,7 @@ def regist_info(record): # TODO: make sure whether bound_count including sugar & other bound_count = sum(value for key, value in summary_dict.items() if key in ('ligand', 'sugar', 'other', 'carbohydrate_polymer')) if bound_count: - br_df = await self.to_dataframe_with_kwargs( + br_df = await Base.to_dataframe_with_kwargs( self.fetch_from_web_api('api/pdb/entry/binding_sites/'), usecols=lambda x: not x.startswith('author')) else: @@ -731,7 +738,7 @@ async def get_sequence(self, **kwargs): Get SEQRES Sequence via entity_id | chain_id (default protein) | struct_asym_id ''' sequence_col = 'sequence' if kwargs.get('one_letter_code', True) else 'pdb_sequence' - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) if 'entity_id' in kwargs: try: return mol_df.loc[mol_df.entity_id.eq(kwargs['entity_id']).idxmax(), sequence_col] @@ -775,7 +782,7 @@ async def profile_id(self): assg_oper_df = await self.fetch_from_modelServer_api( 'atoms', data_collection=await self.pipe_assg_data_collection(), - then_func=self.to_assg_oper_df) + then_func=PDB.to_assg_oper_df) res2eec_df = await self.get_res2eec_df() focus_res2eec_df = res2eec_df[['pdb_id', 'entity_id', 'molecule_type', 'chain_id', 'struct_asym_id']] add_0_assg_oper_df = focus_res2eec_df.copy() @@ -784,6 +791,7 @@ async def profile_id(self): add_0_assg_oper_df['asym_id_rank'] = 1 add_0_assg_oper_df['oper_expression'] = '' add_0_assg_oper_df['symmetry_operation'] = '' + add_0_assg_oper_df['symmetry_id'] = '' if assg_oper_df is not None: focus_assg_oper_df = assg_oper_df[assg_oper_df.struct_asym_id.isin(focus_res2eec_df.struct_asym_id)] @@ -796,7 +804,7 @@ async def profile_id(self): return add_0_assg_oper_df to_fetch_assembly = new_focus_assg_oper_df[ - new_focus_assg_oper_df.assembly_id.ne(0) & new_focus_assg_oper_df.symmetry_operation.ne('["x,y,z"]')].assembly_id.unique() + new_focus_assg_oper_df.assembly_id.ne(0) & new_focus_assg_oper_df.symmetry_id.ne('["1_555"]')].assembly_id.unique() if len(to_fetch_assembly)> 0: in_ass_df = await self.profile_asym_id_in_assembly(to_fetch_assembly) in_ass_df.columns = [i.replace('_pdbe_chain_remapping.', '') for i in in_ass_df.columns] @@ -829,7 +837,7 @@ async def profile_id(self): profile_id_df.loc[profile_id_df[profile_id_df.assembly_id.isin(self.subset_assembly)].index, 'au_subset'] = True # TODO: Check - ass_df = await self.fetch_from_web_api('api/pdb/entry/assembly/', self.to_dataframe) + ass_df = await self.fetch_from_web_api('api/pdb/entry/assembly/', Base.to_dataframe) a_df = profile_id_df[['assembly_id', 'entity_id']].drop_duplicates().query('assembly_id != 0') for assembly_id, entity_id in zip(a_df.assembly_id, a_df.entity_id): profile_lyst = tuple(profile_id_df[profile_id_df.assembly_id.eq(assembly_id) & profile_id_df.entity_id.eq(entity_id)].struct_asym_id_in_assembly.sort_values().tolist()) @@ -844,14 +852,14 @@ async def get_expanded_map_res_df(self, UniProt, unp_range, pdb_range, **kwargs) assert kwargs.keys() <= frozenset({'chain_id', 'entity_id', 'struct_asym_id', 'pdb_id'}) res_map_df = DataFrame(zip(expand_interval(unp_range),expand_interval(pdb_range)), columns=['unp_residue_number', 'residue_number']) res_map_df['UniProt'] = UniProt - res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', self.to_dataframe) + res_df = await self.fetch_from_web_api('api/pdb/entry/residue_listing/', Base.to_dataframe) res_map_df_full = res_map_df.merge(res_df.query( 'and '.join(f'{key}=="{value}"' if isinstance(value, str) else f'{key}=={value}' for key,value in kwargs.items()))) assert res_map_df_full.shape[0] == res_map_df.shape[0] return res_map_df_full - async def pipe_interface_res_dict(self, chain_pairs=None, au2bu:bool=False, focus_assembly_ids=None, func='set_interface', **kwargs): - await self.set_assembly(focus_assembly_ids=focus_assembly_ids) + async def pipe_interface_res_dict(self, chain_pairs=None, au2bu:bool=False, focus_assembly_ids=None, func='set_interface', discard_multimer_chains_cutoff=16, **kwargs): + await self.set_assembly(focus_assembly_ids=focus_assembly_ids, discard_multimer_chains_cutoff=discard_multimer_chains_cutoff) for assembly in self.assembly.values(): await getattr(assembly, func)(**kwargs) if len(assembly.interface) == 0: @@ -887,6 +895,8 @@ async def expand_multiple_conformers(dfrm: Union[DataFrame, Unfuture, Coroutine] class PDBAssemble(PDB): + tasks = LRUCache(maxsize=1024) + id_pattern = re_compile(r"([a-z0-9]{4})/([0-9]+)") struct_range_pattern = re_compile(r"\[.+\]([A-Z]+):-?[0-9]+\??") # e.g. [FMN]B:149 [C2E]A:301 [ACE]H:-8? rare_pat = re_compile(r"([A-Z]+)_([0-9]+)") # e.g. 2rde assembly 1 A_1, B_1... @@ -904,8 +914,11 @@ def __init__(self, pdb_ass_id, pdb_ob: Optional[PDB]=None): self.pdb_ob = PDB(self.pdb_id) else: self.pdb_ob = pdb_ob + ''' + NOTE: reference: + ''' self.interface_filters = { - 'structure_2.symmetry_operator': ('eq', 'x,y,z'), + 'structure_2.symmetry_id': ('eq', '1_555'), 'css': ('ge', 0)} def set_id(self, pdb_ass_id: str): @@ -968,9 +981,10 @@ def to_interface_id(pdb_assembly_id, focus_interface_ids): try: interfacelist_df = await self.fetch_from_web_api( 'api/pisa/interfacelist/', - self.to_interfacelist_df, + PDBAssemble.to_interfacelist_df, mask_id=f'{self.pdb_id}/0' if use_au else None) except WithoutExpectedKeyError: + warn(f"cannot get interfacelist data from PISA: {self.get_id()}", PISAErrorWarning) self.interface = dict() return @@ -1086,6 +1100,8 @@ async def pipe_protein_nucleotide_interface(self, molecule_types=None): class PDBInterface(PDBAssemble): + tasks = LRUCache(maxsize=1024) + id_pattern = re_compile(r"([a-z0-9]{4})/([0-9]+)/([0-9]+)") def __init__(self, pdb_ass_int_id, pdbAssemble_ob: Optional[PDBAssemble]=None, use_au:bool=False): @@ -1150,8 +1166,9 @@ def check_struct_selection(interfacedetail_df, colName): @unsync async def set_interface_res(self, keep_interface_res_df:bool=False): try: - interfacedetail_df = await self.fetch_from_web_api('api/pisa/interfacedetail/', self.to_interfacedetail_df, mask_id=f'{self.pdb_id}/0/{self.interface_id}' if self.use_au else None) + interfacedetail_df = await self.fetch_from_web_api('api/pisa/interfacedetail/', PDBInterface.to_interfacedetail_df, mask_id=f'{self.pdb_id}/0/{self.interface_id}' if self.use_au else None) except WithoutExpectedKeyError: + warn(f"cannot get interfacedetail data from PISA: {self.get_id()}", PISAErrorWarning) return except Exception as e: raise AssertionError(e) @@ -1165,7 +1182,7 @@ async def set_interface_res(self, keep_interface_res_df:bool=False): # NOTE: Exception example: 2beq assembly_id 1 interface_id 32 warn(f"interfacedetail({struct_sele_set}) inconsistent with interfacelist({set(self.info['chains'])}) ! May miss some data.", PISAErrorWarning) eec_as_df = await self.pdbAssemble_ob.get_assemble_eec_as_df() - res_df = await self.pdbAssemble_ob.pdb_ob.fetch_from_web_api('api/pdb/entry/residue_listing/', self.to_dataframe) + res_df = await self.pdbAssemble_ob.pdb_ob.fetch_from_web_api('api/pdb/entry/residue_listing/', Base.to_dataframe) interfacedetail_df = interfacedetail_df.merge(eec_as_df, how="left") interfacedetail_df = interfacedetail_df.merge(res_df, how="left") if keep_interface_res_df: @@ -1252,13 +1269,15 @@ async def get_interface_res_dict(self, **kwargs): class SIFTS(PDB): + tasks = LRUCache(maxsize=1024) + EntityChain = namedtuple('EntityChain', 'pdb_id entity_chain_info entity_count chain_count') UniProtEntity = namedtuple('UniProtEntity', 'pdb_id unp_entity_info entity_unp_info entity_with_unp_count min_unp_count') OligoState = namedtuple('OligoState', 'pdb_id oligo_state has_unmapped_protein') MappingMeta = namedtuple('MappingMeta', 'UniProt species identity') - chain_filter = 'UNK_COUNT == 0 and ca_p_only == False and identity >=0.9 and repeated == False and reversed == False and OBS_COUNT > 20' - entry_filter = '(experimental_method == "X-ray diffraction" and resolution <= 3) or experimental_method == "Solution NMR"' + chain_filter = 'UNK_COUNT < SEQRES_COUNT and ca_p_only == False and identity >=0.9 and repeated == False and reversed == False and OBS_COUNT > 20' + entry_filter = '(experimental_method in ["X-ray diffraction", "Electron Microscopy"] and resolution <= 3) or experimental_method == "Solution NMR"' complete_chains_run_as_completed = False @@ -1288,24 +1307,29 @@ async def complete_chains(cls, dfrm: Union[DataFrame, Unfuture, Coroutine]): if cls.complete_chains_run_as_completed: res = await SIFTSs(dfrm.pdb_id.unique()).fetch('fetch_from_web_api', api_suffix='api/mappings/all_isoforms/', - then_func=SIFTS.to_dataframe).run() + then_func=Base.to_dataframe).run() else: res = [await task for task in SIFTSs(dfrm.pdb_id.unique()).fetch('fetch_from_web_api', api_suffix='api/mappings/all_isoforms/', - then_func=SIFTS.to_dataframe).tasks] + then_func=Base.to_dataframe).tasks] return concat(res, sort=False, ignore_index=True) @staticmethod @unsync - async def reformat(dfrm: Union[DataFrame, Unfuture, Coroutine]) -> DataFrame: - if isinstance(dfrm, (Coroutine, Unfuture)): - dfrm = await dfrm + def reformat(dfrm: Union[DataFrame, Unfuture]) -> DataFrame: + if isinstance(dfrm, Unfuture): + dfrm = dfrm.result() if 'pdb_start' not in dfrm.columns: flat_dict_in_df(dfrm, dfrm.start.apply(json.loads), ('residue_number',)) flat_dict_in_df(dfrm, dfrm.end.apply(json.loads), ('residue_number',)) dfrm.rename(columns={ 'start.residue_number': 'pdb_start', 'end.residue_number': 'pdb_end'}, inplace=True) + ''' + NOTE: sort for cases like P00441 5j0c (chain A,B) + NOTE: hasn't handle multiple identity values! + ''' + dfrm.sort_values(by=['UniProt', 'pdb_id', 'struct_asym_id', 'pdb_start'], inplace=True) group_info_col = ['pdb_id', 'chain_id', 'UniProt'] range_info_col = ['pdb_start', 'pdb_end', 'unp_start', 'unp_end'] reader = SeqRangeReader(group_info_col) @@ -1320,9 +1344,9 @@ async def reformat(dfrm: Union[DataFrame, Unfuture, Coroutine]) -> DataFrame: @staticmethod @unsync - async def dealWithInDel(dfrm: Union[DataFrame, Unfuture, Coroutine], sort_by_unp: bool = True) -> DataFrame: - if isinstance(dfrm, (Coroutine, Unfuture)): - dfrm = await dfrm + def dealWithInDel(dfrm: Union[DataFrame, Unfuture], sort_by_unp: bool = True) -> DataFrame: + if isinstance(dfrm, Unfuture): + dfrm = dfrm.result() def add_tage_to_range(df: DataFrame, tage_name: str): # ADD TAGE FOR SIFTS @@ -1406,14 +1430,14 @@ def reverse_dict(info_dict): def min_unp(info_dict): return min(len(set(res)) for res in product(*info_dict.values())) - mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', self.to_dataframe) + mol_df = await self.fetch_from_web_api('api/pdb/entry/molecules/', Base.to_dataframe) mol_df = mol_df[mol_df.molecule_type.eq('polypeptide(L)')] info_dict = dict(zip(mol_df.entity_id,mol_df.in_chains)) entity_count = len(info_dict) chain_count = sum(i.count(',')+1 for i in info_dict.values()) try: - best_iso = await self.fetch_from_web_api('api/mappings/isoforms/', self.to_dataframe).then(self.reformat) + best_iso = await self.fetch_from_web_api('api/mappings/isoforms/', Base.to_dataframe).then(self.reformat) # best_iso = best_iso.sort_values(by='identity', ascending=False).drop_duplicates(subset='entity_id') unp_entity_info = defaultdict(dict) best_iso.apply(lambda x: regist_info(unp_entity_info, x), axis=1) @@ -1493,7 +1517,7 @@ async def add_residue_conflict(cls, dfrm: Union[DataFrame, Tuple, Unfuture, Coro async def fetch_new_residue_mapping(self, entity_id, start, end): df = await self.fetch_from_web_api( 'graph-api/residue_mapping/', - self.to_dataframe, + Base.to_dataframe, mask_id=f'{self.pdb_id}/{entity_id}/{start}/{end}') if df is not None: await self.sqlite_api.async_insert(self.sqlite_api.ResidueMapping, df.to_dict('records')) @@ -1623,7 +1647,7 @@ async def a_sliding_alignment(cls, range_diff, pdb_range, unp_range, pdb_id, ent @unsync async def pipe_score(self, sifts_df=None, complete_chains:bool=False, weight=array([1, -1, -1, -1.79072623, -2.95685934, -4.6231746])): if sifts_df is None: - init_task = await self.fetch_from_web_api('api/mappings/all_isoforms/', self.to_dataframe) + init_task = await self.fetch_from_web_api('api/mappings/all_isoforms/', Base.to_dataframe) if init_task is None: return if complete_chains: init_task = await self.complete_chains(init_task) @@ -1774,11 +1798,12 @@ async def search_partner_from_i3d(Entry, interaction_types, columns='pdb_id,Entr return DataFrame(res, columns=columns.split(',')) @staticmethod - def parallel_interact_df(sifts_df, i3d_df): - # TODO: .drop(columns=['']) - sifts_df_ = sifts_df.add_suffix('_1').rename(columns={'pdb_id_1': 'pdb_id'}) + def parallel_interact_df(sifts_df, i3d_df, common_cols=('revision_date', 'deposition_date', 'experimental_method', 'experimental_method_class', 'multi_method', 'resolution', '1/resolution')): + rename_dict = dict(zip((f'{i}_1' for i in common_cols), common_cols)) + rename_dict['pdb_id_1'] = 'pdb_id' + sifts_df_ = sifts_df.add_suffix('_1').rename(columns=rename_dict) i3d_df = i3d_df.merge(sifts_df_) - sifts_df_ = sifts_df.add_suffix('_2').rename(columns={'pdb_id_2': 'pdb_id'}) + sifts_df_ = sifts_df.drop(columns=sifts_df.columns & set(common_cols)).add_suffix('_2').rename(columns={'pdb_id_2': 'pdb_id'}) i3d_df = i3d_df.merge(sifts_df_) swap_index = i3d_df[ (i3d_df.struct_asym_id_1 > i3d_df.struct_asym_id_2) | @@ -1807,8 +1832,7 @@ async def pipe_interface_res_dict(p_df, pdb_id): pass @unsync - async def pipe_select_ho(self, exclude_pdbs=frozenset(), interface_mapped_cov_cutoff=0.8, unp_range_DSC_cutoff=0.8, wasserstein_distance_cutoff=0.2, run_as_completed:bool=False, progress_bar=None): - i3d_df = await self.search_partner_from_i3d(self.identifier.split('-')[0], 'ho') + async def pipe_select_ho(self, exclude_pdbs=frozenset(), interface_mapped_cov_cutoff=0.8, unp_range_DSC_cutoff=0.8, wasserstein_distance_cutoff=0.2, run_as_completed:bool=False, progress_bar=None,**kwargs): sele_df = await self.pipe_select_mo(exclude_pdbs) if sele_df is None: return @@ -1816,7 +1840,7 @@ async def pipe_select_ho(self, exclude_pdbs=frozenset(), interface_mapped_cov_cu lambda x: frozenset(combinations_with_replacement(x, 2))) # res = [i for pdb_id in chain_pairs.index async for i in PDB(pdb_id).pipe_interface_res_dict(chain_pairs=chain_pairs[pdb_id], au2bu=True, func='pipe_protein_protein_interface')] # interact_df = DataFrame(i for i in res if i is not None) - tasks = [self.pird_task_unit(pdb_id, chain_pairs[pdb_id]) for pdb_id in chain_pairs.index] + tasks = [self.pird_task_unit(pdb_id, chain_pairs[pdb_id], **kwargs) for pdb_id in chain_pairs.index] if run_as_completed: if progress_bar is not None: res = [await i for i in progress_bar(as_completed(tasks), total=len(tasks))] @@ -1834,6 +1858,7 @@ async def pipe_select_ho(self, exclude_pdbs=frozenset(), interface_mapped_cov_cu ''' interact_df = interact_df[(~interact_df.interface_range_1.isnull()) & (~interact_df.interface_range_2.isnull())] p_a_df = self.parallel_interact_df(sele_df, interact_df) + i3d_df = await self.search_partner_from_i3d(self.identifier.split('-')[0], 'ho') if len(i3d_df) == 0: p_df = p_a_df p_df['in_i3d'] = False @@ -1866,14 +1891,67 @@ def sele_func(dfrm): return p_df + @unsync + async def pipe_select_ho_iso(self, exclude_pdbs=frozenset(), interface_mapped_cov_cutoff=0.8, DSC_cutoff=0.2, run_as_completed:bool=False, progress_bar=None, then_sort_interact:bool=True, **kwargs): + entry = self.identifier.split('-')[0] + sele_df = await self.pipe_select_mo(exclude_pdbs, complete_chains=True) + if sele_df is None: + return + sele_df = sele_df[sele_df.Entry.eq(entry)] + chain_pairs = sele_df.groupby('pdb_id').struct_asym_id.apply( + lambda x: frozenset(combinations_with_replacement(x, 2))) + tasks = [self.pird_task_unit(pdb_id, chain_pairs[pdb_id], **kwargs) for pdb_id in chain_pairs.index] + if run_as_completed: + if progress_bar is not None: + res = [await i for i in progress_bar(as_completed(tasks), total=len(tasks))] + else: + res = [await i for i in as_completed(tasks)] + else: + res = [await i for i in tasks] + interact_df = DataFrame(j for i in res for j in i if j is not None) + if len(interact_df) == 0: + return + interact_df = interact_df[(~interact_df.interface_range_1.isnull()) & (~interact_df.interface_range_2.isnull())] + p_a_df = self.parallel_interact_df(sele_df, interact_df) + p_a_df = p_a_df[(p_a_df.UniProt_1.eq(self.identifier) | p_a_df.UniProt_2.eq(self.identifier)) & (p_a_df.UniProt_1 != p_a_df.UniProt_2) & (p_a_df.struct_asym_id_in_assembly_1 != p_a_df.struct_asym_id_in_assembly_2)].reset_index(drop=True) + i3d_df = await self.search_partner_from_i3d(entry, 'ho') + if len(i3d_df) == 0: + p_df = p_a_df + p_df['in_i3d'] = False + else: + p_b_df = self.parallel_interact_df(sele_df[['UniProt', 'pdb_id', 'chain_id', 'struct_asym_id']], i3d_df) + p_df = p_a_df.merge(p_b_df, how='left') + p_df['in_i3d'] = p_df.organism.apply(lambda x: False if isna(x) else True) + p_df.drop(columns=['organism', 'interaction_type'], inplace=True) + p_df['best_select_rank'] = p_df[['select_rank_1', + 'select_rank_2']].apply(lambda x: min(x), axis=1) + p_df['unp_interface_range_1'] = p_df.apply(lambda x: to_interval(self.convert_index(x['new_unp_range_1'], x['new_pdb_range_1'], expand_interval(x['interface_range_1']))), axis=1) + p_df['unp_interface_range_2'] = p_df.apply(lambda x: to_interval(self.convert_index(x['new_unp_range_2'], x['new_pdb_range_2'], expand_interval(x['interface_range_2']))), axis=1) + p_df['i_select_tag'] = False + p_df['i_select_rank'] = -1 + p_df['i_group'] = p_df.apply(lambda x: tuple( + sorted((x['UniProt_1'], x['UniProt_2']))), axis=1) + allow_p_df = p_df[ + (p_df.best_select_rank > 0) & + ((p_df.unp_interface_range_1.apply(range_len)/p_df.interface_range_1.apply(range_len)) >= interface_mapped_cov_cutoff) & + ((p_df.unp_interface_range_2.apply(range_len)/p_df.interface_range_2.apply(range_len)) >= interface_mapped_cov_cutoff)] + + def sele_func(dfrm): + rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution', 'revision_date', 'id_score_1', 'id_score_2'], ascending=False).index + p_df.loc[rank_index, 'i_select_rank'] = range(1, len(rank_index)+1) + return select_he_range(dfrm.UniProt_1, dfrm.UniProt_2, dfrm.unp_interface_range_1, dfrm.unp_interface_range_2, rank_index, cutoff=DSC_cutoff) + + sele_indexes = allow_p_df.groupby('i_group').apply(sele_func) + p_df.loc[[j for i in sele_indexes for j in i], 'i_select_tag'] = True + return (await self.sort_interact_cols(p_df)) if then_sort_interact else p_df + @staticmethod @unsync - async def pird_task_unit(pdb_id, chain_pairs): - return [i async for i in PDB(pdb_id).pipe_interface_res_dict(chain_pairs=chain_pairs, au2bu=True, func='pipe_protein_protein_interface')] + async def pird_task_unit(pdb_id, chain_pairs, **kwargs): + return [i async for i in PDB(pdb_id).pipe_interface_res_dict(chain_pairs=chain_pairs, au2bu=True, func='pipe_protein_protein_interface', **kwargs)] @unsync - async def pipe_select_he(self, exclude_pdbs=frozenset(), interface_mapped_cov_cutoff=0.8, DSC_cutoff=0.2, run_as_completed:bool=False, progress_bar=None): - i3d_df = await self.search_partner_from_i3d(self.identifier.split('-')[0], "he") + async def pipe_select_he(self, exclude_pdbs=frozenset(), interface_mapped_cov_cutoff=0.8, DSC_cutoff=0.2, run_as_completed:bool=False, progress_bar=None, then_sort_interact:bool=True, **kwargs): sele_df = await self.pipe_select_mo(exclude_pdbs, complete_chains=True) if sele_df is None: return @@ -1881,7 +1959,7 @@ async def pipe_select_he(self, exclude_pdbs=frozenset(), interface_mapped_cov_cu return chain_pairs = sele_df.groupby(['pdb_id', 'entity_id']).struct_asym_id.apply(frozenset).groupby('pdb_id').apply(tuple).apply(lambda x: frozenset(res for i,j in combinations(range(len(x)), 2) for res in product(x[i], x[j]))) chain_pairs = chain_pairs[chain_pairs.apply(len) > 0] - tasks = [self.pird_task_unit(pdb_id, chain_pairs[pdb_id]) for pdb_id in chain_pairs.index] + tasks = [self.pird_task_unit(pdb_id, chain_pairs[pdb_id], **kwargs) for pdb_id in chain_pairs.index] if run_as_completed: if progress_bar is not None: res = [await i for i in progress_bar(as_completed(tasks), total=len(tasks))] @@ -1897,6 +1975,7 @@ async def pipe_select_he(self, exclude_pdbs=frozenset(), interface_mapped_cov_cu p_a_df = p_a_df[(p_a_df.UniProt_1.eq(self.identifier) | p_a_df.UniProt_2.eq(self.identifier)) & (p_a_df.Entry_1 != p_a_df.Entry_2)].reset_index(drop=True) if len(p_a_df) == 0: return + i3d_df = await self.search_partner_from_i3d(self.identifier.split('-')[0], "he") if len(i3d_df) == 0: p_df = p_a_df p_df['in_i3d'] = False @@ -1918,13 +1997,13 @@ async def pipe_select_he(self, exclude_pdbs=frozenset(), interface_mapped_cov_cu ((p_df.unp_interface_range_2.apply(range_len)/p_df.interface_range_2.apply(range_len)) >= interface_mapped_cov_cutoff)] def sele_func(dfrm): - rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution_1', 'revision_date_1', 'id_score_1', 'id_score_2'], ascending=False).index + rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution', 'revision_date', 'id_score_1', 'id_score_2'], ascending=False).index p_df.loc[rank_index, 'i_select_rank'] = range(1, len(rank_index)+1) return select_he_range(dfrm.UniProt_1, dfrm.UniProt_2, dfrm.unp_interface_range_1, dfrm.unp_interface_range_2, rank_index, cutoff=DSC_cutoff) sele_indexes = allow_p_df.groupby('i_group').apply(sele_func) p_df.loc[[j for i in sele_indexes for j in i], 'i_select_tag'] = True - return p_df + return (await self.sort_interact_cols(p_df)) if then_sort_interact else p_df @unsync async def pipe_select_ho_(self, exclude_pdbs=frozenset(), consider_interface:bool=False, unp_range_DSC_cutoff=0.8, wasserstein_distance_cutoff=0.2): @@ -1995,7 +2074,7 @@ async def pipe_select_he_(self, exclude_pdbs=frozenset(), consider_interface:boo allow_p_df = p_df[p_df.best_select_rank > 0] def sele_func(dfrm): - rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution_1', 'revision_date_1', 'id_score_1', 'id_score_2'], ascending=False).index + rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution', 'revision_date', 'id_score_1', 'id_score_2'], ascending=False).index p_df.loc[rank_index, 'i_select_rank'] = range(1, len(rank_index)+1) return select_he_range(dfrm.UniProt_1, dfrm.UniProt_2, dfrm.unp_interface_range_1, dfrm.unp_interface_range_2, rank_index, cutoff=DSC_cutoff) @@ -2006,7 +2085,7 @@ def sele_func(dfrm): allow_p_df = p_df[p_df.best_select_rank > 0] def sele_func(dfrm): - rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution_1', 'revision_date_1', 'id_score_1', 'id_score_2'], ascending=False).index + rank_index = dfrm.sort_values(by=['RAW_BS_1', 'RAW_BS_2', '1/resolution', 'revision_date', 'id_score_1', 'id_score_2'], ascending=False).index p_df.loc[rank_index, 'i_select_rank'] = range(1, len(rank_index)+1) return select_he_range(dfrm.Entry_1, dfrm.Entry_2, dfrm.new_unp_range_1, dfrm.new_unp_range_2, rank_index, cutoff=DSC_cutoff) @@ -2015,9 +2094,25 @@ def sele_func(dfrm): return p_df + @unsync + def sort_interact_cols(self, dfrm): + assert self.level == 'UniProt' + if isinstance(dfrm, Unfuture): + dfrm = dfrm.result() + swap_index = dfrm[dfrm.UniProt_1.ne(self.identifier)].index + cols_1 = [col for col in dfrm.columns if '_1' in col] + cols_2 = [col for col in dfrm.columns if '_2' in col] + store_1 = dfrm.loc[swap_index, cols_1].rename(columns=dict(zip(cols_1, [col.replace('_1', '_2') for col in cols_1]))) + store_2 = dfrm.loc[swap_index, cols_2].rename(columns=dict(zip(cols_2, [col.replace('_2', '_1') for col in cols_2]))) + dfrm.loc[swap_index, cols_1] = store_2 + dfrm.loc[swap_index, cols_2] = store_1 + return dfrm + class Compounds(Base): + tasks = LRUCache(maxsize=1024) + def set_id(self, identifier: str): assert len(identifier) > 0, "Empty string is not a valid identifier!" self.identifier = identifier.upper() @@ -2028,7 +2123,6 @@ def get_id(self): def __init__(self, identifier:str): self.check_folder() self.set_id(identifier) - self.tasks = dict() class PDBs(tuple): diff --git a/pdb_profiling/utils.py b/pdb_profiling/utils.py index 73f787d..bf6ae2d 100644 --- a/pdb_profiling/utils.py +++ b/pdb_profiling/utils.py @@ -675,7 +675,7 @@ def flat_dict_in_df(dfrm:DataFrame, targetCol:Union[str, Series], cols:List): standardAA = list(SEQ_DICT.keys()) -standardNu = ['DA', 'DT', 'DC', 'DG', 'A', 'U', 'C', 'G'] +standardNu = ['DA', 'DT', 'DC', 'DG', 'DI', 'A', 'U', 'C', 'G', 'I'] def range_len(lyst: Union[List, str, float]) -> int: diff --git a/setup.py b/setup.py index f497e2c..f87f9a3 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,8 @@ 'orm>=0.1.5', 'smart_open>=1.9.0', 'scipy>=1.4.1', - 'slugify>=0.0.1' + 'slugify>=0.0.1', + 'cachetools>=4.1.0' ], license="MIT", author_email="1730416009@stu.suda.edu.cn",