diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py index def12455..aee72d33 100644 --- a/services/data/postgres_async_db.py +++ b/services/data/postgres_async_db.py @@ -217,9 +217,9 @@ async def get_records(self, filter_dict={}, fetch_single=False, async def find_records(self, conditions: List[str] = None, values=[], fetch_single=False, limit: int = 0, offset: int = 0, order: List[str] = None, expanded=False, - enable_joins=False, cur: aiopg.Cursor = None) -> Tuple[DBResponse, DBPagination]: + enable_joins=False, cur: aiopg.Cursor = None, select_columns: List[str] = None) -> Tuple[DBResponse, DBPagination]: sql_template = """ - SELECT * FROM ( + SELECT {select_columns} FROM ( SELECT {keys} FROM {table_name} @@ -239,11 +239,14 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_sing where="WHERE {}".format(" AND ".join(conditions)) if conditions else "", order_by="ORDER BY {}".format(", ".join(order)) if order else "", limit="LIMIT {}".format(limit) if limit else "", - offset="OFFSET {}".format(offset) if offset else "" + offset="OFFSET {}".format(offset) if offset else "", + select_columns="*" if select_columns is None else ",".join(select_columns) ).strip() + # We should serialize the response if no custom select columns were provided, otherwise we return the raw result. 
+ should_serialize = select_columns is None return await self.execute_sql(select_sql=select_sql, values=values, fetch_single=fetch_single, - expanded=expanded, limit=limit, offset=offset, cur=cur) + expanded=expanded, limit=limit, offset=offset, cur=cur, serialize=should_serialize) async def execute_sql(self, select_sql: str, values=[], fetch_single=False, expanded=False, limit: int = 0, offset: int = 0, @@ -689,6 +692,29 @@ class AsyncTaskTablePostgres(AsyncPostgresTable): trigger_keys = primary_keys select_columns = keys step_table_name = AsyncStepTablePostgres.table_name + metadata_table_name = METADATA_TABLE_NAME + + joins = [ + """ + LEFT JOIN LATERAL ( + SELECT field_name, value + FROM {metadata_table} as metadata + WHERE + {table_name}.flow_id = metadata.flow_id AND + {table_name}.run_number = metadata.run_number AND + {table_name}.step_name = metadata.step_name AND + {table_name}.task_id = metadata.task_id + ) as metadata on true + """.format( + metadata_table=metadata_table_name, + table_name=table_name + ) + ] + + join_columns = [ + "metadata.field_name as metadata_field_name", + "metadata.value as metadata_value" + ] async def add_task(self, task: TaskRow, fill_heartbeat=False): # todo backfill run_number if missing? 
async def get_filtered_task_ids(self, flow_id: str, run_id: str, step_name: str, metadata_field: str, metadata_value: str):
    """
    Fetch the ids of the tasks in a step that have a matching metadata record.

    Parameters
    ----------
    flow_id : str
        Flow the tasks belong to.
    run_id : str
        Run identifier, mapped to its column/value pair with translate_run_key.
    step_name : str
        Step the tasks belong to.
    metadata_field : str
        Metadata field name to match exactly, or None to match any field name.
    metadata_value : str
        Metadata value to match exactly, or None to match any value.

    Returns
    -------
    Tuple[DBResponse, DBPagination]
        On success the body is a list with one id per matching task, in task_id order;
        each id is the task_name when it is set and the task_id otherwise.
        A 400 response with no pagination is returned when both filters are None,
        and a failed query response is passed through untouched.
    """
    if metadata_field is None and metadata_value is None:
        # Without any filter there is nothing to match on, so answer with a
        # client error instead of running the query or failing later on.
        return DBResponse(response_code=400, body="A metadata_field or metadata_value is required for filtering."), None

    run_id_key, run_id_value = translate_run_key(run_id)
    filter_dict = {
        "flow_id": flow_id,
        run_id_key: run_id_value,
        "step_name": step_name,
        "metadata_field_name": metadata_field,
        "metadata_value": metadata_value
    }
    # Only the filters that were actually provided become SQL conditions.
    active_filters = {key: value for key, value in filter_dict.items() if value is not None}

    db_response, pagination = await self.find_records(
        conditions=[f"{key} = %s" for key in active_filters],
        values=list(active_filters.values()),
        order=["task_id"],
        enable_joins=True,
        # one entry per column, find_records joins them for the SELECT clause
        select_columns=["task_name", "task_id"]
    )

    if db_response.response_code != 200:
        # NOTE(review): assumes a failed query does not carry result rows in its
        # body, so there is nothing to flatten - confirm against execute_sql.
        return db_response, pagination

    # Flatten the (task_name, task_id) rows to ids, picking the task_name over task_id.
    # The metadata join produces one row per matching metadata record, so a task can
    # show up several times; dict.fromkeys drops the repeats and keeps the row order.
    task_ids = list(dict.fromkeys(row[0] or row[1] for row in db_response.body))
    return DBResponse(body=task_ids, response_code=db_response.response_code), pagination
@format_response
@handle_exceptions
async def get_filtered_tasks(self, request):
    """
    ---
    description: get all task ids that match the provided metadata field name and/or value.
    tags:
    - Tasks
    parameters:
    - name: "flow_id"
      in: "path"
      description: "flow_id"
      required: true
      type: "string"
    - name: "run_number"
      in: "path"
      description: "run_number"
      required: true
      type: "string"
    - name: "step_name"
      in: "path"
      description: "step_name"
      required: true
      type: "string"
    - name: "metadata_field_name"
      in: "query"
      description: "Metadata field name to filter with"
      type: "string"
    - name: "metadata_value"
      in: "query"
      description: "Value for the metadata field to filter on"
      type: "string"
    produces:
    - text/plain
    responses:
        "200":
            description: successful operation. Return tasks
        "400":
            description: neither metadata_field_name nor metadata_value was provided
        "405":
            description: invalid HTTP Method
    """
    flow_id = request.match_info.get("flow_id")
    run_number = request.match_info.get("run_number")
    step_name = request.match_info.get("step_name")

    # possible filters
    metadata_field = request.query.get("metadata_field_name")
    metadata_value = request.query.get("metadata_value")

    # We cannot do anything without filter values. Blank values such as
    # "?metadata_field_name=" count as missing here, so a request made up of
    # only empty filters is rejected up front rather than failing further down.
    # NOTE(review): assumes such a request is a client mistake and not a search
    # for empty metadata - confirm.
    if not metadata_field and not metadata_value:
        raise web.HTTPBadRequest(reason="A metadata_field_name or metadata_value are required for filtering.")

    db_response, _ = await self._async_table.get_filtered_task_ids(flow_id, run_number, step_name, metadata_field, metadata_value)
    return db_response
async def _attach_metadata(db, task, field_name, value):
    """Store a single metadata record (field_name, value) for the given task."""
    await add_metadata(
        db,
        flow_id=task["flow_id"],
        run_number=task["run_number"],
        step_name=task["step_name"],
        task_id=task["task_id"],
        metadata={"field_name": field_name, "value": value},
    )


async def test_filtered_tasks_get(cli, db):
    """Filtering tasks by metadata field name and/or value returns the matching task ids."""
    # flow, run and step that the tasks hang off
    flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
    run = (await add_run(db, flow_id=flow["flow_id"])).body
    step = (await add_step(db, flow_id=run["flow_id"], run_number=run["run_number"], step_name="first_step")).body

    async def _new_task():
        created = await add_task(db, flow_id=step["flow_id"], run_number=step["run_number"], step_name=step["step_name"])
        return created.body

    task_one = await _new_task()
    task_two = await _new_task()
    # the third task gets no metadata at all
    await _new_task()

    # both tasks share field_a (with different values) and share field_b=value_b
    await _attach_metadata(db, task_one, "field_a", "value_a")
    await _attach_metadata(db, task_one, "field_b", "value_b")

    await _attach_metadata(db, task_two, "field_a", "not_value_a")
    await _attach_metadata(db, task_two, "field_b", "value_b")

    both_ids = [task_one["task_id"], task_two["task_id"]]
    cases = [
        # a field name present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a", both_ids),
        # a value present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_value=value_b", both_ids),
        # a field name and value pair present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&metadata_value=value_b", both_ids),
        # a field name and value pair present on one task matches only that task
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=not_value_a", [task_two["task_id"]]),
        # a field name combined with the value of another field matches nothing
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=value_b", []),
    ]
    for url_template, expected_ids in cases:
        await assert_api_get_response(cli, url_template.format(**task_one), data=expected_ids)

    # a request without any filter is a client error
    await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**task_one), status=400)


async def test_filtered_tasks_mixed_ids_get(cli, db):
    """A task created with a task_name is reported by that name, other tasks by their task_id."""
    # flow, run and step that the tasks hang off
    flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
    run = (await add_run(db, flow_id=flow["flow_id"])).body
    step = (await add_step(db, flow_id=run["flow_id"], run_number=run["run_number"], step_name="first_step")).body

    first_task_name = "first-task-1"
    await add_task(db, flow_id=step["flow_id"], run_number=step["run_number"], step_name=step["step_name"], task_name=first_task_name)
    # the creation response lacks the internal task id needed for the metadata
    # records below, so the named task is fetched again in expanded form
    named_task = (await db.task_table_postgres.get_task(flow_id=step["flow_id"], run_id=step["run_number"], step_name=step["step_name"], task_id=first_task_name, expanded=True)).body
    plain_task = (await add_task(db, flow_id=step["flow_id"], run_number=step["run_number"], step_name=step["step_name"])).body

    # both tasks share field_a (with different values) and share field_b=value_b
    await _attach_metadata(db, named_task, "field_a", "value_a")
    await _attach_metadata(db, named_task, "field_b", "value_b")

    await _attach_metadata(db, plain_task, "field_a", "not_value_a")
    await _attach_metadata(db, plain_task, "field_b", "value_b")

    both_ids = [first_task_name, plain_task["task_id"]]
    cases = [
        # a field name present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a", both_ids),
        # a value present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_value=value_b", both_ids),
        # a field name and value pair present on both tasks matches both
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&metadata_value=value_b", both_ids),
        # a field name and value pair present on one task matches only that task
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=not_value_a", [plain_task["task_id"]]),
        # a field name combined with the value of another field matches nothing
        ("/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=value_b", []),
    ]
    for url_template, expected_ids in cases:
        await assert_api_get_response(cli, url_template.format(**named_task), data=expected_ids)

    # a request without any filter is a client error
    await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**named_task), status=400)