From 56c3c78429e899aa139c92ac03bcf6d62e5d0850 Mon Sep 17 00:00:00 2001 From: Josh Wulf Date: Tue, 4 Feb 2025 13:06:15 +1300 Subject: [PATCH] feat(zeebe): implement backoff on UNAUTHENTICATED error for workers fixes #366 --- .vscode/launch.json | 42 +++++++++ README.md | 2 +- .../zeebe/integration/Worker-Backoff.spec.ts | 66 +++++++++++++ src/lib/Configuration.ts | 6 ++ src/zeebe/lib/ConnectionStatusEvent.ts | 14 +++ src/zeebe/lib/GrpcClient.ts | 4 +- src/zeebe/lib/TypedEmitter.ts | 10 +- src/zeebe/lib/ZBWorkerBase.ts | 93 ++++++++++++++----- src/zeebe/lib/interfaces-1.0.ts | 4 + src/zeebe/zb/ZeebeGrpcClient.ts | 7 +- 10 files changed, 218 insertions(+), 30 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 src/__tests__/zeebe/integration/Worker-Backoff.spec.ts diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..598cf88d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,42 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Debug Jest Tests", + "program": "${workspaceFolder}/node_modules/jest/bin/jest", + "args": [ + // "Worker-Backoff", // set to the name of the test file you want to run + "--runInBand", + "--watchAll=false" + ], + "cwd": "${workspaceFolder}", + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "env": { + "NODE_ENV": "test", + "CAMUNDA_UNIT_TEST": "true", + "ZEEBE_ADDRESS": "localhost:26500", + "ZEEBE_REST_ADDRESS": "http://localhost:8080", + "ZEEBE_GRPC_ADDRESS": "localhost:26500", + "ZEEBE_CLIENT_ID": "zeebe", + "ZEEBE_CLIENT_SECRET": "not_a_real_thing", + "CAMUNDA_OAUTH_URL": "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token", + "ZEEBE_AUTHORIZATION_SERVER_URL": "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token", + "CAMUNDA_TASKLIST_BASE_URL": "http://localhost:8082", + "CAMUNDA_OPERATE_BASE_URL": "http://localhost:8081", + 
"CAMUNDA_OPTIMIZE_BASE_URL": "http://localhost:8083", + "CAMUNDA_MODELER_BASE_URL": "http://localhost:8070/api", + "CAMUNDA_ZEEBE_OAUTH_AUDIENCE": "zeebe.camunda.io", + "CAMUNDA_TENANT_ID": "", + "CAMUNDA_SECURE_CONNECTION": "false", + "ZEEBE_INSECURE_CONNECTION": "true", + "CAMUNDA_OAUTH_TOKEN_REFRESH_THRESHOLD_MS": "10000", + "CAMUNDA_AUTH_STRATEGY": "OAUTH", + "CAMUNDA_OPTIMIZE_OAUTH_AUDIENCE": "optimize-api", + "ZEEBE_TOKEN_AUDIENCE": "zeebe.camunda.io" + } + } + ] +} diff --git a/README.md b/README.md index c1bfeb6e..2f07dd09 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ The configuration for the SDK can be done by any combination of environment vari Any configuration passed in to the `Camunda8` constructor is merged over any configuration in the environment. -The configuration object fields and the environment variables have exactly the same names. See the file `src/lib/Configuration.ts` for a complete configuration outline. +The configuration object fields and the environment variables have exactly the same names. See the file [`src/lib/Configuration.ts`](https://github.com/camunda/camunda-8-js-sdk/blob/main/src/lib/Configuration.ts) for a complete list of configuration parameters that can be set via environment variables or constructor parameters. ## A note on how int64 is handled in the JavaScript SDK diff --git a/src/__tests__/zeebe/integration/Worker-Backoff.spec.ts b/src/__tests__/zeebe/integration/Worker-Backoff.spec.ts new file mode 100644 index 00000000..369c428c --- /dev/null +++ b/src/__tests__/zeebe/integration/Worker-Backoff.spec.ts @@ -0,0 +1,66 @@ +import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' +import { ZeebeGrpcClient } from '../../../zeebe' + +/** + * This is a manually verified test. To check it, comment out the next line, then check the console output. + * You should see the error messages from the worker, and the backoff expanding the time between them. 
+ */ +suppressZeebeLogging() + +jest.setTimeout(30000) +afterAll(() => { + restoreZeebeLogging() +}) + +test('Will backoff on UNAUTHENTICATED', (done) => { + let durations = 0 + + const zbc = new ZeebeGrpcClient({ + config: { + CAMUNDA_AUTH_STRATEGY: 'NONE', + CAMUNDA_LOG_LEVEL: 'DEBUG', + }, + }) + + const w = zbc.createWorker({ + taskType: 'unauthenticated-worker', + taskHandler: async () => { + throw new Error('Not Implemented') // is never called + }, + }) + w.on('backoff', (duration) => { + durations += duration + }) + setTimeout(() => { + expect(durations).toBe(31000) + w.close() + done() + }, 25000) +}) + +test('Will use a supplied custom max backoff', (done) => { + let durations = 0 + + const zbc = new ZeebeGrpcClient({ + config: { + CAMUNDA_AUTH_STRATEGY: 'NONE', + CAMUNDA_LOG_LEVEL: 'DEBUG', + CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: 2000, + }, + }) + + const w = zbc.createWorker({ + taskType: 'unauthenticated-worker', + taskHandler: async () => { + throw new Error('Not Implemented') // is never called + }, + }) + w.on('backoff', (duration) => { + durations += duration + }) + setTimeout(() => { + expect(durations).toBe(11000) + w.close() + done() + }, 10000) +}) diff --git a/src/lib/Configuration.ts b/src/lib/Configuration.ts index 6c4df6b8..a13267cf 100644 --- a/src/lib/Configuration.ts +++ b/src/lib/Configuration.ts @@ -6,6 +6,12 @@ import { Logger } from '../c8/lib/C8Logger' const getMainEnv = () => createEnv({ + /** Maximum polling backoff time in milliseconds for Job Workers when an error is encountered. Defaults to 16000 (16 seconds). 
+ */ + CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: { + type: 'number', + optional: true, + default: 16000, + }, /** Custom user agent */ CAMUNDA_CUSTOM_USER_AGENT_STRING: { type: 'string', diff --git a/src/zeebe/lib/ConnectionStatusEvent.ts b/src/zeebe/lib/ConnectionStatusEvent.ts index eda864f3..925952ed 100644 --- a/src/zeebe/lib/ConnectionStatusEvent.ts +++ b/src/zeebe/lib/ConnectionStatusEvent.ts @@ -1,6 +1,20 @@ export const ConnectionStatusEvent = { close: 'close' as const, + /** This is a latched error event. It will fire once to signal the start of an error state. */ connectionError: 'connectionError' as const, ready: 'ready' as const, unknown: 'unknown' as const, + /** The worker is applying a backoff. The duration of the backoff in ms is passed as a parameter to any listener */ + backoff: 'backoff' as const, + /** This is an unlatched error event. It will fire multiple times when an error state is encountered. */ + streamError: 'streamError' as const, +} + +export type ConnectionStatusEventMap = { + close: void + [ConnectionStatusEvent.connectionError]: void + [ConnectionStatusEvent.ready]: void + [ConnectionStatusEvent.unknown]: void + [ConnectionStatusEvent.backoff]: number + [ConnectionStatusEvent.streamError]: Error } diff --git a/src/zeebe/lib/GrpcClient.ts b/src/zeebe/lib/GrpcClient.ts index b3f0ba76..40273768 100644 --- a/src/zeebe/lib/GrpcClient.ts +++ b/src/zeebe/lib/GrpcClient.ts @@ -339,6 +339,7 @@ export class GrpcClient extends EventEmitter { } } + let _error: GrpcStreamError | undefined // Free the stream resources. When it emits 'end', we remove all listeners and destroy it. 
stream.on('end', () => { stream.removeAllListeners() @@ -380,6 +381,7 @@ export class GrpcClient extends EventEmitter { * streaming calls, and each worker, which only does streaming calls */ stream.on('error', (error: GrpcStreamError) => { + _error = error clearTimeout(clientSideTimeout) debug(`${methodName}Stream error emitted by stream`, error) this.emit(MiddlewareSignals.Event.Error) @@ -410,7 +412,7 @@ export class GrpcClient extends EventEmitter { ) stream.on('end', () => clearTimeout(clientSideTimeout)) - return stream + return _error ? { error: _error } : stream } this[`${methodName}Sync`] = (data) => { diff --git a/src/zeebe/lib/TypedEmitter.ts b/src/zeebe/lib/TypedEmitter.ts index 61ce1f07..5607c9ad 100644 --- a/src/zeebe/lib/TypedEmitter.ts +++ b/src/zeebe/lib/TypedEmitter.ts @@ -3,22 +3,22 @@ import { EventEmitter } from 'events' type EventMap = Record type EventKey = string & keyof T -type EventReceiver = () => void +type EventReceiver = (params: T) => void interface Emitter { - on>(eventName: K, fn: EventReceiver): void - off>(eventName: K, fn: EventReceiver): void + on>(eventName: K, fn: EventReceiver): void + off>(eventName: K, fn: EventReceiver): void emit>(eventName: K, params?: T[K]): void } export class TypedEmitter implements Emitter { private emitter = new EventEmitter() - public on>(eventName: K, fn: EventReceiver) { + public on>(eventName: K, fn: EventReceiver) { this.emitter.on(eventName, fn) return this } - public off>(eventName: K, fn: EventReceiver) { + public off>(eventName: K, fn: EventReceiver) { this.emitter.off(eventName, fn) } diff --git a/src/zeebe/lib/ZBWorkerBase.ts b/src/zeebe/lib/ZBWorkerBase.ts index bfc814b9..872cb7fe 100644 --- a/src/zeebe/lib/ZBWorkerBase.ts +++ b/src/zeebe/lib/ZBWorkerBase.ts @@ -12,7 +12,10 @@ import * as uuid from 'uuid' import { LosslessDto } from '../../lib' import { ZeebeGrpcClient } from '../zb/ZeebeGrpcClient' -import { ConnectionStatusEvent } from './ConnectionStatusEvent' +import { + 
ConnectionStatusEvent, + ConnectionStatusEventMap, +} from './ConnectionStatusEvent' import { GrpcError } from './GrpcError' import { StatefulLogInterceptor } from './StatefulLogInterceptor' import { TypedEmitter } from './TypedEmitter' @@ -68,7 +71,7 @@ export class ZBWorkerBase< CustomHeaderShape = any, // eslint-disable-next-line @typescript-eslint/no-explicit-any WorkerOutputVariables = any, -> extends TypedEmitter { +> extends TypedEmitter { private static readonly DEFAULT_JOB_ACTIVATION_TIMEOUT = Duration.seconds.of(60) private static readonly DEFAULT_MAX_ACTIVE_JOBS = 32 @@ -113,6 +116,7 @@ export class ZBWorkerBase< new (...args: any[]): CustomHeaderShape } private tenantIds: string[] | [string] | undefined + private CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: number constructor({ grpcClient, @@ -200,7 +204,8 @@ export class ZBWorkerBase< this.logger = log this.capacityEmitter = new EventEmitter() - + this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS = + options.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS! // We assert because it is optional in the explicit arguments, but hydrated from env config with a default this.pollLoop = setInterval( () => this.poll(), Duration.milliseconds.from(this.pollInterval) @@ -229,6 +234,7 @@ export class ZBWorkerBase< if (this.activeJobs <= 0) { await this.grpcClient.close(timeout) this.grpcClient.removeAllListeners() + this.jobStream?.removeAllListeners() this.jobStream?.cancel?.() this.jobStream = undefined this.logger.logDebug('Cancelled Job Stream') @@ -460,6 +466,7 @@ You should call only one job action method in the worker handler. This is a bug } private handleStreamEnd = (id) => { + this.jobStream?.removeAllListeners() this.jobStream = undefined this.logger.logDebug( `Deleted job stream [${id}] listeners and job stream reference` @@ -468,7 +475,7 @@ You should call only one job action method in the worker handler. 
This is a bug private async poll() { const pollAlreadyInProgress = this.pollMutex || this.jobStream !== undefined - const workerIsClosing = this.closePromise !== undefined || this.closing + const workerIsClosing = !!this.closePromise || this.closing const insufficientCapacityAvailable = this.activeJobs > this.activeJobsThresholdForReactivation @@ -497,45 +504,79 @@ You should call only one job action method in the worker handler. This is a bug start ) + let doBackoff = false if (jobStream.stream) { this.logger.logDebug(`Stream [${id}] opened...`) this.jobStream = jobStream.stream - // This event happens when the server cancels the call after the deadline - // And when it has completed a response with work - jobStream.stream.on('end', () => { - this.logger.logDebug( - `Stream [${id}] ended after ${(Date.now() - start) / 1000} seconds` - ) - this.handleStreamEnd(id) - this.backPressureRetryCount = 0 - }) jobStream.stream.on('error', (error) => { + this.pollMutex = true this.logger.logDebug( `Stream [${id}] error after ${(Date.now() - start) / 1000} seconds`, error ) + const errorCode = (error as Error & { code: number }).code // Backoff on - if ( - (error as Error & { code: number }).code === - GrpcError.RESOURCE_EXHAUSTED || - (error as Error & { code: number }).code === GrpcError.INTERNAL - ) { - setTimeout( - () => this.handleStreamEnd(id), + const backoffErrorCodes: number[] = [ + GrpcError.RESOURCE_EXHAUSTED, + GrpcError.INTERNAL, + GrpcError.UNAUTHENTICATED, + ] + doBackoff = backoffErrorCodes.includes(errorCode) + this.emit('streamError', error) + if (doBackoff) { + const backoffDuration = Math.min( + this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS, 1000 * 2 ** this.backPressureRetryCount++ ) + + this.emit('backoff', backoffDuration) + + setTimeout(() => { + this.handleStreamEnd(id) + this.pollMutex = false + }, backoffDuration) } else { this.handleStreamEnd(id) + this.pollMutex = false } }) + + // This event happens when the server cancels the call after the deadline 
+ // And when it has completed a response with work + // And also after an error event. + jobStream.stream.on('end', () => { + this.logger.logDebug( + `Stream [${id}] ended after ${(Date.now() - start) / 1000} seconds` + ) + this.handleStreamEnd(id) + }) } if (jobStream.error) { - const error = jobStream.error?.message + const error = jobStream.error!.message + const message = 'Grpc Stream Error: ' + const backoffErrorCodes: string[] = [ + `${message}${GrpcError.RESOURCE_EXHAUSTED}`, + `${message}${GrpcError.INTERNAL}`, + `${message}${GrpcError.UNAUTHENTICATED}`, + ] + doBackoff = backoffErrorCodes.map((e) => error.includes(e)).includes(true) + const backoffDuration = Math.min( + this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS, + 1000 * 2 ** this.backPressureRetryCount++ + ) this.logger.logError({ id, error }) + if (doBackoff) { + this.emit('backoff', backoffDuration) + setTimeout(() => { + this.handleStreamEnd(id) + this.pollMutex = false + }, backoffDuration) + } + } else { + this.pollMutex = false } - this.pollMutex = false } private async activateJobs(id: string) { @@ -549,6 +590,7 @@ You should call only one job action method in the worker handler. This is a bug closing: true as const, } } + if (this.debugMode) { this.logger.logDebug('Activating Jobs...') } @@ -605,6 +647,13 @@ You should call only one job action method in the worker handler. This is a bug if (this.closing) { return } + /** + * At this point we know that we have a working connection to the server, so we can reset the backpressure retry count. + * Putting it here means that if we have a lot of connection errors and increment the backpressure count, + * then get a connection, but no jobs are activated, and before any jobs are activated we get another error condition + * then the backoff will start not from 0, but from the level of backoff we were at previously. 
+ */ + this.backPressureRetryCount = 0 this.activeJobs += res.jobs.length Promise.all( diff --git a/src/zeebe/lib/interfaces-1.0.ts b/src/zeebe/lib/interfaces-1.0.ts index 979bd692..fff2e0dd 100644 --- a/src/zeebe/lib/interfaces-1.0.ts +++ b/src/zeebe/lib/interfaces-1.0.ts @@ -358,6 +358,10 @@ export interface ZBWorkerOptions { * Enable debug tracking */ debug?: boolean + /** + * Maximum backoff time in milliseconds + */ + CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS?: number } export const JOB_ACTION_ACKNOWLEDGEMENT = 'JOB_ACTION_ACKNOWLEDGEMENT' as const diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index bfeda8e9..41fef722 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -458,7 +458,12 @@ export class ZeebeGrpcClient extends TypedEmitter< id: config.id || null, idColor, log, - options: { ...this.options, ...options }, + options: { + ...this.options, + ...options, + CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: + this.config.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS, + }, taskHandler: config.taskHandler, taskType: config.taskType, zbClient: this,