Skip to content

Commit

Permalink
tests refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Ronald Krist committed Dec 20, 2024
1 parent b6ce055 commit ef18d20
Show file tree
Hide file tree
Showing 20 changed files with 270 additions and 750 deletions.
206 changes: 58 additions & 148 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,111 +321,23 @@ class WithApprovalPermissions(RequestBasedWorkflowPermissions):
),
}


@pytest.fixture
def change_workflow_function():
from oarepo_workflows.proxies import current_oarepo_workflows

return current_oarepo_workflows.set_workflow


@pytest.fixture(scope="module")
def create_app(instance_path, entry_points):
"""Application factory fixture."""
return create_api


@pytest.fixture()
@pytest.fixture(autouse=True)
def vocab_cf(app, db, cache):
from oarepo_runtime.services.custom_fields.mappings import prepare_cf_indices

prepare_cf_indices()
ThesisDraft.index.refresh()


@pytest.fixture()
def urls():
return {"BASE_URL": "/thesis/", "BASE_URL_REQUESTS": "/requests/"}


@pytest.fixture()
def publish_request_data_function():
def ret_data(record_id):
return {
"request_type": "publish_draft",
"topic": {"thesis_draft": record_id},
"payload": {"version": "1.0"},
}

return ret_data


@pytest.fixture()
def conditional_recipient_request_data_function():
def ret_data(record_id):
return {
"request_type": "conditional_recipient_rt",
"topic": {"thesis_draft": record_id},
}

return ret_data


@pytest.fixture()
def another_topic_updating_request_function():
def ret_data(record_id):
return {
"request_type": "another_topic_updating",
"topic": {"thesis_draft": record_id},
}

return ret_data


@pytest.fixture()
def edit_record_data_function():
def ret_data(record_id):
return {
"request_type": "edit_published_record",
"topic": {"thesis": record_id},
}

return ret_data


@pytest.fixture()
def new_version_data_function():
def ret_data(record_id):
return {
"request_type": "new_version",
"topic": {"thesis": record_id},
}

return ret_data


@pytest.fixture()
def delete_record_data_function():
def ret_data(record_id):
return {
"request_type": "delete_published_record",
"topic": {"thesis": record_id},
}

return ret_data


@pytest.fixture()
def delete_draft_function():
def ret_data(record_id):
return {
"request_type": "delete_draft",
"topic": {"thesis_draft": record_id},
}

return ret_data


@pytest.fixture()
def serialization_result():
def _result(topic_id, request_id):
Expand Down Expand Up @@ -560,6 +472,10 @@ def request_events_service(app):
service = current_requests.request_events_service
return service

@pytest.fixture(scope="module")
def record_service():
return current_service


@pytest.fixture()
def users(app, db, UserFixture):
Expand Down Expand Up @@ -631,35 +547,15 @@ def _logged_client(user):

return _logged_client


@pytest.fixture(scope="function")
def request_record_input_data():
"""Input data to a Request record."""
ret = {
"title": "Doc1 approval",
"payload": {
"content": "Can you approve my document doc1 please?",
"format": RequestEventFormat.HTML.value,
},
}
return ret


@pytest.fixture(scope="module")
def record_service():
return current_service


@pytest.fixture()
def example_topic_draft(record_service, users, default_workflow_json): # needed for ui
identity = users[0].identity
draft = record_service.create(identity, default_workflow_json)
return draft._obj


@pytest.fixture()
def record_factory(record_service, default_workflow_json):
def record(identity, custom_workflow=None, additional_data=None):
def merge_record_data(default_workflow_json):
def _merge_data(custom_workflow=None, additional_data=None):
json = copy.deepcopy(default_workflow_json)
if custom_workflow: # specifying this assumes use of workflows
json["parent"]["workflow"] = custom_workflow
Expand All @@ -675,37 +571,44 @@ def record(identity, custom_workflow=None, additional_data=None):
json = always_merger.merge(json, json_metadata)
if additional_data:
always_merger.merge(json, additional_data)
return json
return _merge_data


@pytest.fixture()
def draft_factory(record_service, merge_record_data):
def record(identity, custom_workflow=None, additional_data=None):
json = merge_record_data(custom_workflow, additional_data)
draft = record_service.create(identity, json)
record = record_service.publish(system_identity, draft.id)
return record._obj
return draft._obj

return record

@pytest.fixture()
def record_factory(record_service, draft_factory, urls):
# bypassing request pattern with system identity
def record(client, custom_workflow=None, additional_data=None):
draft = draft_factory(client.user_fixture.identity, custom_workflow, additional_data)
record = record_service.publish(system_identity, draft['id'])
ret = client.get(f"{urls['BASE_URL']}{record['id']}") # unified return value
return ret

return record



@pytest.fixture()
def record_with_files_factory(record_service, default_workflow_json):
def record(identity, custom_workflow=None, additional_data=None):
json = copy.deepcopy(default_workflow_json)
def record_with_files_factory(record_service, draft_factory, default_workflow_json, urls):
def record(client, custom_workflow=None, additional_data=None):
identity = client.user_fixture.identity
if (
"files" in default_workflow_json
and "enabled" in default_workflow_json["files"]
):
default_workflow_json["files"]["enabled"] = True
if custom_workflow: # specifying this assumes use of workflows
json["parent"]["workflow"] = custom_workflow
json = {
"metadata": {
"creators": [
"Creator 1",
"Creator 2",
],
"contributors": ["Contributor 1"],
}
}
json = always_merger.merge(json, default_workflow_json)
if additional_data:
always_merger.merge(json, additional_data)
draft = record_service.create(identity, json)
if not additional_data:
additional_data = {}
additional_data.setdefault("files",{}).setdefault("enabled", True)
draft = draft_factory(identity, custom_workflow, additional_data)

# upload file
# Initialize files upload
Expand All @@ -722,28 +625,26 @@ def record(identity, custom_workflow=None, additional_data=None):
)
commit = files_service.commit_file(identity, draft["id"], "test.pdf")

record = record_service.publish(system_identity, draft.id)
return record._obj
#publish record
record = record_service.publish(system_identity, draft["id"])
ret = client.get(f"{urls['BASE_URL']}{record['id']}") # unified return value
return ret

return record



@pytest.fixture()
def create_draft_via_resource(default_workflow_json, urls):
def create_draft_via_resource(merge_record_data, urls):
def _create_draft(
client, expand=True, custom_workflow=None, additional_data=None, **kwargs
):
json = copy.deepcopy(default_workflow_json)
if custom_workflow:
json["parent"]["workflow"] = custom_workflow
if additional_data:
json = always_merger.merge(json, additional_data)
json = merge_record_data(custom_workflow, additional_data)
url = urls["BASE_URL"] + "?expand=true" if expand else urls["BASE_URL"]
return client.post(url, json=json, **kwargs)

return _create_draft


@pytest.fixture()
def events_resource_data():
"""Input data for the Request Events Resource (REST body)."""
Expand All @@ -754,7 +655,6 @@ def events_resource_data():
}
}


def _create_role(id, name, description, is_managed, database):
"""Creates a Role/Group."""
r = current_datastore.create_role(
Expand Down Expand Up @@ -826,28 +726,38 @@ def _create_request_from_link(request_types_json, request_type):

return _create_request_from_link

@pytest.fixture()
def request_type_additional_data():
return {"publish_draft":{"payload": {"version": "1.0"}}}

@pytest.fixture
def create_request_by_link(get_request_link):
def _create_request(client, record, request_type):
def create_request_by_link(get_request_link, request_type_additional_data):
def _create_request(client, record, request_type, additional_data=None, **request_kwargs):
if additional_data is None:
additional_data = {}
applicable_requests = client.get(
link2testclient(record.json["links"]["applicable-requests"])
).json["hits"]["hits"]
create_link = link2testclient(
get_request_link(applicable_requests, request_type)
)
create_response = client.post(create_link)
if request_type in request_type_additional_data:
additional_data = always_merger.merge(additional_data, request_type_additional_data[request_type])
if not additional_data:
create_response = client.post(create_link, **request_kwargs)
else:
create_response = client.post(create_link, json=additional_data, **request_kwargs)
return create_response

return _create_request


@pytest.fixture
def submit_request_by_link(create_request_by_link):
def _submit_request(client, record, request_type):
create_response = create_request_by_link(client, record, request_type)
def _submit_request(client, record, request_type, create_additional_data=None, submit_additional_data=None):
create_response = create_request_by_link(client, record, request_type, additional_data=create_additional_data)
submit_response = client.post(
link2testclient(create_response.json["links"]["actions"]["submit"])
link2testclient(create_response.json["links"]["actions"]["submit"]), json=submit_additional_data
)
return submit_response

Expand Down
Loading

0 comments on commit ef18d20

Please sign in to comment.