Skip to content

Commit

Permalink
[resotocore][fix] Batch size and timeout (#1872)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Jan 8, 2024
1 parent b7abd71 commit d8e1342
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
3 changes: 2 additions & 1 deletion resotocore/resotocore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6147,8 +6147,9 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
parser.add_argument("--filter", type=predicate_term.parse, nargs="*", default=None)
parser.add_argument("--granularity", type=parse_duration_or_int, default=5)
p = parser.parse_args(args_parts_unquoted_parser.parse(part))
timeout = if_set(ctx.env.get("search_timeout"), duration)
cursor = await self.dependencies.db_access.time_series_db.load_time_series(
p.name, p.start, p.end, group_by=p.group, filter_by=p.filter, granularity=p.granularity
p.name, p.start, p.end, group_by=p.group, filter_by=p.filter, granularity=p.granularity, timeout=timeout
)
return CLISourceContext(cursor.count(), cursor.full_count()), cursor

Expand Down
27 changes: 23 additions & 4 deletions resotocore/resotocore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@

class AsyncCursor(AsyncIterator[Any]):
def __init__(
self, cursor: Cursor, trafo: Optional[Callable[[Json], Optional[Any]]], flatten_nodes_and_edges: bool = False
self,
cursor: Cursor,
*,
query: str,
bind_vars: Optional[Json] = None,
trafo: Optional[Callable[[Json], Optional[Any]]] = None,
flatten_nodes_and_edges: bool = False,
):
self.query = query
self.bind_vars = bind_vars
self.cursor = cursor
self.visited_node: Set[str] = set()
self.visited_edge: Set[str] = set()
Expand Down Expand Up @@ -72,6 +80,8 @@ async def __anext__(self) -> Any:
return await self.next_deferred_edge()

def close(self) -> None:
    """Close the wrapped ArangoDB cursor, logging query statistics first.

    NOTE(review): `statistics()` presumably returns the server-side
    execution stats dict (or None/empty when unavailable) — confirm
    against the python-arango Cursor API.
    """
    # Log the originating query and bind_vars alongside the stats so a
    # slow or expensive query can be traced back to its source.
    if stats := self.cursor.statistics():
        log.debug(f"Query {self.query} with bind_vars {self.bind_vars} took {stats}")
    # ignore_missing=True: the server may have already evicted/expired
    # the cursor (e.g. after a timeout), which must not raise here.
    self.cursor.close(ignore_missing=True)

def count(self) -> Optional[int]:
Expand Down Expand Up @@ -136,6 +146,7 @@ async def next_from_db(self) -> Json:
res: Json = self.cursor.pop()
return res
except CursorNextError as ex:
log.error(f"Cursor does not exist any longer. Query: {self.query} with bind_vars: {self.bind_vars}")
raise QueryTookToLongError("Cursor does not exist any longer, since the query ran for too long.") from ex

async def next_deferred_edge(self) -> Json:
Expand All @@ -149,8 +160,8 @@ async def next_deferred_edge(self) -> Json:


class AsyncCursorContext(AsyncContextManager[AsyncCursor]):
def __init__(self, cursor: Cursor, trafo: Optional[Callable[[Json], Optional[Any]]], flatten_nodes_and_edges: bool):
self.cursor = AsyncCursor(cursor, trafo, flatten_nodes_and_edges)
def __init__(self, cursor: AsyncCursor):
self.cursor = cursor

async def __aenter__(self) -> AsyncCursor:
    """Enter the async context and hand out the wrapped AsyncCursor.

    The cursor is created eagerly in __init__; entering the context
    performs no additional work.
    """
    return self.cursor
Expand Down Expand Up @@ -211,7 +222,15 @@ async def aql_cursor(
skip_inaccessible_cols,
max_runtime,
)
return AsyncCursorContext(cursor, trafo, flatten_nodes_and_edges or False)
return AsyncCursorContext(
AsyncCursor(
cursor,
trafo=trafo,
flatten_nodes_and_edges=flatten_nodes_and_edges or False,
query=query,
bind_vars=bind_vars,
)
)

async def aql(
self,
Expand Down
12 changes: 11 additions & 1 deletion resotocore/resotocore/db/timeseriesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import defaultdict
from datetime import timedelta, datetime, timezone
from functools import partial
from numbers import Number
from typing import Optional, List, Set, Union, cast, Callable, Dict

from attr import evolve, define
Expand Down Expand Up @@ -107,6 +108,8 @@ async def load_time_series(
filter_by: Optional[List[Predicate]] = None,
granularity: Optional[Union[timedelta, int]] = None,
trafo: Optional[Callable[[Json], Json]] = None,
batch_size: Optional[int] = None,
timeout: Optional[timedelta] = None,
) -> AsyncCursor:
"""
Load time series data.
Expand All @@ -119,6 +122,8 @@ async def load_time_series(
The minimum granularity is one hour.
In case this number is an integer, it is interpreted as the number of steps between start and end.
:param trafo: Optional transformation function to apply to each result.
:param batch_size: Optional batch size for the query.
:param timeout: Timeout for the query to run.
:return: A cursor to iterate over the time series data.
"""
assert start < end, "start must be before end"
Expand All @@ -141,7 +146,10 @@ def result_trafo(js: Json) -> Json:
js["at"] = utc_str(datetime.fromtimestamp(js["at"], timezone.utc))
return js

async with await self.db.aql_cursor(qs, bind_vars=bv, trafo=trafo or result_trafo) as crsr:
ttl = cast(Number, int(timeout.total_seconds())) if timeout else None
async with await self.db.aql_cursor(
query=qs, bind_vars=bv, batch_size=batch_size or 10_000, ttl=ttl, trafo=trafo or result_trafo
) as crsr:
return crsr

async def downsample(self, now: Optional[datetime] = None) -> Union[str, Json]:
Expand Down Expand Up @@ -184,6 +192,8 @@ def ts_format(ts: str, js: Json) -> Json:
c_end,
granularity=bucket.resolution,
trafo=partial(ts_format, ts.name),
batch_size=100_000, # The values are tiny. Use a large batch size.
timeout=timedelta(seconds=60),
)
]:
log.info(f"Compact {ts.name} bucket {bucket} to {len(ts_data)} entries (last={ts_bucket_last})")
Expand Down
8 changes: 4 additions & 4 deletions resotocore/resotocore/graph_manager/graph_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,22 +227,22 @@ async def export_graph(self, graph_name: GraphName) -> AsyncIterator[str]:
yield to_js_str(metadata)

# 2. model collection
cursor = AsyncCursor(await tx.all(model_collection), None)
cursor = AsyncCursor(await tx.all(model_collection), query=model_collection)
async for doc in cursor:
yield dumps(doc)

# 3. vertex collection
cursor = AsyncCursor(await tx.all(vertex_collection), None)
cursor = AsyncCursor(await tx.all(vertex_collection), query=vertex_collection)
async for doc in cursor:
yield dumps(doc)

# 4. default edge collection
cursor = AsyncCursor(await tx.all(default_edge_collection), None)
cursor = AsyncCursor(await tx.all(default_edge_collection), query=default_edge_collection)
async for doc in cursor:
yield dumps(doc)

# 5. delete edge collection
cursor = AsyncCursor(await tx.all(delete_edge_collection), None)
cursor = AsyncCursor(await tx.all(delete_edge_collection), query=delete_edge_collection)
async for doc in cursor:
yield dumps(doc)

Expand Down

0 comments on commit d8e1342

Please sign in to comment.