-
Notifications
You must be signed in to change notification settings - Fork 79
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: update entity tests to support latest bulk entity upload api
- Loading branch information
1 parent
0bb19ca
commit a00ea82
Showing
3 changed files
with
105 additions
and
83 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,6 @@ | |
|
||
from osm_fieldwork.OdkCentral import OdkAppUser, OdkForm, OdkProject | ||
from osm_fieldwork.OdkCentralAsync import OdkDataset | ||
from osm_fieldwork.xlsforms import entities_registration_xml | ||
|
||
logging.basicConfig( | ||
level="DEBUG", | ||
|
@@ -155,51 +154,69 @@ def odk_form_cleanup(odk_form): | |
assert success | ||
|
||
|
||
# NOTE this is session scoped as odk_entity_cleanup depends on it | ||
@pytest.fixture(scope="session") | ||
def odk_entity(project_details) -> tuple: | ||
"""Get entity for a project.""" | ||
async def odk_dataset(project_details) -> tuple: | ||
"""Get dataset (entity list) for a project.""" | ||
odk_id = project_details.get("id") | ||
entity = OdkDataset( | ||
dataset = OdkDataset( | ||
url="https://proxy", | ||
user="[email protected]", | ||
passwd="Password1234", | ||
) | ||
return odk_id, entity | ||
|
||
# Create the dataset | ||
async with dataset as odk_dataset: | ||
created_dataset = await odk_dataset.createDataset( | ||
odk_id, | ||
"features", | ||
[ | ||
"geometry", | ||
"project_id", | ||
"task_id", | ||
"osm_id", | ||
"tags", | ||
"version", | ||
"changeset", | ||
"timestamp", | ||
"status", | ||
], | ||
) | ||
assert created_dataset.get("name") == "features" | ||
assert sorted(created_dataset.get("properties", [])) == sorted( | ||
[ | ||
"geometry", | ||
"project_id", | ||
"task_id", | ||
"osm_id", | ||
"tags", | ||
"version", | ||
"changeset", | ||
"timestamp", | ||
"status", | ||
] | ||
) | ||
|
||
return odk_id, dataset | ||
|
||
# NOTE this is session scoped to avoid attempting to create duplicate form | ||
@pytest.fixture(scope="session") | ||
async def odk_entity_cleanup(odk_entity): | ||
"""Get Entity for project, with automatic cleanup after.""" | ||
odk_id, entity = odk_entity | ||
|
||
# Create entity registration form | ||
form = OdkForm( | ||
entity.url, | ||
entity.user, | ||
entity.passwd, | ||
) | ||
form_name = form.createForm(odk_id, str(entities_registration_xml), publish=True) | ||
if not form_name: | ||
raise AssertionError("Failed to create form") | ||
@pytest.fixture(scope="session") | ||
async def odk_dataset_cleanup(odk_dataset): | ||
"""Get Dataset for project, with automatic cleanup after.""" | ||
odk_id, dataset = odk_dataset | ||
|
||
dataset_name = "features" | ||
async with entity: | ||
entity_json = await entity.createEntity(odk_id, dataset_name, "test entity", {"osm_id": "1", "geometry": "test"}) | ||
async with dataset as odk_dataset: | ||
entity_json = await odk_dataset.createEntity(odk_id, dataset_name, "test entity", {"osm_id": "1", "geometry": "test"}) | ||
entity_uuid = entity_json.get("uuid") | ||
|
||
# Before yield is used in tests | ||
yield odk_id, dataset_name, entity_uuid, entity | ||
yield odk_id, dataset_name, entity_uuid, dataset | ||
# After yield is test cleanup | ||
|
||
async with entity: | ||
entity_deleted = await entity.deleteEntity(odk_id, dataset_name, entity_uuid) | ||
async with dataset as odk_dataset: | ||
entity_deleted = await odk_dataset.deleteEntity(odk_id, dataset_name, entity_uuid) | ||
assert entity_deleted | ||
|
||
form_deleted = form.deleteForm(odk_id, form_name) | ||
assert form_deleted | ||
|
||
|
||
@pytest.fixture(scope="session", autouse=True) | ||
def cleanup(): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters