
Commit

Alternative import method via Elexis REST API
kernc committed Jul 27, 2021
1 parent 99c8687 commit 176151b
Showing 8 changed files with 320 additions and 173 deletions.
30 changes: 25 additions & 5 deletions app/importing/models.py
@@ -2,7 +2,7 @@
from typing import List, Optional

from bson import ObjectId
from pydantic import AnyHttpUrl, FilePath, HttpUrl, root_validator, validator
from pydantic import AnyHttpUrl, Field, FilePath, HttpUrl, root_validator, validator

from app.models import BaseModel, Genre, Language, ReleasePolicy, _AutoStrEnum

@@ -18,15 +18,16 @@ class _ImportMeta(BaseModel):
release: ReleasePolicy
sourceLanguage: Optional[Language]
genre: Optional[List[Genre]]
api_key: str


class ImportJob(BaseModel):
class FileImportJob(BaseModel):
state: JobStatus
api_key: str
dict_id: Optional[ObjectId]
url: Optional[Url] # type: ignore
file: Optional[FilePath]
state: JobStatus
meta: _ImportMeta
dict_id: Optional[ObjectId]
id: Optional[ObjectId] = Field(None, alias='_id')

@validator('url', 'file')
def cast_to_str(cls, v):
@@ -36,3 +37,22 @@ def cast_to_str(cls, v):
def check_valid(cls, values):
assert values['url'] or values['file']
return values


class ApiImportJob(BaseModel):
state: JobStatus
api_key: str
dict_id: Optional[ObjectId]
url: Optional[Url] # type: ignore
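# Dictionary id and optional API key on the remote (origin) Elexis endpoint given by url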
remote_dict_id: str
remote_api_key: Optional[str]
id: Optional[ObjectId] = Field(None, alias='_id')

@validator('url')
def cast_to_str(cls, v):
return str(v) if v else None

@root_validator
def check_valid(cls, values):
assert values['url']
return values
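
For orientation, a minimal sketch of the kind of document the new ApiImportJob model validates when such a job is scheduled (field values here are hypothetical, not taken from this commit):

from app.importing.models import ApiImportJob, JobStatus

# Hypothetical example of a job document as it might sit in the import_jobs
# collection before a worker picks it up; all values are illustrative only.
job = ApiImportJob(
    state=JobStatus.SCHEDULED,          # _process_one_api() asserts this state
    api_key='local-api-key',            # owner of the imported dictionary here
    url='https://example.org/api/',     # origin Elexis REST endpoint (assumed URL)
    remote_dict_id='abc123',            # dictionary id on the origin endpoint
    remote_api_key='origin-key',        # optional, sent as the X-API-Key header
)
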
190 changes: 149 additions & 41 deletions app/importing/ops.py
@@ -1,15 +1,20 @@
import logging
import os
import time
import traceback
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, Tuple
from urllib.parse import urljoin

import httpx
import orjson
from bson import ObjectId

from .models import ImportJob, JobStatus
from .models import ApiImportJob, FileImportJob, JobStatus
from ..models import Dictionary, RdfFormats, ReleasePolicy
from ..settings import settings
from ..db import get_db_sync, reset_db_client, safe_path
from ..rdf import file_to_obj
@@ -21,21 +26,21 @@ def _get_upload_filename(username, filename) -> str:
f'{now}-{safe_path(username)}-{safe_path(filename)}')


def _process_one_dict(job_id: str):
def _process_one_file(job_id: str):
job_id = ObjectId(job_id)
log = logging.getLogger(__name__)
log.debug('Start import job %s', job_id)
log.info('Start file import job %s', job_id)
reset_db_client()
with get_db_sync() as db:
job = db.import_jobs.find_one({'_id': job_id})
job = ImportJob(**job)
job = FileImportJob(**job)
assert job.state == JobStatus.SCHEDULED
filename = job.file
try:
# Download
if job.url and not filename:
log.debug('Download %s from %r', job_id, job.url)
filename = _get_upload_filename(job.meta.api_key, job.url)
filename = _get_upload_filename(job.api_key, job.url)
with httpx.stream("GET", job.url) as response:
num_bytes_expected = int(response.headers["Content-Length"])
with open(filename, 'wb') as fd:
@@ -49,45 +54,11 @@ def _process_one_dict(job_id: str):
log.debug('Parse %s from %r', job_id, filename)
obj = file_to_obj(filename, job.meta.sourceLanguage)

# Transfer properties
obj['_id'] = job_id
obj['import_time'] = str(datetime.now())
# We add job.meta properrties on base object, which in
# We add job.meta properties on base object, which in
# router /about gets overridden by meta from file
obj.update(job.meta.dict(exclude_none=True, exclude_unset=True))

# Check if our dict should replace entries from other dict_id
dict_id = job.dict_id or job_id
if job.dict_id is not None:
log.debug('Job %s replaces dict %s', job_id, dict_id)
obj['_id'] = dict_id

old_obj = db.dicts.find_one({'api_key': job.meta.api_key,
'_id': dict_id},
{'_id': True})
if old_obj is None:
raise Exception('E403, forbidden')

# Transfer entry ids from old dict
obj = _transfer_ids(obj, dict_id, db)

# Extract entries separately, assign them dict id
entries = obj.pop('entries')
assert entries, 'No entries in dictionary'
obj['n_entries'] = len(entries)
for entry in entries:
entry['_dict_id'] = dict_id

log.debug('Insert %s with %d entries', dict_id, len(entries))
# Remove previous dict/entries
db.entry.delete_many({'_dict_id': dict_id})
db.dicts.delete_one({'_id': dict_id})

# Insert dict, entries
result = db.entry.insert_many(entries)
obj['_entries'] = result.inserted_ids # Inverse of _dict_id
result = db.dicts.insert_one(obj)
assert result.inserted_id == dict_id
_create_or_update_dict(db, obj, job, log, job.dict_id)

# Mark job done
db.import_jobs.update_one(
@@ -105,6 +76,103 @@
os.remove(filename)


def _process_one_api(job_id: str): # noqa: C901

def _response_to_entry_obj(fmt: RdfFormats, response: httpx.Response):
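# Convert a single-entry response into something file_to_obj() can parse,
# wrapping it in a minimal dictionary skeleton where needed; only the
# parsed entry is returned.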
if fmt == RdfFormats.JSON:
obj = response.json()
obj.pop('@context', None)
text = '''\
{
"dummy": {
"meta": {
"release": "PRIVATE",
"sourceLanguage": "xx"
},
"entries": [ %s ]
}
}''' % orjson.dumps(obj).decode()
elif fmt == RdfFormats.ONTOLEX:
text = response.text # Already valid input
elif fmt == RdfFormats.TEI:
text = f'''\
<TEI xmlns="http://www.tei-c.org/ns/1.0">
<teiHeader></teiHeader>
<text><body>
{response.text}
</body></text></TEI>'''
else:
assert False, fmt

with NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as fd:
fd.write(text)
filename = fd.name
try:
dict_obj = file_to_obj(filename)
finally:
os.remove(filename)
entry_obj = dict_obj['entries'][0]
return entry_obj

def get_one_entry(origin_entry_obj):
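# Download a single entry from the origin endpoint and convert it to our internal entry object.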
time.sleep(.05) # Poor-man's rate limit. 20 rq/s will d/l 64k dict in ~an hour.
origin_entry_id = origin_entry_obj['id']
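# Pick one of the formats the origin offers for this entry (ordering per RdfFormats.sort_key)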
fmt = sorted(origin_entry_obj['formats'], key=RdfFormats.sort_key())[0]
response = client.get(urljoin(endpoint,
f'{fmt}/{origin_dict_id}/{origin_entry_id}'))
entry_obj = _response_to_entry_obj(fmt, response)
entry_obj['_origin_id'] = origin_entry_id
return entry_obj

job_id = ObjectId(job_id)
log = logging.getLogger(__name__)
log.info('Start API import job %s', job_id)
reset_db_client()
with get_db_sync() as db:
job = db.import_jobs.find_one({'_id': job_id})
job = ApiImportJob(**job)
assert job.state == JobStatus.SCHEDULED
endpoint = job.url
origin_dict_id = job.remote_dict_id

headers = {'X-API-Key': job.remote_api_key} if job.remote_api_key else None
with httpx.Client(headers=headers, timeout=10) as client:
log.debug('Import job %s, dict %r from %r', job_id, origin_dict_id, endpoint)
entry = None
try:
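# Fetch the dictionary's metadata from the origin's about/ endpoint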
response = client.get(urljoin(endpoint, f'about/{origin_dict_id}'))
dict_obj = {
'meta': Dictionary(**response.json()).dict(exclude_none=True,
exclude_unset=True),
'_origin_id': origin_dict_id,
'_origin_endpoint': endpoint,
'_origin_api_key': job.remote_api_key,
}

response = client.get(urljoin(endpoint, f'list/{origin_dict_id}'))
entries_list = response.json()
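# Fetch each PUBLIC entry individually via get_one_entry()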
# THIS, is absolutely not how it was supposed to be done
entries = []
for entry in entries_list:
if entry.get('release', ReleasePolicy.PUBLIC) == ReleasePolicy.PUBLIC:
entries.append(get_one_entry(entry))

dict_obj['entries'] = entries

_create_or_update_dict(db, dict_obj, job, log, job.dict_id or None)

# Mark job done
db.import_jobs.update_one(
{'_id': job_id}, {'$set': {'state': JobStatus.DONE}})
log.debug('Done job %s', job_id)
except Exception:
log.error('Job %s failed on entry %s', job_id, entry)
log.exception('Error processing job %s', job_id)
db.import_jobs.update_one(
{'_id': job_id}, {'$set': {'state': JobStatus.ERROR,
'error': traceback.format_exc()}})


def _transfer_ids(new_obj, old_dict_id, db):
def entry_to_key(entry):
key = (
@@ -129,3 +197,43 @@ def entry_to_key(entry):
if id is not None:
entry['_id'] = id
return new_obj


def _create_or_update_dict(db, dict_obj, job, log, override_dict_id):
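# Shared tail of both import paths: assigns ownership and import time to the
# parsed dict object, optionally replaces an existing dictionary, and inserts
# the dictionary together with its entries.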
# Transfer properties
dict_obj['_id'] = job.id
dict_obj['api_key'] = job.api_key
dict_obj['_import_time'] = str(datetime.now())

# Check if our dict should replace entries from other dict_id
dict_id = override_dict_id or job.id
if override_dict_id is not None:
log.debug('Job %s replaces dict %s', job.id, dict_id)
dict_obj['_id'] = dict_id

old_obj = db.dicts.find_one({'api_key': job.api_key,
'_id': override_dict_id},
{'_id': True})
if old_obj is None:
raise Exception('E403, forbidden')

# Transfer entry ids from old dict
dict_obj = _transfer_ids(dict_obj, override_dict_id, db)

# Extract entries separately, assign them dict id
entries = dict_obj.pop('entries')
assert entries, 'No entries in dictionary'
dict_obj['n_entries'] = len(entries)
for entry in entries:
entry['_dict_id'] = dict_id

log.info('Insert %s (api_key: %s) with %d entries', dict_id, job.api_key, len(entries))
# Remove previous dict/entries
db.entry.delete_many({'_dict_id': dict_id})
db.dicts.delete_one({'_id': dict_id})

# Insert dict, entries
result = db.entry.insert_many(entries)
dict_obj['_entries'] = result.inserted_ids # Inverse of _dict_id
result = db.dicts.insert_one(dict_obj)
assert result.inserted_id == dict_id