feature: add task filtering based on metadata #449

Open · wants to merge 8 commits into base: master
59 changes: 55 additions & 4 deletions services/data/postgres_async_db.py
@@ -217,9 +217,9 @@ async def get_records(self, filter_dict={}, fetch_single=False,

async def find_records(self, conditions: List[str] = None, values=[], fetch_single=False,
limit: int = 0, offset: int = 0, order: List[str] = None, expanded=False,
enable_joins=False, cur: aiopg.Cursor = None) -> Tuple[DBResponse, DBPagination]:
enable_joins=False, cur: aiopg.Cursor = None, select_columns: List[str] = None) -> Tuple[DBResponse, DBPagination]:
sql_template = """
SELECT * FROM (
SELECT {select_columns} FROM (
SELECT
{keys}
FROM {table_name}
@@ -239,11 +239,14 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_sing
where="WHERE {}".format(" AND ".join(conditions)) if conditions else "",
order_by="ORDER BY {}".format(", ".join(order)) if order else "",
limit="LIMIT {}".format(limit) if limit else "",
offset="OFFSET {}".format(offset) if offset else ""
offset="OFFSET {}".format(offset) if offset else "",
select_columns="*" if select_columns is None else ",".join(select_columns)
).strip()

# Serialize the response only when no custom select columns were provided; otherwise return the raw result.
should_serialize = select_columns is None
return await self.execute_sql(select_sql=select_sql, values=values, fetch_single=fetch_single,
expanded=expanded, limit=limit, offset=offset, cur=cur)
expanded=expanded, limit=limit, offset=offset, cur=cur, serialize=should_serialize)

async def execute_sql(self, select_sql: str, values=[], fetch_single=False,
expanded=False, limit: int = 0, offset: int = 0,
@@ -689,6 +692,29 @@ class AsyncTaskTablePostgres(AsyncPostgresTable):
trigger_keys = primary_keys
select_columns = keys
step_table_name = AsyncStepTablePostgres.table_name
metadata_table_name = METADATA_TABLE_NAME

joins = [
"""
LEFT JOIN LATERAL (
SELECT field_name, value
FROM {metadata_table} as metadata
WHERE
{table_name}.flow_id = metadata.flow_id AND
{table_name}.run_number = metadata.run_number AND
{table_name}.step_name = metadata.step_name AND
{table_name}.task_id = metadata.task_id
) as metadata on true
""".format(
metadata_table=metadata_table_name,
table_name=table_name
)
]

join_columns = [
"metadata.field_name as metadata_field_name",
"metadata.value as metadata_value"
]

async def add_task(self, task: TaskRow, fill_heartbeat=False):
# todo backfill run_number if missing?
@@ -714,6 +740,31 @@ async def get_tasks(self, flow_id: str, run_id: str, step_name: str):
}
return await self.get_records(filter_dict=filter_dict)

async def get_filtered_task_ids(self, flow_id: str, run_id: str, step_name: str, metadata_field: str, metadata_value: str):
if metadata_field or metadata_value:
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
"metadata_field_name": metadata_field,
"metadata_value": metadata_value
}
db_response, pagination = await self.find_records(
conditions=[f"{k} = %s" for k, v in filter_dict.items() if v is not None],
values=[v for k, v in filter_dict.items() if v is not None],
order=["task_id"],
enable_joins=True,
select_columns=["task_name", "task_id"]
)

# flatten the ids in the response
def _format_id(row):
# pick the task_name over task_id
return row[0] or row[1]
flattened_response = DBResponse(body=[_format_id(row) for row in db_response.body], response_code=db_response.response_code)
return flattened_response, pagination

async def get_task(self, flow_id: str, run_id: str, step_name: str,
task_id: str, expanded: bool = False):
run_id_key, run_id_value = translate_run_key(run_id)
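For orientation, a minimal sketch (not part of the diff) of how the new select_columns argument and the metadata join compose; the table variable and filter values are illustrative assumptions:

# Hedged sketch: with select_columns set, only those columns are selected and the raw
# rows come back unserialized (should_serialize is False); enable_joins pulls in the
# LEFT JOIN LATERAL on the metadata table defined above.
db_response, pagination = await task_table.find_records(  # task_table: a hypothetical AsyncTaskTablePostgres instance
    conditions=["flow_id = %s", "run_number = %s", "step_name = %s", "metadata_field_name = %s"],
    values=["HelloFlow", 42, "start", "field_a"],
    order=["task_id"],
    enable_joins=True,
    select_columns=["task_name", "task_id"],
)
# db_response.body holds positionally indexable rows (task_name first, task_id second),
# which is what _format_id in get_filtered_task_ids relies on.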
60 changes: 60 additions & 0 deletions services/metadata_service/api/task.py
@@ -19,6 +19,11 @@ def __init__(self, app):
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks",
self.get_tasks,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks",
self.get_filtered_tasks,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}",
@@ -76,6 +81,61 @@ async def get_tasks(self, request):
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
return db_response

@format_response
@handle_exceptions
async def get_filtered_tasks(self, request):
"""
---
description: Get all task ids that match the provided metadata field name and/or value.
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
- name: "metadata_field_name"
in: "query"
description: "Metadata field name to filter with"
type: "string"
- name: "metadata_value"
in: "query"
description: "Value for the metadata field to filter on"
type: "string"
produces:
- text/plain
responses:
"200":
description: successful operation. Returns matching task ids
"405":
description: invalid HTTP Method
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")

# possible filters
metadata_field = request.query.get("metadata_field_name", None)
metadata_value = request.query.get("metadata_value", None)

# We cannot do anything without filter values
if metadata_field is None and metadata_value is None:
raise web.HTTPBadRequest(reason="A metadata_field_name or metadata_value is required for filtering.")

db_response, _ = await self._async_table.get_filtered_task_ids(flow_id, run_number, step_name, metadata_field, metadata_value)
return db_response

@format_response
@handle_exceptions
async def get_task(self, request):
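A hedged usage example of the new route; host, port, and identifiers are placeholders rather than values from the diff:

import asyncio

import aiohttp

async def fetch_filtered_tasks():
    # Ask the metadata service for task ids in a step whose metadata matches field_a=value_a.
    url = ("http://localhost:8080/flows/HelloFlow/runs/42/steps/start/filtered_tasks"
           "?metadata_field_name=field_a&metadata_value=value_a")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status, await resp.json())  # e.g. 200 [12, "custom-task-name"]

asyncio.run(fetch_filtered_tasks())

Either query parameter can be omitted, but omitting both results in the 400 response raised above.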
2 changes: 2 additions & 0 deletions services/metadata_service/api/utils.py
@@ -59,6 +59,8 @@ def handle_exceptions(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except web.HTTPClientError as ex:
return ServiceResponse(ex.status_code, ex.reason)
except Exception as err:
return http_500(str(err))

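Illustrative only: with the new except branch, an aiohttp client error raised inside a handler (such as the HTTPBadRequest in get_filtered_tasks) is mapped to its own status code instead of falling through to http_500:

@handle_exceptions
async def my_handler(request):  # hypothetical handler, for illustration
    # The wrapper now returns ServiceResponse(400, "missing filter")
    # rather than a generic 500 response.
    raise web.HTTPBadRequest(reason="missing filter")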
88 changes: 87 additions & 1 deletion services/metadata_service/tests/integration_tests/task_test.py
@@ -3,7 +3,7 @@
from .utils import (
cli, db,
assert_api_get_response, assert_api_post_response, compare_partial,
add_flow, add_run, add_step, add_task, update_objects_with_run_tags
add_flow, add_run, add_step, add_task, add_metadata, update_objects_with_run_tags
)
import pytest

@@ -185,6 +185,92 @@ async def test_tasks_get(cli, db):
# getting tasks for non-existent step should return empty list
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/nonexistent/tasks".format(**_first_task), status=200, data=[])

async def test_filtered_tasks_get(cli, db):
# create a flow, run and step for the test
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
_run = (await add_run(db, flow_id=_flow["flow_id"])).body
_step = (await add_step(db, flow_id=_run["flow_id"], run_number=_run["run_number"], step_name="first_step")).body

# add tasks to the step
_first_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
_second_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body
_third_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body  # receives no metadata, so it should never appear in filtered results

# add metadata to filter on
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_a", "value": "value_a"}))
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_b", "value": "value_b"}))

(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_a", "value": "not_value_a"}))
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_b", "value": "value_b"}))

# filtering with a shared key should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a".format(**_first_task),
data=[ _first_task["task_id"], _second_task["task_id"]])

# filtering with a shared value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_value=value_b".format(**_first_task),
data=[_first_task["task_id"], _second_task["task_id"]])

# filtering with a shared key&value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&metadata_value=value_b".format(**_first_task),
data=[_first_task["task_id"], _second_task["task_id"]])

# filtering with a key&value that only one task has should return just that task
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=not_value_a".format(**_first_task),
data=[_second_task["task_id"]])

# filtering with a mixed key&value should not return results
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=value_b".format(**_first_task),
data=[])

# not providing filters should result in error
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), status=400)


async def test_filtered_tasks_mixed_ids_get(cli, db):
# create a flow, run and step for the test
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
_run = (await add_run(db, flow_id=_flow["flow_id"])).body
_step = (await add_step(db, flow_id=_run["flow_id"], run_number=_run["run_number"], step_name="first_step")).body

# add tasks to the step
first_task_name = "first-task-1"
_first_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"], task_name=first_task_name)).body
# we need to refetch the task as the return does not contain the internal task ID we need for further record creation.
_first_task = (await db.task_table_postgres.get_task(flow_id=_step["flow_id"], run_id=_step["run_number"], step_name=_step["step_name"], task_id=first_task_name, expanded=True)).body
_second_task = (await add_task(db, flow_id=_step["flow_id"], run_number=_step["run_number"], step_name=_step["step_name"])).body

# add metadata to filter on
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_a", "value": "value_a"}))
(await add_metadata(db, flow_id=_first_task["flow_id"], run_number=_first_task["run_number"], step_name=_first_task["step_name"], task_id=_first_task["task_id"], metadata={"field_name":"field_b", "value": "value_b"}))

(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_a", "value": "not_value_a"}))
(await add_metadata(db, flow_id=_second_task["flow_id"], run_number=_second_task["run_number"], step_name=_second_task["step_name"], task_id=_second_task["task_id"], metadata={"field_name": "field_b", "value": "value_b"}))

# filtering with a shared key should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a shared value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_value=value_b".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a shared key&value should return all relevant tasks
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_b&metadata_value=value_b".format(**_first_task),
data=[first_task_name, _second_task["task_id"]])

# filtering with a key&value that only one task has should return just that task
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=not_value_a".format(**_first_task),
data=[_second_task["task_id"]])

# filtering with a mixed key&value should not return results
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks?metadata_field_name=field_a&metadata_value=value_b".format(**_first_task),
data=[])

# not providing filters should result in error
await assert_api_get_response(cli, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks".format(**_first_task), status=400)



async def test_task_get(cli, db):
# create flow, run and step for test