Skip to content

Commit

Permalink
fix: large refactor to OdkCentralAsync, raise errors when required
Browse files Browse the repository at this point in the history
  • Loading branch information
spwoodcock committed Jul 29, 2024
1 parent e4ec2e4 commit 0bb19ca
Showing 1 changed file with 157 additions and 126 deletions.
283 changes: 157 additions & 126 deletions osm_fieldwork/OdkCentralAsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@
import logging
import os
from asyncio import gather
from typing import Dict, List, Optional
from typing import Any, Optional, TypedDict
from uuid import uuid4

import aiohttp

log = logging.getLogger(__name__)


class EntityIn(TypedDict):
    """Required format for Entity uploads to ODK Central.

    Each item of the list passed to ``createEntities`` has this shape, e.g.
    ``{"label": "John Doe", "data": {"firstName": "John", "age": "22"}}``.
    """

    # Label of the Entity.
    label: str
    # Mapping of property name to value for the Entity.
    data: dict[str, Any]


class OdkCentral(object):
"""Helper methods for ODK Central API."""

def __init__(
self,
url: Optional[str] = None,
Expand Down Expand Up @@ -141,8 +150,9 @@ async def listForms(self, projectId: int, metadata: bool = False):
self.forms = await response.json()
return self.forms
except aiohttp.ClientError as e:
log.error(f"Error fetching forms: {e}")
return []
msg = f"Error fetching forms: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def listSubmissions(self, projectId: int, xform: str, filters: dict = None):
"""Fetch a list of submission instances for a given form.
Expand All @@ -167,8 +177,9 @@ async def listSubmissions(self, projectId: int, xform: str, filters: dict = None
async with self.session.get(url, params=filters, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching submissions: {e}")
return {}
msg = f"Error fetching submissions: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getAllProjectSubmissions(self, projectId: int, xforms: list = None, filters: dict = None):
"""Fetch a list of submissions in a project on an ODK Central server.
Expand Down Expand Up @@ -242,8 +253,108 @@ async def listDatasets(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching datasets: {e}")
return []
msg = f"Error fetching datasets: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def createDataset(
    self,
    projectId: int,
    datasetName: Optional[str] = "features",
    properties: Optional[list[str]] = None,
):
    """Create a dataset for a given project, optionally with properties.

    Args:
        projectId (int): The ID of the project to create the dataset for.
        datasetName (str): The name of the dataset to be created.
        properties (list[str]): List of property names to create.
            Omit (or pass an empty list) to create the dataset only.
            Alternatively call createDatasetProperty for each property manually.

    Returns:
        dict: The JSON body of the dataset creation response. When properties
            were requested, the requested names are stored under the
            "properties" key of that body.

    Raises:
        aiohttp.ClientError: If the dataset creation request fails, or if
            creating any of the requested properties raised an exception.
    """
    # Create the dataset
    url = f"{self.base}projects/{projectId}/datasets"
    payload = {"name": datasetName}
    try:
        log.info(f"Creating dataset ({datasetName}) for project ({projectId})")
        async with self.session.post(
            url,
            ssl=self.verify,
            json=payload,
        ) as response:
            if response.status in (200, 201):
                log.info(f"Successfully created Dataset {datasetName}")
            else:
                # A non-2xx status is logged but not raised; the response body
                # is still decoded and returned to the caller below.
                error_message = await response.text()
                log.error(f"Failed to create Dataset: {error_message}")
            dataset = await response.json()
    except aiohttp.ClientError as e:
        msg = f"Failed to create Dataset: {e}"
        log.error(msg)
        raise aiohttp.ClientError(msg) from e

    if not properties:
        return dataset

    # Add the properties, if specified
    # FIXME this is a bit of a hack until ODK Central has better support
    # FIXME for adding dataset properties in bulk
    log.debug(f"Creating properties for dataset ({datasetName}): {properties}")
    properties_tasks = [self.createDatasetProperty(projectId, field, datasetName) for field in properties]
    # return_exceptions=True lets every request run to completion; exceptions
    # are handed back as result items instead of being raised by gather, so
    # they have to be inspected explicitly here.
    results = await gather(*properties_tasks, return_exceptions=True)  # type: ignore
    failures = [result for result in results if isinstance(result, Exception)]
    if failures:
        for failure in failures:
            log.error(f"Failed to create properties: {failure}")
        msg = (
            f"Failed to create {len(failures)} of {len(properties)} properties "
            f"for ODK project ({projectId}) dataset name ({datasetName}): {failures[0]}"
        )
        raise aiohttp.ClientError(msg) from failures[0]
    log.info(f"Successfully created properties for dataset ({datasetName})")

    # Manually append to prevent another API call
    dataset["properties"] = properties
    return dataset

async def createDatasetProperty(
    self,
    projectId: int,
    field_name: str,
    datasetName: Optional[str] = "features",
):
    """Create a single property for a dataset.

    Args:
        projectId (int): The ID of the project.
        field_name (str): The name of the property to create.
        datasetName (str): The name of the dataset.

    Returns:
        dict: The response data from the API. A non-2xx response is only
            logged; its decoded body is still returned.

    Raises:
        aiohttp.ClientError: If an error occurs during the API request.
    """
    url = f"{self.base}projects/{projectId}/datasets/{datasetName}/properties"
    payload = {
        "name": field_name,
    }

    try:
        log.debug(f"Creating property of dataset {datasetName}")
        async with self.session.post(url, ssl=self.verify, json=payload) as response:
            response_data = await response.json()
            # Report exactly one outcome per request, never "success" after a failure
            if response.status in (200, 201):
                log.debug(f"Successfully created properties for dataset {datasetName}")
            else:
                log.debug(f"Failed to create properties: {response.status}, message='{response_data}'")
            return response_data
    except aiohttp.ClientError as e:
        msg = f"Failed to create properties: {e}"
        log.error(msg)
        raise aiohttp.ClientError(msg) from e

async def listEntities(
self,
Expand Down Expand Up @@ -288,8 +399,9 @@ async def listEntities(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Error fetching entities: {e}")
return []
msg = f"Error fetching entities: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getEntity(
self,
Expand Down Expand Up @@ -341,102 +453,10 @@ async def getEntity(
async with self.session.get(url, ssl=self.verify) as response:
return await response.json()
except aiohttp.ClientError as e:
# NOTE skip raising exception on HTTP 404 (not found)
log.error(f"Error fetching entity: {e}")
return {}

async def createDataset(
    self,
    projectId: int,
    datasetName: str,
):
    """Create a dataset for a given project.

    Args:
        projectId (int): The ID of the project to create the dataset for.
        datasetName (str): The name of the dataset to be created.

    Returns:
        dict: The JSON response containing information about the created dataset.
            An empty dict is returned if the request fails with an
            aiohttp.ClientError (the error is logged, not raised).
    """
    url = f"{self.base}projects/{projectId}/datasets"
    # Send the requested name rather than a hard-coded "features"
    payload = {"name": datasetName}
    try:
        log.info(f"creating datasets {datasetName} for project {projectId}")
        async with self.session.post(
            url,
            ssl=self.verify,
            json=payload,
        ) as response:
            if response.status in (200, 201):
                log.info(f"successfully created dataset {datasetName}")
            else:
                # Logged only; the response body is still decoded and returned
                error_message = await response.text()
                log.error(f"Failed to create dataset: {error_message}")
            return await response.json()
    except aiohttp.ClientError as e:
        log.error(f"Failed to create dataset: {e}")
        return {}

async def createProperty(self, projectId: int, datasetName: str, field: dict):
    """Create a property for a dataset.

    Args:
        projectId (int): The ID of the project.
        datasetName (str): The name of the dataset.
        field (dict): A dictionary containing the field information.
            Only its "name" key is sent to the API.

    Returns:
        dict: The response data from the API. An empty dict is returned if
            the request fails with an aiohttp.ClientError (the error is
            logged, not raised).
    """
    url = f"{self.base}projects/{projectId}/datasets/{datasetName}/properties"
    payload = {
        "name": field["name"],
    }

    try:
        log.info(f"creating property of dataset {datasetName}")
        async with self.session.post(url, ssl=self.verify, json=payload) as response:
            response_data = await response.json()
            # Report exactly one outcome per request, never "success" after a failure
            if response.status in (200, 201):
                log.info(f"Successfully created properties for dataset {datasetName}")
            else:
                log.error(f"Failed to create properties: {response.status}, message='{response_data}'")
            return response_data
    except aiohttp.ClientError as e:
        log.error(f"Failed to create properties: {e}")
        return {}

async def createProperties(self, projectId: int, datasetName: str, properties: List[dict]):
    """Create several properties for a dataset, one request per property.

    Args:
        projectId (int): The ID of the project.
        datasetName (str): The name of the dataset.
        properties (List[dict]): Field dictionaries, each passed to
            createProperty.

    Returns:
        set: Always the one-element set {"success"}. Failures of individual
            properties are logged, not raised.
    """
    log.info(f"bulk uploading properties of dataset {datasetName}")
    properties_tasks = [self.createProperty(projectId, datasetName, field) for field in properties]
    # return_exceptions=True means gather never raises for a failed task;
    # exceptions come back as result items and must be inspected here.
    results = await gather(*properties_tasks, return_exceptions=True)  # type: ignore
    if not results:
        log.warning(f"No properties were uploaded for ODK project ({projectId}) dataset name ({datasetName})")

    failures = [result for result in results if isinstance(result, Exception)]
    for failure in failures:
        log.error(f"Failed to create properties: {failure}")
    if not failures:
        log.info(f"Successfully created properties for dataset {datasetName}")
    return {"success"}

# TODO: not required anymore, only if required to upload single entity
async def createEntity(
self,
projectId: int,
Expand Down Expand Up @@ -517,40 +537,47 @@ async def createEntity(
) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Failed to create Entity: {e}")
return {}
msg = f"Failed to create Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def createEntities(
self,
projectId: int,
datasetName: str,
entities: List[Dict],
):
entities: list[EntityIn],
) -> dict:
"""Bulk create Entities in a project dataset (entity list).
NOTE this endpoint will be redundant after Central 2024.01 release.
Args:
projectId (int): The ID of the project on ODK Central.
datasetName (str): The name of a dataset, specific to a project.
labelDataDict (dict): Mapping of Entity label:data (str:dict) to insert.
entities (list[EntityIn]): A list of Entities to insert.
Format: {"label": "John Doe", "data": {"firstName": "John", "age": "22"}}
Returns:
list: A list of Entity detail JSONs.
The 'uuid' field includes the unique entity identifier.
# list: A list of Entity detail JSONs.
# The 'uuid' field includes the unique entity identifier.
dict: {'success': true}
When bulk creating Entities, ODK Central returns this for now.
"""
log.info(f"Bulk uploading Entities for project ({projectId}) dataset ({datasetName})")
# Validation
if not isinstance(entities, list):
raise ValueError("Entities must be a list")

log.info(f"Bulk uploading ({len(entities)}) Entities for project ({projectId}) dataset ({datasetName})")
url = f"{self.base}projects/{projectId}/datasets/{datasetName}/entities"
payload = {"entities": entities, "source": {"name": "features.csv", "size": len(entities)}}
payload = {"entities": entities, "source": {"name": "features.csv"}}

try:
log.info(f"Creating ({len(entities)}) entities for project " f"({projectId}) dataset ({datasetName})")
async with self.session.post(url, ssl=self.verify, json=payload) as response:
response.raise_for_status()
log.info(f"Successfully created entities for project {projectId} in dataset {datasetName}")
log.info(f"Successfully created entities for project ({projectId}) in dataset ({datasetName})")
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Failed to create Entity: {e}")
return {}
msg = f"Failed to create Entities: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def updateEntity(
self,
Expand Down Expand Up @@ -643,8 +670,9 @@ async def updateEntity(
) as response:
return await response.json()
except aiohttp.ClientError as e:
log.error(f"Failed to update Entity: {e}")
return {}
msg = f"Failed to update Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def deleteEntity(
self,
Expand Down Expand Up @@ -674,8 +702,9 @@ async def deleteEntity(
log.debug(f"Server returned deletion unsuccessful: {response_msg}")
return success
except aiohttp.ClientError as e:
log.error(f"Failed to delete Entity: {e}")
return False
msg = f"Failed to delete Entity: {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

async def getEntityCount(
self,
Expand All @@ -697,11 +726,12 @@ async def getEntityCount(
async with self.session.get(url, ssl=self.verify) as response:
count = (await response.json()).get("@odata.count", None)
except aiohttp.ClientError as e:
log.error(f"Failed to get Entity count for project ({projectId}): {e}")
return 0
msg = f"Failed to get Entity count for project ({projectId}): {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

if count is None:
log.debug(f"Failed to get Entity count for project ({projectId}) " f"dataset ({datasetName})")
log.debug(f"Project ({projectId}) has no Entities in dataset ({datasetName})")
return 0

return count
Expand Down Expand Up @@ -804,5 +834,6 @@ async def getEntityData(
return response_json.get("value", [])
return response_json
except aiohttp.ClientError as e:
log.error(f"Failed to get Entity data: {e}")
return {}
msg = f"Failed to get Entity data for project ({projectId}): {e}"
log.error(msg)
raise aiohttp.ClientError(msg) from e

0 comments on commit 0bb19ca

Please sign in to comment.