Skip to content

Commit

Permalink
🚲Fix retry Error
Browse files Browse the repository at this point in the history
  • Loading branch information
NatureGeorge committed Sep 28, 2020
1 parent 9f8a765 commit 6fde377
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 177 deletions.
11 changes: 6 additions & 5 deletions pdb_profiling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,21 @@ def default_id_tag(identifier:str, default:str='', raise_error:bool=False):
return default


def default_config(folder='./'):
    """Apply the library's default runtime configuration.

    Enables reuse of previously fetched/handled API results, initialises
    logging, creates the shared web semaphores, and points the processors
    at a storage folder.

    Args:
        folder: base directory where downloaded and handled files are
            stored (defaults to the current working directory).
    """
    from pdb_profiling.log import Abclog
    from pdb_profiling.fetcher.webfetch import UnsyncFetch
    from pdb_profiling.processors.pdbe.record import Base
    from pdb_profiling.processors.pdbe.api import ProcessPDBe
    from pdb_profiling.processors.proteins.record import Identifier
    # Use existing handled PDBe API results (e.g. tsv format results)
    ProcessPDBe.use_existing = True
    # Use existing API results (e.g. json format results downloaded from web)
    UnsyncFetch.use_existing = True
    # Init Abclog logger
    Abclog.init_logger(logName='PDB-Profiling')
    # Init WebFetcher's logger (pass it with Abclog logger)
    UnsyncFetch.init_setting(Abclog.logger)
    # Set WebFetchers' semaphores; set_web_semaphore is an async (unsync)
    # method, so .result() blocks until the semaphore actually exists
    # before any request code can reach for it.
    Base.set_web_semaphore(30).result()
    Identifier.set_web_semaphore(30).result()
    # Set folder that stores downloaded and handled files
    Base.set_folder(folder)
    Identifier.set_folder(folder)
17 changes: 2 additions & 15 deletions pdb_profiling/fetcher/webfetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,10 @@ class UnsyncFetch(Abclog):
* https://tenacity.readthedocs.io/en/latest/
'''

retry_kwargs = {
'wait': wait_random(max=10),
'stop': stop_after_attempt(3),
# 'after': after_log(logger, logging.WARNING)
}
use_existing: bool = False
logger_set:bool = False

    @classmethod
    def init_setting(cls, logger: Optional[logging.Logger] = None):
        """One-time setup: bind a logger and wrap the download coroutines
        with tenacity retry behaviour.

        Idempotent: guarded by ``logger_set`` so repeated calls are no-ops.
        """
        if not cls.logger_set:
            cls.init_logger(cls.__name__, logger)
            # Log a warning after each failed attempt (tenacity after-hook);
            # must be added before retry() consumes retry_kwargs below.
            cls.retry_kwargs['after'] = after_log(cls.logger, logging.WARNING)
            # Monkey-patch the class-level downloaders with retrying wrappers.
            cls.http_download = retry(cls.http_download, **cls.retry_kwargs)
            cls.ftp_download = retry(cls.ftp_download, **cls.retry_kwargs)
            cls.logger_set = True

@classmethod
@retry(wait=wait_random(max=10), stop=stop_after_attempt(3))
async def http_download(cls, method: str, info: Dict, path: str):
if cls.use_existing is True and os.path.exists(path):
return path
Expand All @@ -96,6 +82,7 @@ async def http_download(cls, method: str, info: Dict, path: str):
raise Exception(mes)

@classmethod
@retry(wait=wait_random(max=10), stop=stop_after_attempt(3))
async def ftp_download(cls, method: str, info: Dict, path: str):
url = furl(info['url'])
fileName = url.path.segments[-1]
Expand Down
143 changes: 1 addition & 142 deletions pdb_profiling/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,148 +15,7 @@
from pdb_profiling.processors.pdbe.api import PDBeModelServer, PDBArchive,PDBVersioned
from pdb_profiling.processors.uniprot.api import UniProtFASTA
from pdb_profiling.processors.proteins.api import ProteinsAPI
from pdb_profiling.processors.proteins.record import Identifier
from pdb_profiling.processors.ensembl.api import EnsemblAPI
from pdb_profiling.processors.eutils.api import EutilsAPI
from pdb_profiling.processors.swissmodel.api import SMR
from pdb_profiling.processors.proteins import ProteinsDB
from pdb_profiling.log import Abclog
from pdb_profiling.utils import init_folder_from_suffix,a_seq_reader, a_load_json
from re import compile as re_compile
from pathlib import Path
from typing import Union, Optional
from unsync import unsync


class Identifier(Abclog):
    """Wrap a RefSeq/Ensembl accession and map it to UniProt.

    Recognises the accession type via regex, caches cross-reference data
    from the EBI Proteins API in a local SQLite database, and can fetch
    the corresponding sequence from E-utilities or the Ensembl REST API.
    """

    # Common tail of every accession pattern: digits, then an optional ".version".
    suffix = r'[0-9]+)[\.]*([0-9]*)'
    # (source, level) -> compiled pattern; group 1 = accession, group 2 = version.
    pats = {
        ('RefSeq', 'transcript'): re_compile(f'(NM_{suffix}'),
        ('RefSeq', 'protein'): re_compile(f'(NP_{suffix}'),
        ('Ensembl', 'gene'): re_compile(f'(ENSG{suffix}'),
        ('Ensembl', 'transcript'): re_compile(f'(ENST{suffix}'),
        ('Ensembl', 'protein'): re_compile(f'(ENSP{suffix}')}

    @classmethod
    def set_folder(cls, folder: Union[Path, str]):
        """Initialise the SQLite handle and the download folders under `folder`."""
        cls.folder = Path(folder)
        cls.sqlite_api = ProteinsDB(
            "sqlite:///%s" % (init_folder_from_suffix(cls.folder, 'local_db')/"proteinsAPI.db"))
        cls.proteins_api_folder = init_folder_from_suffix(
            cls.folder, 'proteins/api/proteins')
        cls.seq_folder = dict(
            RefSeq=init_folder_from_suffix(cls.folder, 'eutils/efetch'),
            Ensembl=init_folder_from_suffix(cls.folder, 'ensembl/sequence/id'))
        cls.ensembl_archive_folder = init_folder_from_suffix(
            cls.folder, 'ensembl/archive/id')

    @classmethod
    def get_type(cls, identifier: str):
        """Return ((source, level), (accession, version)) for a known accession,
        or None (implicitly) when no pattern matches."""
        for key, pat in cls.pats.items():
            res = pat.fullmatch(identifier)
            if bool(res):
                return key, res.groups()

    def __init__(self, identifier: str, folder: Optional[Union[Path, str]] = None):
        """Parse `identifier`; optionally initialise class folders via `folder`.

        Raises:
            ValueError: if the identifier matches no known pattern.
            AttributeError: if `sqlite_api` was never initialised.
        """
        try:
            (self.source, self.level), (self.identifier,
                                        self.version) = self.get_type(identifier)
            self.raw_identifier = identifier
            if folder is not None:
                # FIX: was `self.set_db(folder)`, but no `set_db` method exists
                # on this class; the resulting AttributeError was swallowed by
                # the handler below and re-raised with a misleading message.
                # `set_folder` is the classmethod that actually creates
                # `sqlite_api`, which the getattr check below expects.
                self.set_folder(folder)
            getattr(self, 'sqlite_api')
        except TypeError:
            # get_type returned None -> unpacking failed.
            raise ValueError(f"Unexpected identifier type: {identifier}")
        except AttributeError:
            raise AttributeError(
                "Please specify class variable `sqlite_api` via set_folder() first or pass `folder` in this method!")
        self.ensembl_status = None
        self.refseq_status = None

    def __repr__(self):
        return f'<{self.source} {self.level} {self.identifier} {self.version}>'

    @unsync
    async def set_status(self):
        """Populate `ensembl_status` from the Ensembl archive endpoint.

        NOTE(review): `headers` is passed both as the positional `params`
        argument and as the `headers` keyword — sending Content-Type as a
        query parameter looks unintended; confirm against EnsemblAPI.
        """
        if self.source == 'Ensembl':
            headers = {'Content-Type': 'application/json'}
            self.ensembl_status = await EnsemblAPI.single_retrieve('archive/id/',
                                                                   self.identifier,
                                                                   headers,
                                                                   self.ensembl_archive_folder,
                                                                   Base.get_web_semaphore(),
                                                                   headers=headers).then(a_load_json)

    @unsync
    async def fetch_from_ProteinsAPI(self):
        """Fetch cross-references and isoform data from the EBI Proteins API
        and persist them into the local SQLite database."""
        dbReferences_df, iso_df = ProteinsAPI.pipe_summary(await ProteinsAPI.single_retrieve(
            'proteins/',
            dict(offset=0, size=-1, reviewed='true', isoform=0),
            self.proteins_api_folder,
            Base.get_web_semaphore(),
            identifier=f'{self.source}:{self.identifier}'
        ).then(a_load_json))
        if (dbReferences_df is not None) and (len(dbReferences_df) > 0):
            await self.sqlite_api.async_insert(
                self.sqlite_api.DB_REFERENCES, dbReferences_df.to_dict('records'))
            if iso_df is not None:
                await self.sqlite_api.async_insert(
                    self.sqlite_api.ALTERNATIVE_PRODUCTS, iso_df.rename(columns={'ids': 'isoform'}).to_dict('records'))
            else:
                self.logger.warning(
                    f"Can't find ALTERNATIVE_PRODUCTS with {self.identifier}")
        else:
            self.logger.warning(
                f"Can't find dbReference with {self.identifier}")

    @unsync
    async def map2unp(self):
        """Map this identifier to a (UniProt entry, isoform) pair via the
        local database; returns None when no cross-reference is stored.

        NOTE(review): the SQL is built by f-string interpolation; values are
        internal accessions, but parameterized queries would be safer.
        """
        try:
            entry, isoform = await self.sqlite_api.database.fetch_one(
                query=f"""
                SELECT Entry,isoform FROM dbReferences
                WHERE type == '{self.source}' AND {self.level} == '{self.raw_identifier}'""")
        except TypeError:
            # fetch_one returned None -> nothing to unpack.
            return

        if isoform is not None:
            sequenceStatus = await self.sqlite_api.database.fetch_val(
                query=f"""
                SELECT sequenceStatus FROM ALTERNATIVE_PRODUCTS
                WHERE Entry == '{entry}' AND isoform == '{isoform}'""")
            if sequenceStatus == 'displayed':
                # Canonical isoform: report the entry itself.
                return entry, entry
        else:
            res = await self.sqlite_api.database.fetch_one(
                query=f"""
                SELECT sequenceStatus FROM ALTERNATIVE_PRODUCTS
                WHERE Entry == '{entry}'""")
            if res is None:
                # No isoform records at all: entry is its own sequence.
                return entry, entry
        return entry, isoform

    @unsync
    async def fetch_sequence(self, newest: bool = True):
        """Fetch the sequence for this identifier.

        RefSeq goes through NCBI E-utilities; Ensembl goes through the
        Ensembl REST API (which only serves the current release, hence the
        warning when `newest` is False).
        """
        if self.source == 'RefSeq':
            return await EutilsAPI.single_retrieve('efetch.fcgi',
                                                   dict(
                                                       db='sequences', id=self.identifier if newest else self.raw_identifier, rettype='fasta'),
                                                   self.seq_folder['RefSeq'],
                                                   Base.get_web_semaphore()).then(a_seq_reader)
        elif self.source == 'Ensembl':
            if self.ensembl_status is None:
                await self.set_status()
            # FIX: the invalid-identifier check must run before subscripting
            # `ensembl_status`. Previously `False['is_current']` raised
            # TypeError, making the `is False` branch unreachable — and that
            # branch also lacked a `return`, so an invalid identifier fell
            # through to the API call anyway.
            if self.ensembl_status is False:
                self.logger.warning('Invalid Identifier!')
                return
            elif self.ensembl_status['is_current'] != '1':
                self.logger.warning(
                    f'Not exists in current archive: \n{self.ensembl_status}')
                return
            if not newest:
                self.logger.warning(
                    "Can't retrieve older version Ensembl Sequence via Ensembl REST API!")
            return await EnsemblAPI.single_retrieve('sequence/id/',
                                                    self.identifier,
                                                    dict(type='protein'),
                                                    self.seq_folder['Ensembl'],
                                                    Base.get_web_semaphore()).then(a_seq_reader)
24 changes: 14 additions & 10 deletions pdb_profiling/processors/ensembl/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,31 @@ class EnsemblAPI(Abclog):

@classmethod
def get_file_suffix(cls, headers: Optional[Dict]) -> str:
if headers is None:
headers = cls.headers
res = headers["Content-Type"].split('/')[1]
assert res in ('plain', 'x-seqxml+xml', 'x-fasta', 'json'), f"Unexcepted Case: {cls.headers}"
return res.replace('x-', '').replace('seqxml+', '')

@classmethod
def task_unit(cls, suffix: str, identifier: str, params: Dict, folder: Path, headers:Optional[Dict]) -> Tuple:
args = dict(
url=f'{BASE_URL}{suffix}{identifier}',
headers=cls.headers if headers is None else headers,
params=params)
def task_unit(cls, suffix: str, identifier: str, params: Optional[Dict], folder: Path, headers:Optional[Dict]) -> Tuple:
headers = cls.headers if headers is None else headers
if params is not None:
args = dict(
url=f'{BASE_URL}{suffix}{identifier}',
headers=headers,
params=params)
else:
args = dict(
url=f'{BASE_URL}{suffix}{identifier}',
headers=headers)
return 'get', args, folder/f'{identifier}.{cls.get_file_suffix(headers)}'

@classmethod
def yieldTasks(cls, suffix: str, identifiers: Iterable[str], params_collection: Iterable[Dict], folder: Path, headers: Optional[Dict]) -> Generator:
def yieldTasks(cls, suffix: str, identifiers: Iterable[str], params_collection: Iterable[Optional[Dict]], folder: Path, headers: Optional[Dict]) -> Generator:
for identifier, params in zip(identifiers, params_collection):
yield cls.task_unit(suffix, identifier, params, folder, headers)

@classmethod
def retrieve(cls, suffix: str, identifiers: Iterable[str], params_collection: Iterable[Dict], folder: Union[Path, str], concur_req: int = 20, rate: float = 1.5, ret_res: bool = True, headers: Optional[Dict] = None, **kwargs):
def retrieve(cls, suffix: str, identifiers: Iterable[str], params_collection: Iterable[Optional[Dict]], folder: Union[Path, str], concur_req: int = 20, rate: float = 1.5, ret_res: bool = True, headers: Optional[Dict] = None, **kwargs):
assert suffix in cls.api_set, f"Invalid suffix! Valid set is \n{cls.api_set}"
folder = Path(folder)
res = UnsyncFetch.multi_tasks(
Expand All @@ -58,7 +62,7 @@ def retrieve(cls, suffix: str, identifiers: Iterable[str], params_collection: It
return res

@classmethod
def single_retrieve(cls, suffix: str, identifier: str, params: Dict, folder: Union[Path, str], semaphore, rate: float = 1.5, headers: Optional[Dict] = None):
def single_retrieve(cls, suffix: str, identifier: str, params: Optional[Dict], folder: Union[Path, str], semaphore, rate: float = 1.5, headers: Optional[Dict] = None):
assert suffix in cls.api_set, f"Invalid suffix! Valid set is \n{cls.api_set}"
folder = Path(folder)
return UnsyncFetch.single_task(
Expand Down
16 changes: 12 additions & 4 deletions pdb_profiling/processors/pdbe/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class Base(object):

folder = None

def __init__(self):
self.tasks = dict()

def get_id(self):
pass

def __repr__(self):
return f"<{self.__class__.__name__} {self.get_id()}>"

Expand All @@ -61,16 +67,18 @@ def set_sqlite_connection(self, api):
pass

@classmethod
def set_web_semaphore(cls, web_semaphore_value):
cls.web_semaphore = init_semaphore(web_semaphore_value).result()
@unsync
async def set_web_semaphore(cls, web_semaphore_value):
cls.web_semaphore = await init_semaphore(web_semaphore_value)

    @classmethod
    def get_web_semaphore(cls):
        """Return the semaphore created by set_web_semaphore().

        Raises AttributeError if set_web_semaphore() has not completed yet.
        """
        return cls.web_semaphore

@classmethod
def set_db_semaphore(cls, db_semaphore_value):
cls.db_semaphore = init_semaphore(db_semaphore_value).result()
@unsync
async def set_db_semaphore(cls, db_semaphore_value):
cls.db_semaphore = await init_semaphore(db_semaphore_value)

@classmethod
def get_db_semaphore(cls):
Expand Down
Loading

0 comments on commit 6fde377

Please sign in to comment.