Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zeebe): support Job Corrections in complete command fixes #347 #348

Draft
wants to merge 4 commits into
base: alpha
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# CAMUNDA_CONNECTORS_VERSION=0.23.2
CAMUNDA_CONNECTORS_VERSION=8.5.0
CAMUNDA_OPTIMIZE_VERSION=8.5.0
CAMUNDA_PLATFORM_VERSION=8.6.0
CAMUNDA_ZEEBE_VERSION=8.6.3
CAMUNDA_PLATFORM_VERSION=8.7.0-alpha2
CAMUNDA_ZEEBE_VERSION=8.7.0-alpha2
CAMUNDA_WEB_MODELER_VERSION=8.5.0
ELASTIC_VERSION=8.9.0
KEYCLOAK_SERVER_VERSION=22.0.3
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/Worker-JobResult.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1ob4kc6" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.30.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.6.0">
<bpmn:process id="job-correction" name="Job Correction Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Job Correction Test">
<bpmn:outgoing>Flow_107kyon</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_107kyon" sourceRef="StartEvent_1" targetRef="Activity_01g5b4t" />
<bpmn:serviceTask id="Activity_01g5b4t" name="Job Correction Job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="job-correction" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_107kyon</bpmn:incoming>
<bpmn:outgoing>Flow_0y0j0w9</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:endEvent id="Event_1hmqn2y" name="Job Correction Test Completed Successfully">
<bpmn:incoming>Flow_0y0j0w9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0y0j0w9" sourceRef="Activity_01g5b4t" targetRef="Event_1hmqn2y" />
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="job-correction">
<bpmndi:BPMNShape id="StartEvent_1_di" bpmnElement="StartEvent_1">
<dc:Bounds x="182" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="163" y="145" width="74" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0s25xnu_di" bpmnElement="Activity_01g5b4t">
<dc:Bounds x="270" y="80" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hmqn2y_di" bpmnElement="Event_1hmqn2y">
<dc:Bounds x="422" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="402" y="145" width="77" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_107kyon_di" bpmnElement="Flow_107kyon">
<di:waypoint x="218" y="120" />
<di:waypoint x="270" y="120" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0y0j0w9_di" bpmnElement="Flow_0y0j0w9">
<di:waypoint x="370" y="120" />
<di:waypoint x="422" y="120" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
62 changes: 62 additions & 0 deletions src/__tests__/zeebe/integration/Worker-CompleteWithResult.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'
import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0'

jest.setTimeout(30000)
suppressZeebeLogging()

const zbc = new ZeebeGrpcClient()
let wf: CreateProcessInstanceResponse | undefined
let processDefinitionKey1: string
let bpmnProcessId1: string

beforeAll(async () => {
const res1 = await zbc.deployResource({
processFilename: './src/__tests__/testdata/Worker-JobResult.bpmn',
})
;({
processDefinitionKey: processDefinitionKey1,
bpmnProcessId: bpmnProcessId1,
} = res1.deployments[0].process)
await cancelProcesses(processDefinitionKey1)
})

afterEach(async () => {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey).catch((e) => e)
}
})

afterAll(async () => {
await zbc.close()
await cancelProcesses(processDefinitionKey1)
restoreZeebeLogging()
})

test('Can complete a task with job corrections', (done) => {
zbc
.createProcessInstance({
bpmnProcessId: bpmnProcessId1,
variables: {},
})
.then((res) => {
wf = res
zbc.createWorker({
taskType: 'job-correction',
taskHandler: async (job) => {
expect(job.processInstanceKey).toBe(wf?.processInstanceKey)
const res1 = await job.completeWithJobResult({
variables: {},
result: {
denied: true,
},
Comment on lines +49 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 Given the test case (the process has a service task for which this job worker code will run) this code may eventually break.

Job result is at this time only meant to be used for User Task Listeners.

For now, we allow regular job completions with a job result ignoring the result. However, in the future we may add validations against it.

🔧 As a more durable test case, please consider a Camunda User Task with a completing task listener. You can then try to complete the user task, which creates a job that you can activate and complete with a job result.

})
// @TODO: correction interface
done(null)
return res1
},
loglevel: 'NONE',
})
})
})
42 changes: 41 additions & 1 deletion src/proto/zeebe.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ option java_package = "io.camunda.zeebe.gateway.protocol";
option go_package = "./;pb";

// For a more complete documentation, refer to Zeebe documentation at:
// https://docs.camunda.io/docs/reference/grpc
// https://docs.camunda.io/docs/apis-tools/zeebe-api/gateway-service/

message StreamActivatedJobsRequest {
// the job type, as defined in the BPMN process (e.g. <zeebe:taskDefinition
Expand Down Expand Up @@ -101,6 +101,46 @@ message CompleteJobRequest {
int64 jobKey = 1;
// a JSON document representing the variables in the current task scope
string variables = 2;
// The result of the completed job as determined by the worker.
optional JobResult result = 3;
}

message JobResult{
// Indicates whether the worker denies the work, i.e. explicitly doesn't approve it.
// For example, a Task Listener can deny the completion of a task by setting this flag to true.
// In this example, the completion of a task is represented by a job that the worker can complete as denied.
// As a result, the completion request is rejected and the task remains active.
// Defaults to false.
optional bool denied = 1;
// Attributes that were corrected by the worker.
// The following attributes can be corrected, additional attributes will be ignored:
// * `assignee` - reset by providing an empty String
// * `dueDate` - reset by providing an empty String
// * `followUpDate` - reset by providing an empty String
// * `candidateGroups` - reset by providing an empty list
// * `candidateUsers` - reset by providing an empty list
// * `priority` - minimum 0, maximum 100, default 50
// Omitting any of the attributes will preserve the persisted attribute's value.
optional JobResultCorrections corrections = 2;
}

message JobResultCorrections {
// The assignee of the task.
optional string assignee = 1;
// The due date of the task.
optional string dueDate = 2;
// The follow-up date of the task.
optional string followUpDate = 3;
// The list of candidate users of the task.
optional StringList candidateUsers = 4;
// The list of candidate groups of the task.
optional StringList candidateGroups = 5;
// The priority of the task.
optional int32 priority = 6;
}

message StringList {
repeated string values = 1;
}

message CompleteJobResponse {
Expand Down
23 changes: 22 additions & 1 deletion src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
ZBGrpc,
ZBWorkerTaskHandler,
} from './interfaces-1.0'
import { ActivatedJob, StreamActivatedJobsRequest } from './interfaces-grpc-1.0'
import {
ActivatedJob,
CompleteJobRequest,
StreamActivatedJobsRequest,
} from './interfaces-grpc-1.0'

import { parseVariablesAndCustomHeadersToJSON } from '.'

Expand Down Expand Up @@ -195,13 +199,30 @@ You should call only one job action method in the worker handler. This is a bug
})
}

const correctJob =
(job: Job) => (req: Pick<CompleteJobRequest, 'result' | 'variables'>) =>
this.completeJob(
job.key,
{
result: req.result,
variables: req.variables,
},
taskType
)

const fail = failJob(thisJob)
const succeed = succeedJob(thisJob)
const completeWithResult = correctJob(thisJob)

return {
cancelWorkflow: cancelWorkflow(thisJob),
complete: errorMsgOnPriorMessageCall('job.complete', succeed),
error: errorMsgOnPriorMessageCall('error', errorJob(thisJob)),
fail: errorMsgOnPriorMessageCall('job.fail', fail),
completeWithJobResult: errorMsgOnPriorMessageCall(
'job.completeWithJobResult',
completeWithResult
),
forward: errorMsgOnPriorMessageCall('job.forward', () => {
return JOB_ACTION_ACKNOWLEDGEMENT
}),
Expand Down
22 changes: 21 additions & 1 deletion src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import * as ZB from './interfaces-1.0'
import {
ActivateJobsRequest,
ActivateJobsResponse,
CompleteJobRequest,
JobResult,
} from './interfaces-grpc-1.0'
import { ZBClientOptions } from './interfaces-published-contract'

Expand Down Expand Up @@ -340,6 +342,14 @@ You should call only one job action method in the worker handler. This is a bug
(completedVariables?: T) =>
this.completeJob(job.key, completedVariables ?? {})

const correctJob =
(job: ZB.Job<WorkerInputVariables, CustomHeaderShape>) =>
(req: Pick<CompleteJobRequest, 'result' | 'variables'>) =>
this.completeJob(job.key, {
result: req.result,
variables: req.variables,
})

const errorJob =
(job: ZB.Job<WorkerInputVariables, CustomHeaderShape>) =>
(e: string | ZB.ErrorJobWithVariables, errorMessage: string = '') => {
Expand All @@ -362,9 +372,14 @@ You should call only one job action method in the worker handler. This is a bug

const fail = failJob(thisJob)
const succeed = succeedJob(thisJob)
const completeWithResult = correctJob(thisJob)
return {
cancelWorkflow: cancelWorkflow(thisJob),
complete: errorMsgOnPriorMessageCall('job.complete', succeed),
completeWithJobResult: errorMsgOnPriorMessageCall(
'job.completeWithJobResult',
completeWithResult
),
error: errorMsgOnPriorMessageCall('error', errorJob(thisJob)),
fail: errorMsgOnPriorMessageCall('job.fail', fail),
forward: errorMsgOnPriorMessageCall('job.forward', () => {
Expand Down Expand Up @@ -402,11 +417,16 @@ You should call only one job action method in the worker handler. This is a bug
})
}

private completeJob(jobKey: string, completedVariables = {}) {
private completeJob(
jobKey: string,
completedVariables = {},
result: JobResult = {}
) {
return this.zbClient
.completeJob({
jobKey,
variables: completedVariables,
result,
})
.then((res) => {
this.logger.logDebug(`Completed job ${jobKey} for ${this.taskType}`)
Expand Down
6 changes: 6 additions & 0 deletions src/zeebe/lib/interfaces-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ export interface JobCompletionInterface<WorkerOutputVariables> {
complete: (
updatedVariables?: WorkerOutputVariables
) => Promise<JOB_ACTION_ACKNOWLEDGEMENT>
/**
* Complete the job with success, and a job result that may contain corrections. Since Camunda 8.7.
*/
completeWithJobResult: (
req: Pick<CompleteJobRequest, 'result' | 'variables'>
) => Promise<JOB_ACTION_ACKNOWLEDGEMENT>
/**
* Fail the job with an informative message as to the cause. Optionally, pass in a
* value remaining retries. If no value is passed for retries then the current retry
Expand Down
39 changes: 39 additions & 0 deletions src/zeebe/lib/interfaces-grpc-1.0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,45 @@ export interface ThrowErrorRequest {
export interface CompleteJobRequest<Variables = IProcessVariables> {
readonly jobKey: string
variables: Variables
/** Since Camunda 8.7 */
result?: JobResult
}

/**
* Since Camunda 8.7
*/
export interface JobResult {
// Indicates whether the worker denies the work, i.e. explicitly doesn't approve it.
// For example, a Task Listener can deny the completion of a task by setting this flag to true.
// In this example, the completion of a task is represented by a job that the worker can complete as denied.
// As a result, the completion request is rejected and the task remains active.
// Defaults to false.
denied?: boolean
// Attributes that were corrected by the worker.
// The following attributes can be corrected, additional attributes will be ignored:
// * `assignee` - reset by providing an empty String
// * `dueDate` - reset by providing an empty String
// * `followUpDate` - reset by providing an empty String
// * `candidateGroups` - reset by providing an empty list
// * `candidateUsers` - reset by providing an empty list
jwulf marked this conversation as resolved.
Show resolved Hide resolved
// * `priority` - minimum 0, maximum 100, default 50
// Omitting any of the attributes will preserve the persisted attribute's value.
corrections?: JobResultCorrections
}

export interface JobResultCorrections {
// The assignee of the task.
assignee?: string
// The due date of the task.
dueDate?: string
// The follow-up date of the task.
followUpDate?: string
// The list of candidate users of the task.
candidateUsers?: { values: string[] }
// The list of candidate groups of the task.
candidateGroups?: { values: string[] }
// The priority of the task.
priority?: number
}

interface SetVariablesRequestBase {
Expand Down
6 changes: 1 addition & 5 deletions src/zeebe/zb/ZBWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ export class ZBWorker<
)(
{
...job,
cancelWorkflow: workerCallback.cancelWorkflow,
complete: workerCallback.complete,
fail: workerCallback.fail,
error: workerCallback.error,
forward: workerCallback.forward,
...workerCallback,
},
this
)
Expand Down
Loading