Skip to content

Commit

Permalink
Merge pull request #368 from camunda/feat-366
Browse files Browse the repository at this point in the history
feat(zeebe): implement backoff on UNAUTHENTICATED error for workers
  • Loading branch information
jwulf authored Feb 4, 2025
2 parents fe2abfe + 56c3c78 commit 22ef706
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 30 deletions.
42 changes: 42 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// VS Code launch configuration for debugging the Jest test suite.
// Expects a local Camunda 8 stack: Zeebe gRPC on 26500, REST on 8080,
// Keycloak (OAuth token endpoint) on 18080, Tasklist/Operate/Optimize on 808x.
{
  "version": "0.2.0",
  "configurations": [
    {
      "type": "node",
      "request": "launch",
      "name": "Debug Jest Tests",
      // Launch Jest directly from node_modules so breakpoints bind to source.
      "program": "${workspaceFolder}/node_modules/jest/bin/jest",
      "args": [
        // "Worker-Backoff", // set to the name of the test file you want to run
        "--runInBand", // run tests serially in the current process
        "--watchAll=false"
      ],
      "cwd": "${workspaceFolder}",
      "console": "integratedTerminal",
      "internalConsoleOptions": "neverOpen",
      "env": {
        "NODE_ENV": "test",
        "CAMUNDA_UNIT_TEST": "true",
        "ZEEBE_ADDRESS": "localhost:26500",
        "ZEEBE_REST_ADDRESS": "http://localhost:8080",
        "ZEEBE_GRPC_ADDRESS": "localhost:26500",
        // Dummy credentials for the local dev Keycloak realm — not real secrets.
        "ZEEBE_CLIENT_ID": "zeebe",
        "ZEEBE_CLIENT_SECRET": "not_a_real_thing",
        "CAMUNDA_OAUTH_URL": "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
        "ZEEBE_AUTHORIZATION_SERVER_URL": "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
        "CAMUNDA_TASKLIST_BASE_URL": "http://localhost:8082",
        "CAMUNDA_OPERATE_BASE_URL": "http://localhost:8081",
        "CAMUNDA_OPTIMIZE_BASE_URL": "http://localhost:8083",
        "CAMUNDA_MODELER_BASE_URL": "http://localhost:8070/api",
        "CAMUNDA_ZEEBE_OAUTH_AUDIENCE": "zeebe.camunda.io",
        "CAMUNDA_TENANT_ID": "<default>",
        "CAMUNDA_SECURE_CONNECTION": "false",
        "ZEEBE_INSECURE_CONNECTION": "true",
        "CAMUNDA_OAUTH_TOKEN_REFRESH_THRESHOLD_MS": "10000",
        "CAMUNDA_AUTH_STRATEGY": "OAUTH",
        "CAMUNDA_OPTIMIZE_OAUTH_AUDIENCE": "optimize-api",
        "ZEEBE_TOKEN_AUDIENCE": "zeebe.camunda.io"
      }
    }
  ]
}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ The configuration for the SDK can be done by any combination of environment vari

Any configuration passed in to the `Camunda8` constructor is merged over any configuration in the environment.

The configuration object fields and the environment variables have exactly the same names. See the file [`src/lib/Configuration.ts`](https://github.com/camunda/camunda-8-js-sdk/blob/main/src/lib/Configuration.ts) for a complete list of configuration parameters that can be set via environment variables or constructor parameters.

## A note on how int64 is handled in the JavaScript SDK

Expand Down
66 changes: 66 additions & 0 deletions src/__tests__/zeebe/integration/Worker-Backoff.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'

/**
* This is a manually verified test. To check it, comment out the next line, then check the console output.
* You should see the error messages from the worker, and the backoff expanding the time between them.
*/
suppressZeebeLogging()

// The backoff assertions below wait up to 25s of wall-clock time per test.
jest.setTimeout(30000)
afterAll(() => {
	restoreZeebeLogging()
})

test('Will backoff on UNAUTHENTICATED', (done) => {
	// Accumulates every backoff duration the worker reports so we can assert
	// the exponential schedule: 1000 + 2000 + 4000 + 8000 + 16000 = 31000 ms.
	// The last (capped, default-max 16s) backoff is emitted ~15s in, well
	// inside the 25s observation window below.
	let durations = 0

	// CAMUNDA_AUTH_STRATEGY 'NONE' means every poll is rejected by the broker
	// as UNAUTHENTICATED, which is the condition under test.
	const zbc = new ZeebeGrpcClient({
		config: {
			CAMUNDA_AUTH_STRATEGY: 'NONE',
			CAMUNDA_LOG_LEVEL: 'DEBUG',
		},
	})

	const w = zbc.createWorker({
		taskType: 'unauthenticated-worker',
		taskHandler: async () => {
			throw new Error('Not Implemented') // is never called
		},
	})
	w.on('backoff', (duration) => {
		durations += duration
	})
	setTimeout(() => {
		expect(durations).toBe(31000)
		// close() is asynchronous — wait for it before signalling completion,
		// so no gRPC handles outlive the test.
		w.close().then(() => done())
	}, 25000)
})

test('Will use a supplied custom max backoff', (done) => {
	// With the max capped at 2000ms the expected schedule within 10s is
	// 1000 + 2000 + 2000 + 2000 + 2000 + 2000 = 11000 ms.
	let durations = 0

	const zbc = new ZeebeGrpcClient({
		config: {
			CAMUNDA_AUTH_STRATEGY: 'NONE',
			CAMUNDA_LOG_LEVEL: 'DEBUG',
			// Cap the exponential backoff at 2 seconds for this test.
			CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: 2000,
		},
	})

	const w = zbc.createWorker({
		taskType: 'unauthenticated-worker',
		taskHandler: async () => {
			throw new Error('Not Implemented') // is never called
		},
	})
	w.on('backoff', (duration) => {
		durations += duration
	})
	setTimeout(() => {
		expect(durations).toBe(11000)
		// close() is asynchronous — wait for it before signalling completion,
		// so no gRPC handles outlive the test.
		w.close().then(() => done())
	}, 10000)
})
6 changes: 6 additions & 0 deletions src/lib/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import { Logger } from '../c8/lib/C8Logger'

const getMainEnv = () =>
createEnv({
/** Maximum polling backoff time in milliseconds for Job Workers when an error is encountered. Defaults to 16000 (16 seconds). */
CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: {
type: 'number',
optional: true,
default: 16000,
},
/** Custom user agent */
CAMUNDA_CUSTOM_USER_AGENT_STRING: {
type: 'string',
Expand Down
14 changes: 14 additions & 0 deletions src/zeebe/lib/ConnectionStatusEvent.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
/**
 * Event names emitted to signal connection state changes. Values are
 * declared `as const` so the event map below can key off the literal types.
 */
export const ConnectionStatusEvent = {
	close: 'close' as const,
	/** This is a latched error event. It will fire once to signal the start of an error state. */
	connectionError: 'connectionError' as const,
	ready: 'ready' as const,
	unknown: 'unknown' as const,
	/** The worker is applying a backoff. The duration of the backoff in ms is passed as a parameter to any listener */
	backoff: 'backoff' as const,
	/** This is an unlatched error event. It will fire multiple times when an error state is encountered. */
	streamError: 'streamError' as const,
}

/**
 * Maps each event name to the payload type delivered to its listeners.
 * All keys are computed from ConnectionStatusEvent for consistency, so a
 * renamed event constant cannot silently desynchronise the map.
 */
export type ConnectionStatusEventMap = {
	[ConnectionStatusEvent.close]: void
	[ConnectionStatusEvent.connectionError]: void
	[ConnectionStatusEvent.ready]: void
	[ConnectionStatusEvent.unknown]: void
	[ConnectionStatusEvent.backoff]: number
	[ConnectionStatusEvent.streamError]: Error
}
4 changes: 3 additions & 1 deletion src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ export class GrpcClient extends EventEmitter {
}
}

let _error: GrpcStreamError | undefined
// Free the stream resources. When it emits 'end', we remove all listeners and destroy it.
stream.on('end', () => {
stream.removeAllListeners()
Expand Down Expand Up @@ -380,6 +381,7 @@ export class GrpcClient extends EventEmitter {
* streaming calls, and each worker, which only does streaming calls
*/
stream.on('error', (error: GrpcStreamError) => {
_error = error
clearTimeout(clientSideTimeout)
debug(`${methodName}Stream error emitted by stream`, error)
this.emit(MiddlewareSignals.Event.Error)
Expand Down Expand Up @@ -410,7 +412,7 @@ export class GrpcClient extends EventEmitter {
)
stream.on('end', () => clearTimeout(clientSideTimeout))

return stream
return _error ? { error: _error } : stream
}

this[`${methodName}Sync`] = (data) => {
Expand Down
10 changes: 5 additions & 5 deletions src/zeebe/lib/TypedEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ import { EventEmitter } from 'events'
/** Events are keyed by name; the value is the payload type passed to listeners. */
type EventMap = Record<string, unknown>

type EventKey<T extends EventMap> = string & keyof T
/** A listener that receives the payload type declared for its event. */
type EventReceiver<T> = (params: T) => void

/**
 * Minimal typed facade over Node's EventEmitter: event names and listener
 * payload types are checked against the event map `T`.
 * (Stale pre-change declarations that duplicated EventReceiver and the
 * untyped on/off signatures have been removed — they would be duplicate
 * identifiers / incompatible overloads.)
 */
interface Emitter<T extends EventMap> {
	on<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>): void
	off<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>): void
	emit<K extends EventKey<T>>(eventName: K, params?: T[K]): void
}

export class TypedEmitter<T extends EventMap> implements Emitter<T> {
private emitter = new EventEmitter()
/**
 * Subscribe `fn` to `eventName`. The listener parameter type is taken from
 * the event map entry for that event. Returns `this` to allow chaining.
 * (Removed the stale duplicate pre-change signature that took an untyped
 * EventReceiver — two adjacent signatures would not compile.)
 */
public on<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>) {
	this.emitter.on(eventName, fn)
	return this
}

/**
 * Unsubscribe a previously registered listener from `eventName`.
 * (Removed the stale duplicate pre-change signature that took an untyped
 * EventReceiver — two adjacent signatures would not compile.)
 */
public off<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>) {
	this.emitter.off(eventName, fn)
}

Expand Down
93 changes: 71 additions & 22 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import * as uuid from 'uuid'
import { LosslessDto } from '../../lib'
import { ZeebeGrpcClient } from '../zb/ZeebeGrpcClient'

import { ConnectionStatusEvent } from './ConnectionStatusEvent'
import {
ConnectionStatusEvent,
ConnectionStatusEventMap,
} from './ConnectionStatusEvent'
import { GrpcError } from './GrpcError'
import { StatefulLogInterceptor } from './StatefulLogInterceptor'
import { TypedEmitter } from './TypedEmitter'
Expand Down Expand Up @@ -68,7 +71,7 @@ export class ZBWorkerBase<
CustomHeaderShape = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
WorkerOutputVariables = any,
> extends TypedEmitter<typeof ConnectionStatusEvent> {
> extends TypedEmitter<ConnectionStatusEventMap> {
private static readonly DEFAULT_JOB_ACTIVATION_TIMEOUT =
Duration.seconds.of(60)
private static readonly DEFAULT_MAX_ACTIVE_JOBS = 32
Expand Down Expand Up @@ -113,6 +116,7 @@ export class ZBWorkerBase<
new (...args: any[]): CustomHeaderShape
}
private tenantIds: string[] | [string] | undefined
private CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: number

constructor({
grpcClient,
Expand Down Expand Up @@ -200,7 +204,8 @@ export class ZBWorkerBase<

this.logger = log
this.capacityEmitter = new EventEmitter()

this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS =
options.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS! // We assert because it is optional in the explicit arguments, but hydrated from env config with a default
this.pollLoop = setInterval(
() => this.poll(),
Duration.milliseconds.from(this.pollInterval)
Expand Down Expand Up @@ -229,6 +234,7 @@ export class ZBWorkerBase<
if (this.activeJobs <= 0) {
await this.grpcClient.close(timeout)
this.grpcClient.removeAllListeners()
this.jobStream?.removeAllListeners()
this.jobStream?.cancel?.()
this.jobStream = undefined
this.logger.logDebug('Cancelled Job Stream')
Expand Down Expand Up @@ -460,6 +466,7 @@ You should call only one job action method in the worker handler. This is a bug
}

private handleStreamEnd = (id) => {
this.jobStream?.removeAllListeners()
this.jobStream = undefined
this.logger.logDebug(
`Deleted job stream [${id}] listeners and job stream reference`
Expand All @@ -468,7 +475,7 @@ You should call only one job action method in the worker handler. This is a bug

private async poll() {
const pollAlreadyInProgress = this.pollMutex || this.jobStream !== undefined
const workerIsClosing = this.closePromise !== undefined || this.closing
const workerIsClosing = !!this.closePromise || this.closing
const insufficientCapacityAvailable =
this.activeJobs > this.activeJobsThresholdForReactivation

Expand Down Expand Up @@ -497,45 +504,79 @@ You should call only one job action method in the worker handler. This is a bug
start
)

let doBackoff = false
if (jobStream.stream) {
this.logger.logDebug(`Stream [${id}] opened...`)
this.jobStream = jobStream.stream
// This event happens when the server cancels the call after the deadline
// And when it has completed a response with work
jobStream.stream.on('end', () => {
this.logger.logDebug(
`Stream [${id}] ended after ${(Date.now() - start) / 1000} seconds`
)
this.handleStreamEnd(id)
this.backPressureRetryCount = 0
})

jobStream.stream.on('error', (error) => {
this.pollMutex = true
this.logger.logDebug(
`Stream [${id}] error after ${(Date.now() - start) / 1000} seconds`,
error
)
const errorCode = (error as Error & { code: number }).code
// Backoff on
if (
(error as Error & { code: number }).code ===
GrpcError.RESOURCE_EXHAUSTED ||
(error as Error & { code: number }).code === GrpcError.INTERNAL
) {
setTimeout(
() => this.handleStreamEnd(id),
const backoffErrorCodes: number[] = [
GrpcError.RESOURCE_EXHAUSTED,
GrpcError.INTERNAL,
GrpcError.UNAUTHENTICATED,
]
doBackoff = backoffErrorCodes.includes(errorCode)
this.emit('streamError', error)
if (doBackoff) {
const backoffDuration = Math.min(
this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS,
1000 * 2 ** this.backPressureRetryCount++
)

this.emit('backoff', backoffDuration)

setTimeout(() => {
this.handleStreamEnd(id)
this.pollMutex = false
}, backoffDuration)
} else {
this.handleStreamEnd(id)
this.pollMutex = false
}
})

// This event happens when the server cancels the call after the deadline
// And when it has completed a response with work
// And also after an error event.
jobStream.stream.on('end', () => {
this.logger.logDebug(
`Stream [${id}] ended after ${(Date.now() - start) / 1000} seconds`
)
this.handleStreamEnd(id)
})
}

if (jobStream.error) {
const error = jobStream.error?.message
const error = jobStream.error!.message
const message = 'Grpc Stream Error: '
const backoffErrorCodes: string[] = [
`${message}${GrpcError.RESOURCE_EXHAUSTED}`,
`${message}${GrpcError.INTERNAL}`,
`${message}${GrpcError.UNAUTHENTICATED}`,
]
doBackoff = backoffErrorCodes.map((e) => error.includes(e)).includes(true)
const backoffDuration = Math.min(
this.CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS,
1000 * 2 ** this.backPressureRetryCount++
)
this.logger.logError({ id, error })
if (doBackoff) {
this.emit('backoff', backoffDuration)
setTimeout(() => {
this.handleStreamEnd(id)
this.pollMutex = false
}, backoffDuration)
}
} else {
this.pollMutex = false
}
this.pollMutex = false
}

private async activateJobs(id: string) {
Expand All @@ -549,6 +590,7 @@ You should call only one job action method in the worker handler. This is a bug
closing: true as const,
}
}

if (this.debugMode) {
this.logger.logDebug('Activating Jobs...')
}
Expand Down Expand Up @@ -605,6 +647,13 @@ You should call only one job action method in the worker handler. This is a bug
if (this.closing) {
return
}
/**
* At this point we know that we have a working connection to the server, so we can reset the backpressure retry count.
* Putting it here means that if we have a lot of connection errors and increment the backpressure count,
* then get a connection, but no jobs are activated, and before any jobs are activated we get another error condition
* then the backoff will start not from 0, but from the level of backoff we were at previously.
*/
this.backPressureRetryCount = 0
this.activeJobs += res.jobs.length

Promise.all(
Expand Down
Loading

0 comments on commit 22ef706

Please sign in to comment.