Skip to content

Commit

Permalink
feat: cache submissions in s3 (#1035)
Browse files Browse the repository at this point in the history
* api to update submission in cache in s3

* feat: api to download submission from cache

* transition from POST to GET for download submission from cache endpoint

* added submissions in a zip file as a sozip file in s3

* added sozipfile module

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* feat: postgis func to convert geojson to flatgeobuf

* docs: extra comments for geosjon_to_flatgeobuf

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* build: recrease pdm.lock with sozipfile

* fix: replace all instances of zipfile with sozipfile

* added metadata json file alongside submission.json in s3 bucket

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: rename submissions crud functions for clarity

* refactor: simplify update_submission_in_s3

* refactor: reorder submission logic to check for last_submission

* check the last submission date from submission in s3 and compare with submission from odk

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix:submission path

* update submission download logic with passing task_id as a parameter

* remove: download submission from cache endpoint

* updated parameter task_id to background_task_id

* updated endpoint name

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* updated get data from metadata json

* metadata json added outside of zipfile in s3 bucket

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: reuse metadata_s3_path variable in submission upload

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: spwoodcock <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2023
1 parent 77ad259 commit c66f04c
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 82 deletions.
36 changes: 36 additions & 0 deletions src/backend/app/db/postgis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

from geoalchemy2 import Geometry
from geoalchemy2.shape import to_shape
from geojson import FeatureCollection
from geojson_pydantic import Feature
from shapely.geometry import mapping
from sqlalchemy import text
from sqlalchemy.orm import Session


def timestamp():
Expand Down Expand Up @@ -61,3 +64,36 @@ def get_centroid(geometry: Geometry, properties: str = {}, id: int = None):
}
return Feature(**geojson)
return {}


def geojson_to_flatgeobuf(db: Session, geojson: FeatureCollection):
"""From a given FeatureCollection, return a memory flatgeobuf obj."""
sql = f"""
DROP TABLE IF EXISTS public.temp_features CASCADE;
CREATE TABLE IF NOT EXISTS public.temp_features(
id serial PRIMARY KEY,
geom geometry
);
WITH data AS (SELECT '{geojson}'::json AS fc)
INSERT INTO public.temp_features (geom)
SELECT
ST_AsText(ST_GeomFromGeoJSON(feat->>'geometry')) AS geom
FROM (
SELECT json_array_elements(fc->'features') AS feat
FROM data
) AS f;
WITH thegeom AS
(SELECT * FROM public.temp_features)
SELECT ST_AsFlatGeobuf(thegeom.*)
FROM thegeom;
"""
# Run the SQL
result = db.execute(text(sql))
# Get a memoryview object, then extract to Bytes
flatgeobuf = result.fetchone()[0].tobytes()
# Cleanup table
db.execute(text("DROP TABLE IF EXISTS public.temp_features CASCADE;"))
return flatgeobuf
5 changes: 2 additions & 3 deletions src/backend/app/projects/project_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
import os
import time
import uuid
import zipfile
from asyncio import gather
from concurrent.futures import ThreadPoolExecutor, wait
from io import BytesIO
from typing import List, Optional, Union
from zipfile import ZipFile

import geoalchemy2
import geojson
import pkg_resources
import requests
import segno
import shapely.wkb as wkblib
import sozipfile.sozipfile as zipfile
import sqlalchemy
from asgiref.sync import async_to_sync
from fastapi import File, HTTPException, UploadFile
Expand Down Expand Up @@ -764,7 +763,7 @@ async def update_project_with_zip(
detail=f"File must be a zip. Uploaded file was {uploaded_zip.content_type}",
)

with ZipFile(io.BytesIO(uploaded_zip.file.read()), "r") as zip:
with zipfile.ZipFile(io.BytesIO(uploaded_zip.file.read()), "r") as zip:
# verify valid zip file
bad_file = zip.testzip()
if bad_file:
Expand Down
126 changes: 116 additions & 10 deletions src/backend/app/submission/submission_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
import json
import os
import threading
import zipfile
import uuid
from asyncio import gather
from datetime import datetime
from io import BytesIO
from pathlib import Path

import sozipfile.sozipfile as zipfile
from asgiref.sync import async_to_sync
from fastapi import HTTPException, Response
from fastapi.responses import FileResponse
from loguru import logger as log
from osm_fieldwork.json2osm import JsonDump
from sqlalchemy.orm import Session

from ..central.central_crud import get_odk_form, get_odk_project
from ..projects import project_crud, project_schemas
from app.central.central_crud import get_odk_form, get_odk_project, list_odk_xforms
from app.config import settings
from app.projects import project_crud, project_schemas
from app.s3 import add_obj_to_bucket, get_obj_from_bucket
from app.tasks import tasks_crud


def get_submission_of_project(db: Session, project_id: int, task_id: int = None):
Expand Down Expand Up @@ -300,7 +306,7 @@ def convert_to_osm(db: Session, project_id: int, task_id: int):
submission = xform.getSubmissions(odkid, task_id, None, False, True)
submission = (json.loads(submission))["value"]
else:
submission = get_all_submissions(db, project_id)
submission = get_all_submissions_json(db, project_id)

if not submission:
raise HTTPException(status_code=404, detail="Submission not found")
Expand Down Expand Up @@ -342,8 +348,12 @@ def convert_to_osm(db: Session, project_id: int, task_id: int):
return FileResponse(final_zip_file_path)


def download_submission_for_project(db, project_id):
log.info(f"Downloading all submissions for a project {project_id}")
def gather_all_submission_csvs(db, project_id):
"""Gather all of the submission CSVs for a project.
Generate a single zip with all submissions.
"""
log.info(f"Downloading all CSV submissions for project {project_id}")

get_project_sync = async_to_sync(project_crud.get_project)
project_info = get_project_sync(db, project_id)
Expand Down Expand Up @@ -436,7 +446,103 @@ def extract_files(zip_file_path):
return final_zip_file_path


def get_all_submissions(db: Session, project_id):
def update_submission_in_s3(
db: Session, project_id: int, background_task_id: uuid.UUID
):
try:
# Get Project
get_project_sync = async_to_sync(project_crud.get_project)
project = get_project_sync(db, project_id)

# Gather metadata
odk_credentials = project_schemas.ODKCentral(
odk_central_url=project.odk_central_url,
odk_central_user=project.odk_central_user,
odk_central_password=project.odk_central_password,
)
odk_forms = list_odk_xforms(project.odkid, odk_credentials, True)

# Get latest submission date
valid_datetimes = [
form["lastSubmission"]
for form in odk_forms
if form["lastSubmission"] is not None
]
last_submission = max(
valid_datetimes, key=lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%fZ")
)

# Check if the file already exists in s3
s3_project_path = f"/{project.organisation_id}/{project_id}"
metadata_s3_path = f"/{s3_project_path}/submissions.meta.json"
try:
# Get the last submission date from the metadata
file = get_obj_from_bucket(settings.S3_BUCKET_NAME, metadata_s3_path)
zip_file_last_submission = (json.loads(file.getvalue()))["last_submission"]
if last_submission <= zip_file_last_submission:
# Update background task status to COMPLETED
update_bg_task_sync = async_to_sync(
project_crud.update_background_task_status_in_database
)
update_bg_task_sync(db, background_task_id, 4) # 4 is COMPLETED
return

except Exception as e:
log.warning(str(e))
pass

# Zip file is outdated, regenerate
metadata = {
"last_submission": last_submission,
}

# Get submissions from ODK Central
submissions = get_all_submissions_json(db, project_id)

submissions_zip = BytesIO()
# Create a sozipfile with metadata and submissions
with zipfile.ZipFile(
submissions_zip,
"w",
compression=zipfile.ZIP_DEFLATED,
chunk_size=zipfile.SOZIP_DEFAULT_CHUNK_SIZE,
) as myzip:
myzip.writestr("submissions.json", json.dumps(submissions))

# Add zipfile to the s3 bucket
add_obj_to_bucket(
settings.S3_BUCKET_NAME,
submissions_zip,
f"/{s3_project_path}/submission.zip",
)

# Upload metadata to s3
metadata_obj = BytesIO(json.dumps(metadata).encode())
add_obj_to_bucket(
settings.S3_BUCKET_NAME,
metadata_obj,
metadata_s3_path,
)

# Update background task status to COMPLETED
update_bg_task_sync = async_to_sync(
project_crud.update_background_task_status_in_database
)
update_bg_task_sync(db, background_task_id, 4) # 4 is COMPLETED

return True

except Exception as e:
log.warning(str(e))
# Update background task status to FAILED
update_bg_task_sync = async_to_sync(
project_crud.update_background_task_status_in_database
)
update_bg_task_sync(db, background_task_id, 2, str(e)) # 2 is FAILED


def get_all_submissions_json(db: Session, project_id):
"""Get all submissions for a project in JSON format."""
get_project_sync = async_to_sync(project_crud.get_project)
project_info = get_project_sync(db, project_id)

Expand All @@ -449,9 +555,9 @@ def get_all_submissions(db: Session, project_id):

project = get_odk_project(odk_credentials)

get_task_id_list_sync = async_to_sync(get_task_id_list)
task_lists = get_task_id_list_sync(db, project_id)
submissions = project.getAllSubmissions(project_info.odkid, task_lists)
get_task_id_list_sync = async_to_sync(tasks_crud.get_task_id_list)
task_list = get_task_id_list_sync(db, project_id)
submissions = project.getAllSubmissions(project_info.odkid, task_list)
return submissions


Expand Down
62 changes: 55 additions & 7 deletions src/backend/app/submission/submission_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
#
import json
import os
from typing import Optional

from fastapi import APIRouter, Depends, Response
from fastapi import APIRouter, BackgroundTasks, Depends, Response
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, JSONResponse
from osm_fieldwork.odk_merge import OdkMerge
from osm_fieldwork.osmfile import OsmFile
from sqlalchemy.orm import Session

from ..db import database
from ..projects import project_crud
from app.config import settings
from app.db import database
from app.projects import project_crud, project_schemas

from . import submission_crud

router = APIRouter(
Expand Down Expand Up @@ -104,7 +107,7 @@ async def download_submission(
"""
if not (task_id or export_json):
file = submission_crud.download_submission_for_project(db, project_id)
file = submission_crud.gather_all_submission_csvs(db, project_id)
return FileResponse(file)

return await submission_crud.download_submission(
Expand Down Expand Up @@ -164,7 +167,7 @@ async def conflate_osm_data(
# All Submissions JSON
# NOTE runs in separate thread using run_in_threadpool
submission = await run_in_threadpool(
lambda: submission_crud.get_all_submissions(db, project_id)
lambda: submission_crud.get_all_submissions_json(db, project_id)
)

# Data extracta file
Expand Down Expand Up @@ -214,6 +217,51 @@ async def conflate_osm_data(
return []


@router.post("/download-submission")
async def download_submission_json(
background_tasks: BackgroundTasks,
project_id: int,
background_task_id: Optional[str] = None,
db: Session = Depends(database.get_db),
):
# Get Project
project = await project_crud.get_project(db, project_id)

# Return existing export if complete
if background_task_id:
# Get the backgrund task status
task_status, task_message = await project_crud.get_background_task_status(
background_task_id, db
)

if task_status != 4:
return project_schemas.BackgroundTaskStatus(
status=task_status.name, message=task_message or ""
)

bucket_root = f"{settings.S3_DOWNLOAD_ROOT}/{settings.S3_BUCKET_NAME}"
return JSONResponse(
status_code=200,
content=f"{bucket_root}/{project.organisation_id}/{project_id}/submission.zip",
)

# Create task in db and return uuid
background_task_id = await project_crud.insert_background_task_into_database(
db, "sync_submission", project_id
)

background_tasks.add_task(
submission_crud.update_submission_in_s3, db, project_id, background_task_id
)
return JSONResponse(
status_code=200,
content={
"Message": "Submission update process initiated",
"task_id": str(background_task_id),
},
)


@router.get("/get_osm_xml/{project_id}")
async def get_osm_xml(
project_id: int,
Expand All @@ -229,7 +277,7 @@ async def get_osm_xml(
# All Submissions JSON
# NOTE runs in separate thread using run_in_threadpool
submission = await run_in_threadpool(
lambda: submission_crud.get_all_submissions(db, project_id)
lambda: submission_crud.get_all_submissions_json(db, project_id)
)

# Write the submission to a file
Expand Down
Loading

0 comments on commit c66f04c

Please sign in to comment.