Skip to content

Commit

Permalink
Fix Abort Problems
Browse files Browse the repository at this point in the history
  • Loading branch information
jameskerr committed Feb 6, 2024
1 parent d02507a commit 668f78d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 30 deletions.
14 changes: 4 additions & 10 deletions apps/zui/src/core/query/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@ function run(id: string): Thunk<Promise<ResultStream | null>> {
const paginatedQuery = Results.getPaginatedQuery(id)(getState())

try {
const res = await api.query(paginatedQuery, {
id,
tabId,
})
res.collect(({rows, shapesMap}) => {
const values = isFirstPage ? [...rows] : [...prevVals, ...rows]
const shapes = isFirstPage
? {...shapesMap}
: {...prevShapes, ...shapesMap}
const res = await api.query(paginatedQuery, {id, tabId})
await res.collect(({rows, shapesMap}) => {
const values = isFirstPage ? rows : [...prevVals, ...rows]
const shapes = isFirstPage ? shapesMap : {...prevShapes, ...shapesMap}
dispatch(Results.setValues({id, tabId, values}))
dispatch(Results.setShapes({id, tabId, shapes}))
})
await res.promise
dispatch(Results.success({id, tabId, count: res.rows.length}))
return res
} catch (e) {
Expand Down
8 changes: 2 additions & 6 deletions apps/zui/src/js/api/zui-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ export default class ZuiApi {
}

createAbortable(tab?: string, tag?: string) {
try {
this.abortables.abort({tab, tag})
} catch (e) {
console.log("Abort Handled", e)
}
this.abortables.abort({tab, tag})
const ctl = new AbortController()
const id = this.abortables.add({
abort: () => ctl.abort(),
Expand All @@ -74,7 +70,7 @@ export default class ZuiApi {
const [signal, cleanup] = this.createAbortable(opts.tabId, opts.id)
try {
const resp = await zealot.query(body, {signal})
resp.promise.finally(cleanup)
resp.on("success", cleanup)
return resp
} catch (e) {
cleanup()
Expand Down
40 changes: 29 additions & 11 deletions apps/zui/src/views/histogram-pane/run-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@ import ZuiApi from "src/js/api/zui-api"
import {isAbortError} from "src/util/is-abort-error"

export const HISTOGRAM_RESULTS = "histogram"
const POOL_RANGE = "pool-range"
const NULL_TIME_COUNT = "null-time-count"
const MISSING_TIME_COUNT = "missing-time-count"

export async function runHistogramQuery(api: ZuiApi) {
// all these queries should maybe be attached to the same abort signal
// this would change the abortables api a bit
api.abortables.abort({tag: POOL_RANGE})
api.abortables.abort({tag: NULL_TIME_COUNT})
api.abortables.abort({tag: MISSING_TIME_COUNT})
api.abortables.abort({tag: HISTOGRAM_RESULTS})

const id = HISTOGRAM_RESULTS
const tabId = api.current.tabId
const key = api.current.location.key
Expand All @@ -31,7 +41,7 @@ export async function runHistogramQuery(api: ZuiApi) {
}

function error(error: Error) {
if (isAbortError(error)) return success()
if (isAbortError(error)) return
api.dispatch(Results.error({id, tabId, error: error.message}))
}

Expand All @@ -52,7 +62,7 @@ export async function runHistogramQuery(api: ZuiApi) {

async function getPoolRange() {
const query = `from ${poolId} | min(${timeField}), max(${timeField})`
const resp = await api.query(query, {id, tabId})
const resp = await api.query(query, {id: POOL_RANGE, tabId})
const [{min, max}] = await resp.js()
if (!(min instanceof Date && max instanceof Date)) return null
return [min, max] as [Date, Date]
Expand All @@ -61,19 +71,27 @@ export async function runHistogramQuery(api: ZuiApi) {
async function getNullTimeCount() {
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | ${timeField} == null | count()`
const id = "null-time-count"
const resp = await api.query(query, {id, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setNullXCount(count ?? 0))
try {
const resp = await api.query(query, {id: NULL_TIME_COUNT, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setNullXCount(count ?? 0))
} catch (e) {
if (isAbortError(e)) return
throw e
}
}

async function getMissingTimeCount() {
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | !has(${timeField}) | count()`
const id = "missing-time-count"
const resp = await api.query(query, {id, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setMissingXCount(count ?? 0))
try {
const resp = await api.query(query, {id: MISSING_TIME_COUNT, tabId})
const [count] = await resp.js()
api.dispatch(Histogram.setMissingXCount(count ?? 0))
} catch (e) {
if (isAbortError(e)) return
throw e
}
}

async function run() {
Expand All @@ -85,7 +103,7 @@ export async function runHistogramQuery(api: ZuiApi) {
const interval = `${number}${timeUnits[unit]}`
// Newline after baseQuery in case it ends with a comment.
const query = `${baseQuery}\n | ${timeField} != null | count() by time := bucket(${timeField}, ${interval}), group := ${colorField} | sort time`
const resp = await api.query(query, {id, tabId})
const resp = await api.query(query, {id: HISTOGRAM_RESULTS, tabId})
api.dispatch(Histogram.setInterval({unit, number, fn}))
api.dispatch(Histogram.setRange(range))
resp.collect(collect)
Expand Down
11 changes: 8 additions & 3 deletions packages/zed-js/src/query/result-stream.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { EventEmitter } from 'events';
import { eachLine } from '../ndjson/lines';
import { JSOptions } from '../values/types';
import * as zjson from '../zjson';
import { Channel } from './channel';
import { Collector } from '../types';
import { IsoResponse } from '../client/types';

export class ResultStream {
export class ResultStream extends EventEmitter {
public status: 'idle' | 'pending' | 'error' | 'aborted' | 'success' = 'idle';

private currentChannelId: number | undefined;
private channelsMap = new Map<number, Channel>();
private _promise?: Promise<void>;

constructor(public resp: IsoResponse, private ctl: AbortController) {}
constructor(public resp: IsoResponse, private ctl: AbortController) {
super();
}

get body() {
return this.resp.body;
Expand Down Expand Up @@ -80,9 +83,11 @@ export class ResultStream {
this.consumeLine(json);
}
this.status = 'success';
this.emit('success');
resolve();
} catch (e) {
} catch (e: unknown) {
if (
(e instanceof Object && 'name' in e && e.name === 'AbortError') ||
(e instanceof DOMException && e.message.match(/user aborted/)) ||
(e instanceof Error && e.message.match(/context canceled/))
) {
Expand Down

0 comments on commit 668f78d

Please sign in to comment.