diff --git a/apps/zui/src/core/query/run.ts b/apps/zui/src/core/query/run.ts index b1cbddfabe..90a0606071 100644 --- a/apps/zui/src/core/query/run.ts +++ b/apps/zui/src/core/query/run.ts @@ -34,19 +34,13 @@ function run(id: string): Thunk> { const paginatedQuery = Results.getPaginatedQuery(id)(getState()) try { - const res = await api.query(paginatedQuery, { - id, - tabId, - }) - res.collect(({rows, shapesMap}) => { - const values = isFirstPage ? [...rows] : [...prevVals, ...rows] - const shapes = isFirstPage - ? {...shapesMap} - : {...prevShapes, ...shapesMap} + const res = await api.query(paginatedQuery, {id, tabId}) + await res.collect(({rows, shapesMap}) => { + const values = isFirstPage ? rows : [...prevVals, ...rows] + const shapes = isFirstPage ? shapesMap : {...prevShapes, ...shapesMap} dispatch(Results.setValues({id, tabId, values})) dispatch(Results.setShapes({id, tabId, shapes})) }) - await res.promise dispatch(Results.success({id, tabId, count: res.rows.length})) return res } catch (e) { diff --git a/apps/zui/src/js/api/zui-api.ts b/apps/zui/src/js/api/zui-api.ts index e9098f7d47..d40cda9bf8 100644 --- a/apps/zui/src/js/api/zui-api.ts +++ b/apps/zui/src/js/api/zui-api.ts @@ -54,11 +54,7 @@ export default class ZuiApi { } createAbortable(tab?: string, tag?: string) { - try { - this.abortables.abort({tab, tag}) - } catch (e) { - console.log("Abort Handled", e) - } + this.abortables.abort({tab, tag}) const ctl = new AbortController() const id = this.abortables.add({ abort: () => ctl.abort(), @@ -74,7 +70,7 @@ export default class ZuiApi { const [signal, cleanup] = this.createAbortable(opts.tabId, opts.id) try { const resp = await zealot.query(body, {signal}) - resp.promise.finally(cleanup) + resp.on("success", cleanup) return resp } catch (e) { cleanup() diff --git a/apps/zui/src/views/histogram-pane/run-query.ts b/apps/zui/src/views/histogram-pane/run-query.ts index 3fce08d2f2..d3aec421e6 100644 --- a/apps/zui/src/views/histogram-pane/run-query.ts +++ b/apps/zui/src/views/histogram-pane/run-query.ts @@ -9,8 +9,18 @@ import ZuiApi from "src/js/api/zui-api" import {isAbortError} from "src/util/is-abort-error" export const HISTOGRAM_RESULTS = "histogram" +const POOL_RANGE = "pool-range" +const NULL_TIME_COUNT = "null-time-count" +const MISSING_TIME_COUNT = "missing-time-count" export async function runHistogramQuery(api: ZuiApi) { + // all these queries should maybe be attached to the same abort signal + // this would change the abortables api a bit + api.abortables.abort({tag: POOL_RANGE}) + api.abortables.abort({tag: NULL_TIME_COUNT}) + api.abortables.abort({tag: MISSING_TIME_COUNT}) + api.abortables.abort({tag: HISTOGRAM_RESULTS}) + const id = HISTOGRAM_RESULTS const tabId = api.current.tabId const key = api.current.location.key @@ -31,7 +41,7 @@ export async function runHistogramQuery(api: ZuiApi) { } function error(error: Error) { - if (isAbortError(error)) return success() + if (isAbortError(error)) return api.dispatch(Results.error({id, tabId, error: error.message})) } @@ -52,7 +62,7 @@ export async function runHistogramQuery(api: ZuiApi) { async function getPoolRange() { const query = `from ${poolId} | min(${timeField}), max(${timeField})` - const resp = await api.query(query, {id, tabId}) + const resp = await api.query(query, {id: POOL_RANGE, tabId}) const [{min, max}] = await resp.js() if (!(min instanceof Date && max instanceof Date)) return null return [min, max] as [Date, Date] @@ -61,19 +71,27 @@ export async function runHistogramQuery(api: ZuiApi) { async function getNullTimeCount() { // Newline after baseQuery in case it ends with a comment. const query = `${baseQuery}\n | ${timeField} == null | count()` - const id = "null-time-count" - const resp = await api.query(query, {id, tabId}) - const [count] = await resp.js() - api.dispatch(Histogram.setNullXCount(count ?? 0)) + try { + const resp = await api.query(query, {id: NULL_TIME_COUNT, tabId}) + const [count] = await resp.js() + api.dispatch(Histogram.setNullXCount(count ?? 0)) + } catch (e) { + if (isAbortError(e)) return + throw e + } } async function getMissingTimeCount() { // Newline after baseQuery in case it ends with a comment. const query = `${baseQuery}\n | !has(${timeField}) | count()` - const id = "missing-time-count" - const resp = await api.query(query, {id, tabId}) - const [count] = await resp.js() - api.dispatch(Histogram.setMissingXCount(count ?? 0)) + try { + const resp = await api.query(query, {id: MISSING_TIME_COUNT, tabId}) + const [count] = await resp.js() + api.dispatch(Histogram.setMissingXCount(count ?? 0)) + } catch (e) { + if (isAbortError(e)) return + throw e + } } async function run() { @@ -85,7 +103,7 @@ export async function runHistogramQuery(api: ZuiApi) { const interval = `${number}${timeUnits[unit]}` // Newline after baseQuery in case it ends with a comment. const query = `${baseQuery}\n | ${timeField} != null | count() by time := bucket(${timeField}, ${interval}), group := ${colorField} | sort time` - const resp = await api.query(query, {id, tabId}) + const resp = await api.query(query, {id: HISTOGRAM_RESULTS, tabId}) api.dispatch(Histogram.setInterval({unit, number, fn})) api.dispatch(Histogram.setRange(range)) resp.collect(collect) diff --git a/packages/zed-js/src/query/result-stream.ts b/packages/zed-js/src/query/result-stream.ts index c91d9a320c..f50dd313f0 100644 --- a/packages/zed-js/src/query/result-stream.ts +++ b/packages/zed-js/src/query/result-stream.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from 'events'; import { eachLine } from '../ndjson/lines'; import { JSOptions } from '../values/types'; import * as zjson from '../zjson'; @@ -5,14 +6,16 @@ import { Channel } from './channel'; import { Collector } from '../types'; import { IsoResponse } from '../client/types'; -export class ResultStream { +export class ResultStream extends EventEmitter { public status: 'idle' | 'pending' | 'error' | 'aborted' | 'success' = 'idle'; private currentChannelId: number | undefined; private channelsMap = new Map(); private _promise?: Promise; - constructor(public resp: IsoResponse, private ctl: AbortController) {} + constructor(public resp: IsoResponse, private ctl: AbortController) { + super(); + } get body() { return this.resp.body; @@ -80,9 +83,11 @@ export class ResultStream { this.consumeLine(json); } this.status = 'success'; + this.emit('success'); resolve(); - } catch (e) { + } catch (e: unknown) { if ( + (e instanceof Object && 'name' in e && e.name === 'AbortError') || (e instanceof DOMException && e.message.match(/user aborted/)) || (e instanceof Error && e.message.match(/context canceled/)) ) {