diff --git a/app/importing/models.py b/app/importing/models.py
index d6f0d53..faa1daa 100644
--- a/app/importing/models.py
+++ b/app/importing/models.py
@@ -2,7 +2,7 @@
 from typing import List, Optional
 
 from bson import ObjectId
-from pydantic import AnyHttpUrl, FilePath, HttpUrl, root_validator, validator
+from pydantic import AnyHttpUrl, Field, FilePath, HttpUrl, root_validator, validator
 
 from app.models import BaseModel, Genre, Language, ReleasePolicy, _AutoStrEnum
 
@@ -18,15 +18,16 @@ class _ImportMeta(BaseModel):
     release: ReleasePolicy
     sourceLanguage: Optional[Language]
     genre: Optional[List[Genre]]
-    api_key: str
 
 
-class ImportJob(BaseModel):
+class FileImportJob(BaseModel):
+    state: JobStatus
+    api_key: str
+    dict_id: Optional[ObjectId]
     url: Optional[Url]  # type: ignore
     file: Optional[FilePath]
-    state: JobStatus
     meta: _ImportMeta
-    dict_id: Optional[ObjectId]
+    id: Optional[ObjectId] = Field(None, alias='_id')
 
     @validator('url', 'file')
     def cast_to_str(cls, v):
@@ -36,3 +37,22 @@ def check_valid(cls, values):
         assert values['url'] or values['file']
         return values
+
+
+class ApiImportJob(BaseModel):
+    state: JobStatus
+    api_key: str
+    dict_id: Optional[ObjectId]
+    url: Optional[Url]  # type: ignore
+    remote_dict_id: str
+    remote_api_key: Optional[str]
+    id: Optional[ObjectId] = Field(None, alias='_id')
+
+    @validator('url')
+    def cast_to_str(cls, v):
+        return str(v) if v else None
+
+    @root_validator
+    def check_valid(cls, values):
+        assert values['url']
+        return values
diff --git a/app/importing/ops.py b/app/importing/ops.py
index faff9ee..7d8e2ca 100644
--- a/app/importing/ops.py
+++ b/app/importing/ops.py
@@ -1,15 +1,20 @@
 import logging
 import os
+import time
 import traceback
 from collections import defaultdict
 from datetime import datetime
 from pathlib import Path
+from tempfile import NamedTemporaryFile
 from typing import Dict, Tuple
+from urllib.parse import urljoin
 
 import httpx
+import orjson
 from bson import ObjectId
 
-from .models import ImportJob, JobStatus
+from .models import ApiImportJob, FileImportJob, JobStatus
+from ..models import Dictionary, RdfFormats, ReleasePolicy
 from ..settings import settings
 from ..db import get_db_sync, reset_db_client, safe_path
 from ..rdf import file_to_obj
@@ -21,21 +26,21 @@ def _get_upload_filename(username, filename) -> str:
             f'{now}-{safe_path(username)}-{safe_path(filename)}')
 
 
-def _process_one_dict(job_id: str):
+def _process_one_file(job_id: str):
     job_id = ObjectId(job_id)
     log = logging.getLogger(__name__)
-    log.debug('Start import job %s', job_id)
+    log.info('Start file import job %s', job_id)
     reset_db_client()
     with get_db_sync() as db:
         job = db.import_jobs.find_one({'_id': job_id})
-        job = ImportJob(**job)
+        job = FileImportJob(**job)
         assert job.state == JobStatus.SCHEDULED
         filename = job.file
         try:
             # Download
             if job.url and not filename:
                 log.debug('Download %s from %r', job_id, job.url)
-                filename = _get_upload_filename(job.meta.api_key, job.url)
+                filename = _get_upload_filename(job.api_key, job.url)
                 with httpx.stream("GET", job.url) as response:
                     num_bytes_expected = int(response.headers["Content-Length"])
                     with open(filename, 'wb') as fd:
@@ -49,45 +54,11 @@
             log.debug('Parse %s from %r', job_id, filename)
             obj = file_to_obj(filename, job.meta.sourceLanguage)
 
-            # Transfer properties
-            obj['_id'] = job_id
-            obj['import_time'] = str(datetime.now())
-
-            # We add job.meta properrties on base object, which in
+            # We add job.meta properties on base object, which in
             # router /about get overriden by meta from file
             obj.update(job.meta.dict(exclude_none=True, exclude_unset=True))
 
-            # Check if our dict should replace entries from other dict_id
-            dict_id = job.dict_id or job_id
-            if job.dict_id is not None:
-                log.debug('Job %s replaces dict %s', job_id, dict_id)
-                obj['_id'] = dict_id
-
-                old_obj = db.dicts.find_one({'api_key': job.meta.api_key,
-                                             '_id': dict_id},
-                                            {'_id': True})
-                if old_obj is None:
-                    raise Exception('E403, forbidden')
-
-                # Transfer entry ids from old dict
-                obj = _transfer_ids(obj, dict_id, db)
-
-            # Extract entries separately, assign them dict id
-            entries = obj.pop('entries')
-            assert entries, 'No entries in dictionary'
-            obj['n_entries'] = len(entries)
-            for entry in entries:
-                entry['_dict_id'] = dict_id
-
-            log.debug('Insert %s with %d entries', dict_id, len(entries))
-            # Remove previous dict/entries
-            db.entry.delete_many({'_dict_id': dict_id})
-            db.dicts.delete_one({'_id': dict_id})
-
-            # Insert dict, entries
-            result = db.entry.insert_many(entries)
-            obj['_entries'] = result.inserted_ids  # Inverse of _dict_id
-            result = db.dicts.insert_one(obj)
-            assert result.inserted_id == dict_id
+            _create_or_update_dict(db, obj, job, log, job.dict_id)
 
             # Mark job done
             db.import_jobs.update_one(
@@ -105,6 +76,103 @@ def _process_one_dict(job_id: str):
                 os.remove(filename)
 
 
+def _process_one_api(job_id: str):  # noqa: C901
+
+    def _response_to_entry_obj(fmt: RdfFormats, response: httpx.Response):
+        if fmt == RdfFormats.JSON:
+            obj = response.json()
+            obj.pop('@context', None)
+            text = '''\
+            {
+            "dummy": {
+                "meta": {
+                    "release": "PRIVATE",
+                    "sourceLanguage": "xx"
+                },
+                "entries": [ %s ]
+            }
+            }''' % orjson.dumps(obj).decode()
+        elif fmt == RdfFormats.ONTOLEX:
+            text = response.text  # Already valid input
+        elif fmt == RdfFormats.TEI:
+            text = f'''\
+
+
+            {response.text}
+            '''
+        else:
+            assert False, fmt
+
+        with NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as fd:
+            fd.write(text)
+            filename = fd.name
+        try:
+            dict_obj = file_to_obj(filename)
+        finally:
+            os.remove(filename)
+        entry_obj = dict_obj['entries'][0]
+        return entry_obj
+
+    def get_one_entry(origin_entry_obj):
+        time.sleep(.05)  # Poor-man's rate limit. 20 rq/s will d/l 64k dict in ~an hour.
+        origin_entry_id = origin_entry_obj['id']
+        fmt = sorted(origin_entry_obj['formats'], key=RdfFormats.sort_key())[0]
+        response = client.get(urljoin(endpoint,
+                                      f'{fmt}/{origin_dict_id}/{origin_entry_id}'))
+        entry_obj = _response_to_entry_obj(fmt, response)
+        entry_obj['_origin_id'] = origin_entry_id
+        return entry_obj
+
+    job_id = ObjectId(job_id)
+    log = logging.getLogger(__name__)
+    log.info('Start API import job %s', job_id)
+    reset_db_client()
+    with get_db_sync() as db:
+        job = db.import_jobs.find_one({'_id': job_id})
+        job = ApiImportJob(**job)
+        assert job.state == JobStatus.SCHEDULED
+        endpoint = job.url
+        origin_dict_id = job.remote_dict_id
+
+        headers = {'X-API-Key': job.remote_api_key} if job.remote_api_key else None
+        with httpx.Client(headers=headers, timeout=10) as client:
+            log.debug('Import job %s, dict %r from %r', job_id, origin_dict_id, endpoint)
+
+            entry = None
+            try:
+                response = client.get(urljoin(endpoint, f'about/{origin_dict_id}'))
+                dict_obj = {
+                    'meta': Dictionary(**response.json()).dict(exclude_none=True,
+                                                               exclude_unset=True),
+                    '_origin_id': origin_dict_id,
+                    '_origin_endpoint': endpoint,
+                    '_origin_api_key': job.remote_api_key,
+                }
+
+                response = client.get(urljoin(endpoint, f'list/{origin_dict_id}'))
+                entries_list = response.json()
+                # THIS, is absolutely not how it was supposed to be done
+                entries = []
+                for entry in entries_list:
+                    if entry.get('release', ReleasePolicy.PUBLIC) == ReleasePolicy.PUBLIC:
+                        entries.append(get_one_entry(entry))
+
+                dict_obj['entries'] = entries
+
+                _create_or_update_dict(db, dict_obj, job, log, job.dict_id or None)
+
+                # Mark job done
+                db.import_jobs.update_one(
+                    {'_id': job_id}, {'$set': {'state': JobStatus.DONE}})
+                log.debug('Done job %s', job_id)
+            except Exception:
+                log.error('Job %s failed on entry %s', job_id, entry)
+                log.exception('Error processing job %s', job_id)
+                db.import_jobs.update_one(
+                    {'_id': job_id}, {'$set': {'state': JobStatus.ERROR,
+                                               'error': traceback.format_exc()}})
+
+
 def _transfer_ids(new_obj, old_dict_id, db):
     def entry_to_key(entry):
         key = (
@@ -129,3 +197,43 @@ def entry_to_key(entry):
         if id is not None:
             entry['_id'] = id
     return new_obj
+
+
+def _create_or_update_dict(db, dict_obj, job, log, override_dict_id):
+    # Transfer properties
+    dict_obj['_id'] = job.id
+    dict_obj['api_key'] = job.api_key
+    dict_obj['_import_time'] = str(datetime.now())
+
+    # Check if our dict should replace entries from other dict_id
+    dict_id = override_dict_id or job.id
+    if override_dict_id is not None:
+        log.debug('Job %s replaces dict %s', job.id, dict_id)
+        dict_obj['_id'] = dict_id
+
+        old_obj = db.dicts.find_one({'api_key': job.api_key,
+                                     '_id': override_dict_id},
+                                    {'_id': True})
+        if old_obj is None:
+            raise Exception('E403, forbidden')
+
+        # Transfer entry ids from old dict
+        dict_obj = _transfer_ids(dict_obj, override_dict_id, db)
+
+    # Extract entries separately, assign them dict id
+    entries = dict_obj.pop('entries')
+    assert entries, 'No entries in dictionary'
+    dict_obj['n_entries'] = len(entries)
+    for entry in entries:
+        entry['_dict_id'] = dict_id
+
+    log.info('Insert %s (api_key: %s) with %d entries', dict_id, job.api_key, len(entries))
+    # Remove previous dict/entries
+    db.entry.delete_many({'_dict_id': dict_id})
+    db.dicts.delete_one({'_id': dict_id})
+
+    # Insert dict, entries
+    result = db.entry.insert_many(entries)
+    dict_obj['_entries'] = result.inserted_ids  # Inverse of _dict_id
+    result = db.dicts.insert_one(dict_obj)
+    assert result.inserted_id == dict_id
diff --git a/app/importing/router.py b/app/importing/router.py
index 2db9496..627e314 100644
--- a/app/importing/router.py
+++ b/app/importing/router.py
@@ -9,8 +9,8 @@
 from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
 from fastapi.responses import PlainTextResponse
 
-from .ops import _get_upload_filename, _process_one_dict
-from .models import ImportJob, JobStatus, Url
+from .ops import _get_upload_filename, _process_one_api, _process_one_file
+from .models import ApiImportJob, FileImportJob, JobStatus, Url
 from ..db import _DbType, get_db
 from ..models import Genre, Language, ReleasePolicy
 from ..settings import settings
@@ -20,7 +20,8 @@
 router = APIRouter()
 
 
-_import_queue: SimpleQueue = SimpleQueue()
+_file_import_queue: SimpleQueue = SimpleQueue()
+_api_import_queue: SimpleQueue = SimpleQueue()
 
 
 @router.post('/import',
@@ -28,9 +29,9 @@
              response_model=str,
              response_class=PlainTextResponse,
              summary='Import a new dictionary.',
-             description='Import a new dictionary by direct file upload '
-                         'or an URL from where the dictionary can be fetched.')
-async def dict_import(
+             description='Import a new dictionary by direct file upload or '
+                         'a URL where the dictionary file can be fetched from.')
+async def file_url_import(
         db: _DbType = Depends(get_db),  # TODO secure it
         url: Optional[Url] = Query(
             None,
@@ -73,23 +74,67 @@
     with open(upload_path, 'wb') as fd:
         shutil.copyfileobj(file.file, fd, 10_000_000)
     try:
-        job = ImportJob(
+        job = FileImportJob(
             url=url,
             file=upload_path,
             dict_id=dictionary and ObjectId(dictionary),
             state=JobStatus.SCHEDULED,
+            api_key=api_key,
             meta=dict(
                 release=release,
                 sourceLanguage=sourceLanguage,
                 genre=genre,
-                api_key=api_key,
             ))
     except Exception as e:
         log.exception('Invalid request params: %s', e)
         raise HTTPException(status_code=HTTPStatus.BAD_REQUEST, detail=str(e))
     result = await db.import_jobs.insert_one(job.dict())
     id = str(result.inserted_id)
-    _import_queue.put(id)
+    _file_import_queue.put(id)
     return id
+
+
+@router.post('/api_import',
+             status_code=HTTPStatus.CREATED,
+             response_model=str,
+             response_class=PlainTextResponse,
+             summary='Import a new dictionary via Elexis REST API.',
+             description='Import a dictionary from another Elexis REST API endpoint.')
+async def api_import(
+        db: _DbType = Depends(get_db),
+        url: Url = Query(
+            None,
+            description='URL of the Elexis API endpoint.',
+        ),
+        remote_dictionary: str = Query(
+            ...,
+            description='Id of the dictionary at the remote endpoint.',
+        ),
+        local_dictionary: Optional[str] = Query(
+            None,
+            description='Id of dictionary to replace.',
+            regex='^[a-f0-9]{24}$',
+        ),
+        remote_api_key: Optional[str] = Query(
+            None, description='API key to access the remote dictionary.'),
+        local_api_key: str = Query(
+            ..., description='API key of the local user.'),
+):
+    try:
+        job = ApiImportJob(
+            url=url,
+            remote_dict_id=remote_dictionary,
+            dict_id=local_dictionary and ObjectId(local_dictionary),
+            remote_api_key=remote_api_key,
+            api_key=local_api_key,
+            state=JobStatus.SCHEDULED,
+        )
+    except Exception as e:
+        log.exception('Invalid request params: %s', e)
+        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST, detail=str(e))
+    result = await db.import_jobs.insert_one(job.dict())
+    id = str(result.inserted_id)
+    _api_import_queue.put(id)
+    return id
 
 
@@ -100,8 +145,13 @@ def ensure_upload_dir():
 
 @router.on_event('startup')
 def prepare_import_queue_and_start_workers():
-    Task(target=_process_one_dict,
-         queue=_import_queue,
+    Task(target=_process_one_file,
+         queue=_file_import_queue,
          n_workers=settings.UPLOAD_N_WORKERS,
-         name='dict_import',
+         name='file_import',
          timeout=settings.UPLOAD_TIMEOUT_SECONDS).start()
+    Task(target=_process_one_api,
+         queue=_api_import_queue,
+         n_workers=settings.API_IMPORT_N_WORKERS,
+         name='api_import',
+         timeout=12*settings.API_IMPORT_TIMEOUT_SECONDS).start()
diff --git a/app/linking/ops.py b/app/linking/ops.py
index eb95cf5..03a5a1c 100644
--- a/app/linking/ops.py
+++ b/app/linking/ops.py
@@ -15,9 +15,11 @@
 from .models import (
     LinkingJob, LinkingJobPrivate, LinkingJobStatus, LinkingOneResult,
-    LinkingSource, LinkingStatus, SenseLink,
+    LinkingStatus, SenseLink,
 )
-from ..rdf import add_entry_sense_ids, export_for_naisc, file_to_obj, removeprefix
+from ..importing.models import ApiImportJob, JobStatus
+from ..importing.ops import _process_one_api
+from ..rdf import add_entry_sense_ids, export_for_naisc, removeprefix
 from ..settings import settings
 from ..db import get_db_sync, reset_db_client
@@ -64,7 +66,38 @@ def _local_endpoint():
     return urljoin(settings.SITEURL, router.prefix)
 
 
-def process_linking_job(id: str):  # noqa: C901
+def _import_from_api(endpoint, remote_api_key, local_api_key, dict_id, entries):
+    with get_db_sync() as db:
+        job = ApiImportJob(
+            url=endpoint,
+            remote_dict_id=dict_id,
+            remote_api_key=remote_api_key,
+            api_key=local_api_key,
+            state=JobStatus.SCHEDULED,
+        )
+        result = db.import_jobs.insert_one(job.dict())
+        id = str(result.inserted_id)
+        _process_one_api(id)
+
+        while db.import_jobs.find_one({'_id': result.inserted_id,
+                                       'state': JobStatus.SCHEDULED}) is not None:
+            time.sleep(3)
+        job_dict = db.import_jobs.find_one({'_id': result.inserted_id})
+        job = ApiImportJob(**job_dict)
+        assert job.state == JobStatus.DONE, job
+
+        dict_id = str(job.id)
+        entry_origin_ids = list(db.entry.find(
+            {'_dict_id': ObjectId(dict_id), '_origin_id': {'$exists': True}},
+            {'_origin_id': True}
+        ))
+        to_our_id = {i['_origin_id']: str(i['_id'])
+                     for i in entry_origin_ids}
+        our_entries = [to_our_id[i] for i in (entries or [])]
+        return dict_id, our_entries
+
+
+def process_linking_job(job_id: str):  # noqa: C901
     reset_db_client()
     remote_task_id = None
     service_url = None
@@ -72,11 +105,12 @@
     origin_result = None
     job = None
     new_status = LinkingStatus(state=LinkingJobStatus.COMPLETED, message='')
+    log.info('Start linking job %s', job_id)
     try:
         # Get job handle
         while True:  # Maybe retry if db not yet synced
             with get_db_sync() as db:
-                job = db.linking_jobs.find_one({'_id': ObjectId(id)})
+                job = db.linking_jobs.find_one({'_id': ObjectId(job_id)})
                 if job:
                     break
         job = LinkingJobPrivate(**job, id=job['_id'])
@@ -98,17 +132,27 @@
         # Fetch linked entries to obtain a local copy
         origin_source_dict_id = None
         origin_target_dict_id = None
+
+        def ensure_local_source(source):
+            DEFAULT_API_KEY = 'linking'
+            local_api_key = source.apiKey or DEFAULT_API_KEY
+            source.id, entries = _import_from_api(source.endpoint,
+                                                  source.apiKey,
+                                                  local_api_key,
+                                                  source.id,
+                                                  source.entries)
+            if entries:
+                source.entries = entries
+
         if job.source.endpoint:
             origin_source_dict_id = job.source.id
-            job.source = _get_entries(job.source)
-        else:
-            job.source.endpoint = _local_endpoint()
+            ensure_local_source(job.source)
+        job.source.endpoint = _local_endpoint()
 
         if not is_babelnet:
             if job.target.endpoint:
                 origin_target_dict_id = job.target.id
-                job.target = _get_entries(job.target)
-            else:
-                job.target.endpoint = _local_endpoint()
+                ensure_local_source(job.target)
+            job.target.endpoint = _local_endpoint()
 
         assert job.source.endpoint.startswith(_local_endpoint())
         assert is_babelnet or job.target.endpoint.startswith(_local_endpoint())
@@ -156,18 +200,19 @@
                 res[results_key] = to_origin_id[res[results_key]]
 
     except Exception:
-        log.exception('Unexpected error for linking task %s: %s', id, job)
+        log.exception('Unexpected error for linking task %s: %s', job_id, job)
         new_status = LinkingStatus(state=LinkingJobStatus.FAILED,
                                    message=traceback.format_exc())
+        our_result = []
     finally:
         assert our_result is not None
         n_links = sum(len(i['linking']) for i in our_result)
         log.debug('Linking job %s finished: total %d links '
                   'between %d pairs of entries found',
-                  str(id), n_links, len(our_result))
+                  str(job_id), n_links, len(our_result))
         with get_db_sync() as db:
             db.linking_jobs.update_one(
-                {'_id': ObjectId(id)},
+                {'_id': ObjectId(job_id)},
                 {'$set': dict(new_status,
                               our_result=our_result,
                               origin_result=origin_result,
@@ -175,103 +220,6 @@ def process_linking_job(id: str):  # noqa: C901
                               remote_task_id=remote_task_id)})
 
 
-def _get_entries(source: LinkingSource) -> LinkingSource:  # noqa: C901
-    def response_to_entry_obj(fmt: str, response: httpx.Response):
-        if fmt == 'json':
-            text = f'''\
-            {{"{our_dict_id}": {{
-                "meta": {{
-                    "release": "PRIVATE",
-                    "sourceLanguage": "xx"
-                }},
-                "entries": [
-                    {response.text}
-                ]
-            }} }}'''
-        elif fmt == 'ontolex':
-            text = response.text  # Already valid input
-        elif fmt == 'tei':
-            text = f'''\
-
-
-
-            {response.text}
-            '''
-        else:
-            assert False, f'invalid fmt= {fmt!r}'
-
-        with NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as fd:
-            fd.write(text)
-            filename = fd.name
-        try:
-            dict_obj = file_to_obj(filename)
-        finally:
-            os.remove(filename)
-        entry_obj = dict_obj['entries'][0]
-        return entry_obj
-
-    def get_one_entry(origin_entry_id):
-        result = db.entry.find_one({'_origin_id': origin_entry_id,
-                                    '_dict_id': our_dict_id})
-        if result:
-            return str(result['_id'])
-
-        for fmt in ('json', 'ontolex', 'tei'):
-            response = client.get(urljoin(endpoint,
-                                          f'{fmt}/{origin_dict_id}/{origin_entry_id}'))
-            if not response.is_error:
-                break
-        else:
-            raise ValueError('No suitable format for entry '
-                             f'{origin_dict_id}/{origin_entry_id}')
-
-        entry_obj = response_to_entry_obj(fmt, response)
-        entry_obj['_dict_id'] = our_dict_id
-        entry_obj['_origin_id'] = origin_entry_id
-
-        result = db.entry.insert_one(entry_obj)
-        our_entry_id = str(result.inserted_id)
-        db.dicts.update_one({'_id': our_dict_id},
-                            {'$push': {'_entries': our_entry_id}})
-        return our_entry_id
-
-    headers = {'X-API-Key': source.apiKey} if source.apiKey else {}
-    assert source.endpoint
-    endpoint = source.endpoint
-    with get_db_sync() as db, \
-            httpx.Client(headers=headers) as client:
-
-        origin_dict_id = source.id
-        result = db.dicts.find_one(
-            {'_origin_id': origin_dict_id,
-             '_origin_endpoint': endpoint}, {'_id': True})
-        if result:
-            our_dict_id = result['_id']
-        else:
-            response = client.get(urljoin(endpoint, f'about/{origin_dict_id}'))
-            dict_obj = response.json()
-            dict_obj['api_key'] = source.apiKey
-            dict_obj['_origin_id'] = origin_dict_id
-            dict_obj['_origin_endpoint'] = endpoint
-            our_dict_id = db.dicts.insert_one(dict_obj).inserted_id
-
-        origin_entry_ids = source.entries
-        if not origin_entry_ids:
-            response = client.get(urljoin(endpoint, f'list/{origin_dict_id}'))
-            origin_entry_ids = [i['id'] for i in response.json()]
-
-        # THIS, is absolutely not how it was supposed to be done
-        our_entry_ids = [get_one_entry(i) for i in origin_entry_ids]
-
-        new_source = LinkingSource(
-            id=str(our_dict_id),
-            endpoint=_local_endpoint(),
-            apiKey=source.apiKey,
-            # Don't request explicit entries if none were passed to us
-            entries=our_entry_ids if source.entries else None)
-        return new_source
-
-
 def _linking_from_naisc_executable(job) -> List[dict]:
     assert settings.LINKING_NAISC_EXECUTABLE
     assert job.source.endpoint.startswith(_local_endpoint())
diff --git a/app/models.py b/app/models.py
index 7eb3d60..7bab469 100644
--- a/app/models.py
+++ b/app/models.py
@@ -6,7 +6,7 @@
 import orjson
 
 from pydantic import (
     BaseModel as _BaseModel,
-    Field, HttpUrl, conlist, constr, root_validator,
+    Field, HttpUrl, conlist, constr, root_validator, validator,
 )
@@ -43,6 +43,10 @@ class RdfFormats(_AutoStrEnum):
     JSON = 'json'
     ONTOLEX = 'ontolex'
 
+    @classmethod
+    def sort_key(cls):
+        return [cls.JSON, cls.TEI, cls.ONTOLEX].index
+
 
 class ReleasePolicy(_AutoStrEnum):
     PUBLIC, NONCOMMERCIAL, RESEARCH, PRIVATE = _AutoStrEnum._auto(4)
@@ -103,6 +107,12 @@ class Dictionary(BaseModel):
     creator: Optional[Union[List, str]]
     publisher: Optional[Union[List, str]]
 
+    @validator('genre', 'targetLanguage', pre=True)
+    def ensure_list(cls, v):
+        if isinstance(v, str):
+            return [v]
+        return v
+
 
 class Lemma(BaseModel):
     lemma: str
@@ -147,6 +157,7 @@ class Entry(BaseModel):
     type: LexicalEntry = Field(alias='@type')
     id: Optional[str] = Field(alias='@id')
+    lemma: Optional[str]
     canonicalForm: _CanonicalForm
     partOfSpeech: PartOfSpeech
     senses: conlist(_Sense, min_items=0)  # type: ignore
diff --git a/app/rdf.py b/app/rdf.py
index 8733460..78a363e 100644
--- a/app/rdf.py
+++ b/app/rdf.py
@@ -104,6 +104,9 @@ def _from_json(filename):
                 if isinstance(sense['definition'], str):
                     sense['definition'] = {lang: sense['definition']}
 
+            if 'lemma' not in entry:
+                entry['lemma'] = entry['canonicalForm']['writtenRep'][lang][0]
+
     obj = JsonDictionary(**obj).dict(exclude_none=True, exclude_unset=True)
     return obj
diff --git a/app/settings.py b/app/settings.py
index f6ff3f4..17a2939 100644
--- a/app/settings.py
+++ b/app/settings.py
@@ -31,6 +31,9 @@ class _Settings(BaseSettings):
     UPLOAD_REMOVE_ON_SUCCESS: bool = True
     UPLOAD_REMOVE_ON_FAILURE: bool = True
 
+    API_IMPORT_N_WORKERS: int = 2
+    API_IMPORT_TIMEOUT_SECONDS: float = 60 * 60 * 3
+
     LOGGING_CONFIG_FILE: FilePath = str(_APP_PATH / 'logging.dictConfig.json')  # type: ignore
     LOG_LEVEL: Optional[str] = None
     LOG_FILE: Optional[FilePath] = None
diff --git a/tests/test_linking.py b/tests/test_linking.py
index 7b8fdb4..107c0c0 100644
--- a/tests/test_linking.py
+++ b/tests/test_linking.py
@@ -24,7 +24,8 @@ async def forwarder(client, example_id, example_entry_ids):
         .respond_with_json((await client.get(f'/list/{example_id}')).json())
     json_responses = [Response((await client.get(f'/json/{example_id}/{id}')).content)
                       for id in example_entry_ids]
-    json_responses_iter = iter(json_responses)
+    # We expect two sequential fetches: for mock source and target endpoint
+    json_responses_iter = iter(2 * json_responses)
     httpserver \
         .expect_request(re.compile(rf'/json/{example_id}/[0-9a-f]+')) \
         .respond_with_handler(lambda _: next(json_responses_iter))
@@ -120,9 +121,12 @@ async def _test(client, example_id, monkeypatch, httpserver, endpoint, linking_r
     assert response.json()['state'] == 'PROCESSING', response.json()
 
     # ... but by now it did.
-    time.sleep(.5)
-    response = await client.post('/linking/status', content=task_id)
-    assert not response.is_error, response.json()
+    for i in range(5):
+        time.sleep(1)
+        response = await client.post('/linking/status', content=task_id)
+        assert not response.is_error, response.json()
+        if response.json()['state'] != 'PROCESSING':
+            break
     assert response.json()['state'] == 'COMPLETED', response.json()
 
     response = await client.post('/linking/result', content=task_id)