From 7198ed26e228db61bd7aba058ae053e48bd1b192 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 30 Jun 2024 13:44:25 -0700 Subject: [PATCH 01/10] update from Orkes --- build.gradle | 6 +- .../annotations/protogen/ProtoEnum.java | 26 ++++ .../annotations/protogen/ProtoField.java | 36 +++++ .../annotations/protogen/ProtoMessage.java | 51 +++++++ .../config/ObjectMapperConfiguration.java | 2 - .../common/config/ObjectMapperProvider.java | 12 +- .../conductor/common/metadata/Auditable.java | 4 +- .../conductor/common/metadata/BaseDef.java | 1 + .../conductor/common/metadata/SchemaDef.java | 49 +++++++ .../common/metadata/acl/Permission.java | 1 + .../common/metadata/events/EventHandler.java | 132 +++++++++++++++++- .../conductor/common/metadata/tasks/Task.java | 23 ++- .../common/metadata/tasks/TaskDef.java | 56 +++++++- .../workflow/IdempotencyStrategy.java | 18 +++ .../metadata/workflow/RateLimitConfig.java | 41 ++++++ .../workflow/StartWorkflowRequest.java | 36 +++++ .../metadata/workflow/StateChangeEvent.java | 47 +++++++ .../metadata/workflow/SubWorkflowParams.java | 82 +++++++---- .../workflow/UpgradeWorkflowRequest.java | 69 +++++++++ .../common/metadata/workflow/WorkflowDef.java | 105 +++++++++++--- .../metadata/workflow/WorkflowTask.java | 93 +++++++++--- .../conductor/common/run/Workflow.java | 110 +++++++-------- .../conductor/common/run/WorkflowSummary.java | 17 ++- .../conductor/common/utils/TaskUtils.java | 17 +++ .../common/validation/ErrorResponse.java | 11 ++ .../conductor/common/tasks/TaskDefTest.java | 23 +-- .../conductor/common/tasks/TaskTest.java | 2 +- .../workflow/SubWorkflowParamsTest.java | 66 +++++---- .../workflow/WorkflowDefValidatorTest.java | 7 +- .../service/MetadataServiceTest.java | 4 +- .../WorkflowTaskTypeConstraintTest.java | 20 --- 31 files changed, 945 insertions(+), 222 deletions(-) create mode 100644 common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoEnum.java create mode 100644 common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoField.java create mode 100644 common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoMessage.java create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/workflow/IdempotencyStrategy.java create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/workflow/UpgradeWorkflowRequest.java diff --git a/build.gradle b/build.gradle index 67084b75d..8a134574c 100644 --- a/build.gradle +++ b/build.gradle @@ -73,13 +73,17 @@ allprojects { implementation('org.apache.logging.log4j:log4j-api') implementation('org.apache.logging.log4j:log4j-slf4j-impl') implementation('org.apache.logging.log4j:log4j-jul') - implementation('org.apache.logging.log4j:log4j-web') + implementation('org.apache.logging.log4j:log4j-web') + compileOnly 'org.projectlombok:lombok:1.18.34' + + annotationProcessor 'org.projectlombok:lombok:1.18.34' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' testImplementation('org.springframework.boot:spring-boot-starter-test') testImplementation('org.springframework.boot:spring-boot-starter-log4j2') testImplementation 'junit:junit' testImplementation "org.junit.vintage:junit-vintage-engine" + 
testAnnotationProcessor 'org.projectlombok:lombok:1.18.34' } // processes additional configuration metadata json file as described here diff --git a/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoEnum.java b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoEnum.java new file mode 100644 index 000000000..c07e679f7 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoEnum.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.annotations.protogen; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * ProtoEnum annotates an enum type that will be exposed via the GRPC API as a native Protocol + * Buffers enum. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface ProtoEnum {} diff --git a/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoField.java b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoField.java new file mode 100644 index 000000000..a61bb5ea1 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoField.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.annotations.protogen; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * ProtoField annotates a field inside an struct with metadata on how to expose it on its + * corresponding Protocol Buffers struct. For a field to be exposed in a ProtoBuf struct, the + * containing struct must also be annotated with a {@link ProtoMessage} or {@link ProtoEnum} tag. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface ProtoField { + /** + * Mandatory. Sets the Protocol Buffer ID for this specific field. Once a field has been + * annotated with a given ID, the ID can never change to a different value or the resulting + * Protocol Buffer struct will not be backwards compatible. + * + * @return the numeric ID for the field + */ + int id(); +} diff --git a/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoMessage.java b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoMessage.java new file mode 100644 index 000000000..45fa884f9 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/annotations/protogen/ProtoMessage.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.annotations.protogen; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * ProtoMessage annotates a given Java class so it becomes exposed via the GRPC API as a native + * Protocol Buffers struct. The annotated class must be a POJO. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface ProtoMessage { + /** + * Sets whether the generated mapping code will contain a helper to translate the POJO for this + * class into the equivalent ProtoBuf object. + * + * @return whether this class will generate a mapper to ProtoBuf objects + */ + boolean toProto() default true; + + /** + * Sets whether the generated mapping code will contain a helper to translate the ProtoBuf + * object for this class into the equivalent POJO. + * + * @return whether this class will generate a mapper from ProtoBuf objects + */ + boolean fromProto() default true; + + /** + * Sets whether this is a wrapper class that will be used to encapsulate complex nested type + * interfaces. Wrapper classes are not directly exposed by the ProtoBuf API and must be mapped + * manually. + * + * @return whether this is a wrapper class + */ + boolean wrapper() default false; +} diff --git a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperConfiguration.java b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperConfiguration.java index 9d698cb1b..a256e6882 100644 --- a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperConfiguration.java +++ b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperConfiguration.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import jakarta.annotation.PostConstruct; @Configuration @@ -34,6 +33,5 @@ public void customizeDefaultObjectMapper() { objectMapper.setDefaultPropertyInclusion( JsonInclude.Value.construct( JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS)); - objectMapper.registerModule(new AfterburnerModule()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java index 51ebfc8cf..7f2b64d96 100644 --- a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java +++ b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java @@ -17,7 +17,8 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; /** * A Factory class for creating a customized {@link ObjectMapper}. 
This is only used by the @@ -26,6 +27,8 @@ */ public class ObjectMapperProvider { + private static final ObjectMapper objectMapper = _getObjectMapper(); + /** * The customizations in this method are configured using {@link * org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration} @@ -39,6 +42,10 @@ public class ObjectMapperProvider { * @see org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration */ public ObjectMapper getObjectMapper() { + return objectMapper; + } + + private static ObjectMapper _getObjectMapper() { final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); @@ -46,8 +53,9 @@ public ObjectMapper getObjectMapper() { objectMapper.setDefaultPropertyInclusion( JsonInclude.Value.construct( JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS)); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.registerModule(new JsonProtoModule()); - objectMapper.registerModule(new AfterburnerModule()); + objectMapper.registerModule(new JavaTimeModule()); return objectMapper; } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/Auditable.java b/common/src/main/java/com/netflix/conductor/common/metadata/Auditable.java index fcdfdf9fa..bef2e1792 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/Auditable.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/Auditable.java @@ -42,7 +42,7 @@ public void setOwnerApp(String ownerApp) { * @return the createTime */ public Long getCreateTime() { - return createTime; + return createTime == null ? 0 : createTime; } /** @@ -56,7 +56,7 @@ public void setCreateTime(Long createTime) { * @return the updateTime */ public Long getUpdateTime() { - return updateTime; + return updateTime == null ? 0 : updateTime; } /** diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/BaseDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/BaseDef.java index 7fec07255..fac1d1047 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/BaseDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/BaseDef.java @@ -22,6 +22,7 @@ * A base class for {@link com.netflix.conductor.common.metadata.workflow.WorkflowDef} and {@link * com.netflix.conductor.common.metadata.tasks.TaskDef}. */ +@Deprecated public abstract class BaseDef extends Auditable { private final Map accessPolicy = new EnumMap<>(Permission.class); diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java new file mode 100644 index 000000000..a90f87b3c --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata; + +import java.util.Map; + +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@EqualsAndHashCode(callSuper = true) +@Builder +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SchemaDef extends Auditable { + + public enum Type { + JSON, + AVRO, + PROTOBUF + } + + @NotNull private String name; + + @NotNull @Builder.Default private int version = 1; + + @NotNull private Type type; + + // Schema definition stored here + private Map data; + + // Externalized schema definition (eg. via AVRO, Protobuf registry) + // If using Orkes Schema registry, this points to the name of the schema in the registry + private String externalRef; +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/acl/Permission.java b/common/src/main/java/com/netflix/conductor/common/metadata/acl/Permission.java index dfcc77571..a87c89953 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/acl/Permission.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/acl/Permission.java @@ -15,6 +15,7 @@ import com.netflix.conductor.annotations.protogen.ProtoEnum; @ProtoEnum +@Deprecated public enum Permission { OWNER, OPERATOR diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java b/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java index 0321c85b7..56817315a 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java @@ -146,7 +146,9 @@ public static class Action { public enum Type { start_workflow, complete_task, - fail_task + fail_task, + terminate_workflow, + update_workflow_variables } @ProtoField(id = 1) @@ -164,6 +166,12 @@ public enum Type { @ProtoField(id = 5) private boolean expandInlineJSON; + @ProtoField(id = 6) + private TerminateWorkflow terminate_workflow; + + @ProtoField(id = 7) + private UpdateWorkflowVariables update_workflow_variables; + /** * @return the action */ @@ -234,6 +242,35 @@ public void setExpandInlineJSON(boolean expandInlineJSON) { public boolean isExpandInlineJSON() { return expandInlineJSON; } + + /** + * @return the terminate_workflow + */ + public TerminateWorkflow getTerminate_workflow() { + return terminate_workflow; + } + + /** + * @param terminate_workflow the terminate_workflow to set + */ + public void setTerminate_workflow(TerminateWorkflow terminate_workflow) { + this.terminate_workflow = terminate_workflow; + } + + /** + * @return the update_workflow_variables + */ + public UpdateWorkflowVariables getUpdate_workflow_variables() { + return update_workflow_variables; + } + + /** + * @param update_workflow_variables the update_workflow_variables to set + */ + public void setUpdate_workflow_variables( + UpdateWorkflowVariables update_workflow_variables) { + this.update_workflow_variables = update_workflow_variables; + } } @ProtoMessage @@ -414,4 +451,97 @@ public void setTaskToDomain(Map taskToDomain) { this.taskToDomain 
= taskToDomain; } } + + @ProtoMessage + public static class TerminateWorkflow { + + @ProtoField(id = 1) + private String workflowId; + + @ProtoField(id = 2) + private String terminationReason; + + /** + * @return the workflowId + */ + public String getWorkflowId() { + return workflowId; + } + + /** + * @param workflowId the workflowId to set + */ + public void setWorkflowId(String workflowId) { + this.workflowId = workflowId; + } + + /** + * @return the reasonForTermination + */ + public String getTerminationReason() { + return terminationReason; + } + + /** + * @param terminationReason the reasonForTermination to set + */ + public void setTerminationReason(String terminationReason) { + this.terminationReason = terminationReason; + } + } + + @ProtoMessage + public static class UpdateWorkflowVariables { + + @ProtoField(id = 1) + private String workflowId; + + @ProtoField(id = 2) + private Map variables; + + @ProtoField(id = 3) + private Boolean appendArray; + + /** + * @return the workflowId + */ + public String getWorkflowId() { + return workflowId; + } + + /** + * @param workflowId the workflowId to set + */ + public void setWorkflowId(String workflowId) { + this.workflowId = workflowId; + } + + /** + * @return the variables + */ + public Map getVariables() { + return variables; + } + + /** + * @param variables the variables to set + */ + public void setVariables(Map variables) { + this.variables = variables; + } + + /** + * @return appendArray + */ + public Boolean isAppendArray() { + return appendArray; + } + + /** + * @param appendArray the appendArray to set + */ + public void setAppendArray(Boolean appendArray) { + this.appendArray = appendArray; + } + } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java index ea98133f3..495ff06a9 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java @@ -24,7 +24,6 @@ import com.netflix.conductor.annotations.protogen.ProtoMessage; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.protobuf.Any; import io.swagger.v3.oas.annotations.Hidden; @@ -203,6 +202,9 @@ public boolean isRetriable() { @ProtoField(id = 42) private boolean subworkflowChanged; + // If the task is an event associated with a parent task, the id of the parent task + private String parentTaskId; + public Task() {} /** @@ -630,7 +632,6 @@ public void setOutputMessage(Any outputMessage) { /** * @return {@link Optional} containing the task definition if available */ - @JsonIgnore public Optional getTaskDefinition() { return Optional.ofNullable(this.getWorkflowTask()).map(WorkflowTask::getTaskDefinition); } @@ -756,6 +757,14 @@ public void setSubWorkflowId(String subWorkflowId) { } } + public String getParentTaskId() { + return parentTaskId; + } + + public void setParentTaskId(String parentTaskId) { + this.parentTaskId = parentTaskId; + } + public Task copy() { Task copy = new Task(); copy.setCallbackAfterSeconds(callbackAfterSeconds); @@ -788,7 +797,7 @@ public Task copy() { copy.setIsolationGroupId(isolationGroupId); copy.setSubWorkflowId(getSubWorkflowId()); copy.setSubworkflowChanged(subworkflowChanged); - + copy.setParentTaskId(parentTaskId); return copy; } @@ -809,7 +818,7 @@ public Task deepCopy() { deepCopy.setWorkerId(workerId); 
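As a side note on the terminate_workflow and update_workflow_variables action types added to EventHandler earlier in this patch, a minimal sketch of how the new payload POJOs might be populated. The workflow-id placeholders and variable values are invented for illustration, and the snippet assumes the pre-existing Action.setAction setter.

import java.util.Map;

import com.netflix.conductor.common.metadata.events.EventHandler;

public class EventHandlerActionSketch {

    // Builds a terminate_workflow action using the new TerminateWorkflow payload.
    static EventHandler.Action terminateAction() {
        EventHandler.Action action = new EventHandler.Action();
        action.setAction(EventHandler.Action.Type.terminate_workflow);
        EventHandler.TerminateWorkflow terminate = new EventHandler.TerminateWorkflow();
        terminate.setWorkflowId("${targetWorkflowId}");          // invented placeholder
        terminate.setTerminationReason("terminated by event handler");
        action.setTerminate_workflow(terminate);
        return action;
    }

    // Builds an update_workflow_variables action using the new UpdateWorkflowVariables payload.
    static EventHandler.Action updateVariablesAction() {
        EventHandler.Action action = new EventHandler.Action();
        action.setAction(EventHandler.Action.Type.update_workflow_variables);
        EventHandler.UpdateWorkflowVariables update = new EventHandler.UpdateWorkflowVariables();
        update.setWorkflowId("${targetWorkflowId}");             // invented placeholder
        update.setVariables(Map.of("lastEvent", (Object) "payment_failed"));
        update.setAppendArray(false);
        action.setUpdate_workflow_variables(update);
        return action;
    }
}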
deepCopy.setReasonForIncompletion(reasonForIncompletion); deepCopy.setSeq(seq); - + deepCopy.setParentTaskId(parentTaskId); return deepCopy; } @@ -963,7 +972,8 @@ && getWorkflowPriority() == task.getWorkflowPriority() getExternalOutputPayloadStoragePath(), task.getExternalOutputPayloadStoragePath()) && Objects.equals(getIsolationGroupId(), task.getIsolationGroupId()) - && Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace()); + && Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace()) + && Objects.equals(getParentTaskId(), task.getParentTaskId()); } @Override @@ -1005,6 +1015,7 @@ public int hashCode() { getExternalInputPayloadStoragePath(), getExternalOutputPayloadStoragePath(), getIsolationGroupId(), - getExecutionNameSpace()); + getExecutionNameSpace(), + getParentTaskId()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java index f6d5964d7..7e4357604 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java @@ -23,10 +23,10 @@ import com.netflix.conductor.annotations.protogen.ProtoMessage; import com.netflix.conductor.common.constraints.OwnerEmailMandatoryConstraint; import com.netflix.conductor.common.constraints.TaskTimeoutConstraint; -import com.netflix.conductor.common.metadata.BaseDef; +import com.netflix.conductor.common.metadata.Auditable; +import com.netflix.conductor.common.metadata.SchemaDef; import jakarta.validation.Valid; -import jakarta.validation.constraints.Email; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -34,7 +34,7 @@ @ProtoMessage @TaskTimeoutConstraint @Valid -public class TaskDef extends BaseDef { +public class TaskDef extends Auditable { @ProtoEnum public enum TimeoutPolicy { @@ -114,7 +114,6 @@ public enum RetryLogic { @ProtoField(id = 18) @OwnerEmailMandatoryConstraint - @Email(message = "ownerEmail should be valid email address") private String ownerEmail; @ProtoField(id = 19) @@ -125,6 +124,13 @@ public enum RetryLogic { @Min(value = 1, message = "Backoff scale factor. 
Applicable for LINEAR_BACKOFF") private Integer backoffScaleFactor = 1; + @ProtoField(id = 21) + private String baseType; + + private SchemaDef inputSchema; + private SchemaDef outputSchema; + private boolean enforceSchema; + public TaskDef() {} public TaskDef(String name) { @@ -426,6 +432,38 @@ public Integer getBackoffScaleFactor() { return backoffScaleFactor; } + public String getBaseType() { + return baseType; + } + + public void setBaseType(String baseType) { + this.baseType = baseType; + } + + public SchemaDef getInputSchema() { + return inputSchema; + } + + public void setInputSchema(SchemaDef inputSchema) { + this.inputSchema = inputSchema; + } + + public SchemaDef getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(SchemaDef outputSchema) { + this.outputSchema = outputSchema; + } + + public boolean isEnforceSchema() { + return enforceSchema; + } + + public void setEnforceSchema(boolean enforceSchema) { + this.enforceSchema = enforceSchema; + } + @Override public String toString() { return name; @@ -456,7 +494,10 @@ && getRetryLogic() == taskDef.getRetryLogic() && Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) && Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) && Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) - && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()); + && Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail()) + && Objects.equals(getBaseType(), taskDef.getBaseType()) + && Objects.equals(getInputSchema(), taskDef.getInputSchema()) + && Objects.equals(getOutputSchema(), taskDef.getOutputSchema()); } @Override @@ -479,6 +520,9 @@ public int hashCode() { getInputTemplate(), getIsolationGroupId(), getExecutionNameSpace(), - getOwnerEmail()); + getOwnerEmail(), + getBaseType(), + getInputSchema(), + getOutputSchema()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/IdempotencyStrategy.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/IdempotencyStrategy.java new file mode 100644 index 000000000..4b9ebd60b --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/IdempotencyStrategy.java @@ -0,0 +1,18 @@ +/* + * Copyright 2020 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +public enum IdempotencyStrategy { + FAIL, + RETURN_EXISTING +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java new file mode 100644 index 000000000..fba785f9c --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +/** Rate limit configuration for workflows */ +public class RateLimitConfig { + /** + * Key that defines the rate limit. Rate limit key is a combination of workflow payload such as + * name, or correlationId etc. + */ + private String rateLimitKey; + + /** Number of concurrently running workflows that are allowed per key */ + private int concurrentExecLimit; + + public String getRateLimitKey() { + return rateLimitKey; + } + + public void setRateLimitKey(String rateLimitKey) { + this.rateLimitKey = rateLimitKey; + } + + public int getConcurrentExecLimit() { + return concurrentExecLimit; + } + + public void setConcurrentExecLimit(int concurrentExecLimit) { + this.concurrentExecLimit = concurrentExecLimit; + } +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java index fc8f83af7..9d76533d6 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java @@ -54,6 +54,29 @@ public class StartWorkflowRequest { @Max(value = 99, message = "priority: ${validatedValue} should be maximum {value}") private Integer priority = 0; + @ProtoField(id = 9) + private String createdBy; + + private String idempotencyKey; + + private IdempotencyStrategy idempotencyStrategy; + + public String getIdempotencyKey() { + return idempotencyKey; + } + + public void setIdempotencyKey(String idempotencyKey) { + this.idempotencyKey = idempotencyKey; + } + + public IdempotencyStrategy getIdempotencyStrategy() { + return idempotencyStrategy; + } + + public void setIdempotencyStrategy(IdempotencyStrategy idempotencyStrategy) { + this.idempotencyStrategy = idempotencyStrategy; + } + public String getName() { return name; } @@ -158,4 +181,17 @@ public StartWorkflowRequest withWorkflowDef(WorkflowDef workflowDef) { this.workflowDef = workflowDef; return this; } + + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + public StartWorkflowRequest withCreatedBy(String createdBy) { + this.createdBy = createdBy; + return this; + } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java new file mode 100644 index 000000000..8d0d6a01d --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +import java.util.Map; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +@Valid +public class StateChangeEvent { + + @NotNull private String type; + + private Map payload; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Map getPayload() { + return payload; + } + + public void setPayload(Map payload) { + this.payload = payload; + } + + @Override + public String toString() { + return "StateChangeEvent{" + "type='" + type + '\'' + ", payload=" + payload + '}'; + } +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/SubWorkflowParams.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/SubWorkflowParams.java index d2fbb6f3b..66040b593 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/SubWorkflowParams.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/SubWorkflowParams.java @@ -12,23 +12,22 @@ */ package com.netflix.conductor.common.metadata.workflow; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import com.netflix.conductor.annotations.protogen.ProtoField; import com.netflix.conductor.annotations.protogen.ProtoMessage; +import com.netflix.conductor.common.utils.TaskUtils; import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSetter; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; @ProtoMessage public class SubWorkflowParams { @ProtoField(id = 1) - @NotNull(message = "SubWorkflowParams name cannot be null") - @NotEmpty(message = "SubWorkflowParams name cannot be empty") private String name; @ProtoField(id = 2) @@ -42,15 +41,36 @@ public class SubWorkflowParams { @ProtoField(id = 4) private Object workflowDefinition; + private String idempotencyKey; + + private IdempotencyStrategy idempotencyStrategy; + + public String getIdempotencyKey() { + return idempotencyKey; + } + + public void setIdempotencyKey(String idempotencyKey) { + this.idempotencyKey = idempotencyKey; + } + + public IdempotencyStrategy getIdempotencyStrategy() { + return idempotencyStrategy; + } + + public void setIdempotencyStrategy(IdempotencyStrategy idempotencyStrategy) { + this.idempotencyStrategy = idempotencyStrategy; + } + /** * @return the name */ public String getName() { if (workflowDefinition != null) { - return getWorkflowDef().getName(); - } else { - return name; + if (workflowDefinition instanceof WorkflowDef) { + return ((WorkflowDef) workflowDefinition).getName(); + } } + return name; } /** @@ -65,10 +85,11 @@ public void setName(String name) { */ public Integer getVersion() { if (workflowDefinition != null) { - return getWorkflowDef().getVersion(); - } else { - return version; + if (workflowDefinition instanceof WorkflowDef) { + return ((WorkflowDef) workflowDefinition).getVersion(); + } } + return version; } /** @@ -95,14 +116,19 @@ public void setTaskToDomain(Map taskToDomain) { /** * @return the workflowDefinition as an Object */ + 
@JsonGetter("workflowDefinition") public Object getWorkflowDefinition() { return workflowDefinition; } - /** - * @return the workflowDefinition as a WorkflowDef - */ - @JsonGetter("workflowDefinition") + @Deprecated + @JsonIgnore + public void setWorkflowDef(WorkflowDef workflowDef) { + this.setWorkflowDefinition(workflowDef); + } + + @Deprecated + @JsonIgnore public WorkflowDef getWorkflowDef() { return (WorkflowDef) workflowDefinition; } @@ -110,20 +136,26 @@ public WorkflowDef getWorkflowDef() { /** * @param workflowDef the workflowDefinition to set */ + @JsonSetter("workflowDefinition") public void setWorkflowDefinition(Object workflowDef) { - if (!(workflowDef == null || workflowDef instanceof WorkflowDef)) { + if (workflowDef == null) { + this.workflowDefinition = workflowDef; + } else if (workflowDef instanceof WorkflowDef) { + this.workflowDefinition = workflowDef; + } else if (workflowDef instanceof String) { + if (!(((String) workflowDef).startsWith("${")) + || !(((String) workflowDef).endsWith("}"))) { + throw new IllegalArgumentException( + "workflowDefinition is a string, but not a valid DSL string"); + } else { + this.workflowDefinition = workflowDef; + } + } else if (workflowDef instanceof LinkedHashMap) { + this.workflowDefinition = TaskUtils.convertToWorkflowDef(workflowDef); + } else { throw new IllegalArgumentException( - "workflowDefinition must be either null or WorkflowDef"); + "workflowDefinition must be either null, or WorkflowDef, or a valid DSL string"); } - this.workflowDefinition = workflowDef; - } - - /** - * @param workflowDef the workflowDefinition to set - */ - @JsonSetter("workflowDefinition") - public void setWorkflowDef(WorkflowDef workflowDef) { - this.workflowDefinition = workflowDef; } @Override diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/UpgradeWorkflowRequest.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/UpgradeWorkflowRequest.java new file mode 100644 index 000000000..a33b16874 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/UpgradeWorkflowRequest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +import java.util.Map; + +import com.netflix.conductor.annotations.protogen.ProtoField; +import com.netflix.conductor.annotations.protogen.ProtoMessage; + +import jakarta.validation.constraints.NotNull; + +@ProtoMessage +public class UpgradeWorkflowRequest { + + public Map getTaskOutput() { + return taskOutput; + } + + public void setTaskOutput(Map taskOutput) { + this.taskOutput = taskOutput; + } + + public Map getWorkflowInput() { + return workflowInput; + } + + public void setWorkflowInput(Map workflowInput) { + this.workflowInput = workflowInput; + } + + @ProtoField(id = 4) + private Map taskOutput; + + @ProtoField(id = 3) + private Map workflowInput; + + @ProtoField(id = 2) + private Integer version; + + @NotNull(message = "Workflow name cannot be null or empty") + @ProtoField(id = 1) + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index 02c4d0149..6b56c61a3 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -12,12 +12,7 @@ */ package com.netflix.conductor.common.metadata.workflow; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import com.netflix.conductor.annotations.protogen.ProtoEnum; import com.netflix.conductor.annotations.protogen.ProtoField; @@ -25,19 +20,16 @@ import com.netflix.conductor.common.constraints.NoSemiColonConstraint; import com.netflix.conductor.common.constraints.OwnerEmailMandatoryConstraint; import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint; -import com.netflix.conductor.common.metadata.BaseDef; +import com.netflix.conductor.common.metadata.Auditable; +import com.netflix.conductor.common.metadata.SchemaDef; import com.netflix.conductor.common.metadata.tasks.TaskType; -import jakarta.validation.Valid; -import jakarta.validation.constraints.Email; -import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; +import jakarta.validation.*; +import jakarta.validation.constraints.*; @ProtoMessage @TaskReferenceNameUniqueConstraint -public class WorkflowDef extends BaseDef { +public class WorkflowDef extends Auditable { @ProtoEnum public enum TimeoutPolicy { @@ -76,7 +68,7 @@ public enum TimeoutPolicy { @Max(value = 2, message = "workflowDef schemaVersion: {value} is only supported") private int schemaVersion = 2; - // By default, a workflow is restartable + // By default a workflow is restartable @ProtoField(id = 9) private boolean restartable = true; @@ -85,7 +77,6 @@ 
public enum TimeoutPolicy { @ProtoField(id = 11) @OwnerEmailMandatoryConstraint - @Email(message = "ownerEmail should be valid email address") private String ownerEmail; @ProtoField(id = 12) @@ -101,6 +92,28 @@ public enum TimeoutPolicy { @ProtoField(id = 15) private Map inputTemplate = new HashMap<>(); + @ProtoField(id = 16) + private String workflowStatusListenerSink; + + @ProtoField(id = 17) + private RateLimitConfig rateLimitConfig; + + @ProtoField(id = 18) + private SchemaDef inputSchema; + + @ProtoField(id = 19) + private SchemaDef outputSchema; + + public boolean isEnforceSchema() { + return enforceSchema; + } + + public void setEnforceSchema(boolean enforceSchema) { + this.enforceSchema = enforceSchema; + } + + private boolean enforceSchema = true; + /** * @return the name */ @@ -321,6 +334,38 @@ public static String getKey(String name, int version) { return name + "." + version; } + public String getWorkflowStatusListenerSink() { + return workflowStatusListenerSink; + } + + public void setWorkflowStatusListenerSink(String workflowStatusListenerSink) { + this.workflowStatusListenerSink = workflowStatusListenerSink; + } + + public RateLimitConfig getRateLimitConfig() { + return rateLimitConfig; + } + + public void setRateLimitConfig(RateLimitConfig rateLimitConfig) { + this.rateLimitConfig = rateLimitConfig; + } + + public SchemaDef getInputSchema() { + return inputSchema; + } + + public void setInputSchema(SchemaDef inputSchema) { + this.inputSchema = inputSchema; + } + + public SchemaDef getOutputSchema() { + return outputSchema; + } + + public void setOutputSchema(SchemaDef outputSchema) { + this.outputSchema = outputSchema; + } + public boolean containsType(String taskType) { return collectTasks().stream().anyMatch(t -> t.getType().equals(taskType)); } @@ -393,7 +438,9 @@ && getSchemaVersion() == that.getSchemaVersion() && Objects.equals(getOutputParameters(), that.getOutputParameters()) && Objects.equals(getFailureWorkflow(), that.getFailureWorkflow()) && Objects.equals(getOwnerEmail(), that.getOwnerEmail()) - && Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds()); + && Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds()) + && Objects.equals(getInputSchema(), that.getInputSchema()) + && Objects.equals(getOutputSchema(), that.getOutputSchema()); } @Override @@ -408,7 +455,9 @@ public int hashCode() { getFailureWorkflow(), getSchemaVersion(), getOwnerEmail(), - getTimeoutSeconds()); + getTimeoutSeconds(), + getInputSchema(), + getOutputSchema()); } @Override @@ -437,8 +486,28 @@ public String toString() { + restartable + ", workflowStatusListenerEnabled=" + workflowStatusListenerEnabled + + ", ownerEmail='" + + ownerEmail + + '\'' + + ", timeoutPolicy=" + + timeoutPolicy + ", timeoutSeconds=" + timeoutSeconds + + ", variables=" + + variables + + ", inputTemplate=" + + inputTemplate + + ", workflowStatusListenerSink='" + + workflowStatusListenerSink + + '\'' + + ", rateLimitConfig=" + + rateLimitConfig + + ", inputSchema=" + + inputSchema + + ", outputSchema=" + + outputSchema + + ", enforceSchema=" + + enforceSchema + '}'; } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index b0734ce26..88c83f364 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -26,10 +26,10 @@ import 
com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonSetter; import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.PositiveOrZero; +import jakarta.validation.constraints.*; /** * This is the task definition definied as part of the {@link WorkflowDef}. The tasks definied in @@ -86,7 +86,6 @@ public void setTasks(List tasks) { // Populates for the tasks of the decision type @ProtoField(id = 9) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private Map> decisionCases = new LinkedHashMap<>(); @Deprecated private String dynamicForkJoinTasksParam; @@ -98,11 +97,9 @@ public void setTasks(List tasks) { private String dynamicForkTasksInputParamName; @ProtoField(id = 12) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<@Valid WorkflowTask> defaultCase = new LinkedList<>(); @ProtoField(id = 13) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<@Valid List<@Valid WorkflowTask>> forkTasks = new LinkedList<>(); @ProtoField(id = 14) @@ -114,7 +111,6 @@ public void setTasks(List tasks) { private SubWorkflowParams subWorkflowParam; @ProtoField(id = 16) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private List joinOn = new LinkedList<>(); @ProtoField(id = 17) @@ -130,7 +126,6 @@ public void setTasks(List tasks) { private Boolean rateLimited; @ProtoField(id = 21) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private List defaultExclusiveJoinTask = new LinkedList<>(); @ProtoField(id = 23) @@ -140,7 +135,6 @@ public void setTasks(List tasks) { private String loopCondition; @ProtoField(id = 25) - @JsonInclude(JsonInclude.Include.NON_EMPTY) private List loopOver = new LinkedList<>(); @ProtoField(id = 26) @@ -153,7 +147,40 @@ public void setTasks(List tasks) { private String expression; @ProtoField(id = 29) - private boolean permissive = false; + private String joinStatus; + + @ProtoField(id = 30) + private boolean permissive; + + public static class CacheConfig { + + private String key; + private int ttlInSecond; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public int getTtlInSecond() { + return ttlInSecond; + } + + public void setTtlInSecond(int ttlInSecond) { + this.ttlInSecond = ttlInSecond; + } + } + + private CacheConfig cacheConfig; + + /* + Map of events to be emitted when the task status changed. + key can be comma separated values of the status changes prefixed with "on" + */ + private @Valid Map> onStateChange = new HashMap<>(); /** * @return the name @@ -390,9 +417,18 @@ public void setScriptExpression(String expression) { this.scriptExpression = expression; } + public CacheConfig getCacheConfig() { + return cacheConfig; + } + + public void setCacheConfig(CacheConfig cacheConfig) { + this.cacheConfig = cacheConfig; + } + /** * @return the subWorkflow */ + @JsonGetter public SubWorkflowParams getSubWorkflowParam() { return subWorkflowParam; } @@ -400,6 +436,7 @@ public SubWorkflowParams getSubWorkflowParam() { /** * @param subWorkflow the subWorkflowParam to set */ + @JsonSetter public void setSubWorkflowParam(SubWorkflowParams subWorkflow) { this.subWorkflowParam = subWorkflow; } @@ -550,18 +587,18 @@ public void setExpression(String expression) { this.expression = expression; } - /** - * @return If the task is permissive. 
When set to true, and the task is in failed status, - * fail-fast does not occur. The workflow execution continues until reaching join or end of - * workflow, allowing idempotent execution of other tasks. - */ + public String getJoinStatus() { + return joinStatus; + } + + public void setJoinStatus(String joinStatus) { + this.joinStatus = joinStatus; + } + public boolean isPermissive() { - return this.permissive; + return permissive; } - /** - * @param permissive when set to true, the task is marked as permissive - */ public void setPermissive(boolean permissive) { this.permissive = permissive; } @@ -713,6 +750,14 @@ public WorkflowTask get(String taskReferenceName) { return null; } + public Map> getOnStateChange() { + return onStateChange; + } + + public void setOnStateChange(Map> onStateChange) { + this.onStateChange = onStateChange; + } + @Override public String toString() { return name + "/" + taskReferenceName; @@ -750,10 +795,14 @@ && isOptional() == that.isOptional() && Objects.equals(getForkTasks(), that.getForkTasks()) && Objects.equals(getSubWorkflowParam(), that.getSubWorkflowParam()) && Objects.equals(getJoinOn(), that.getJoinOn()) + && Objects.equals(getJoinStatus(), that.getJoinStatus()) && Objects.equals(getSink(), that.getSink()) && Objects.equals(isAsyncComplete(), that.isAsyncComplete()) && Objects.equals(getDefaultExclusiveJoinTask(), that.getDefaultExclusiveJoinTask()) - && Objects.equals(getRetryCount(), that.getRetryCount()); + && Objects.equals(getRetryCount(), that.getRetryCount()) + && Objects.equals(getCacheConfig(), that.getCacheConfig()) + && Objects.equals(isPermissive(), that.isPermissive()) + && Objects.equals(getOnStateChange(), that.getOnStateChange()); } @Override @@ -779,10 +828,14 @@ public int hashCode() { getStartDelay(), getSubWorkflowParam(), getJoinOn(), + getJoinStatus(), getSink(), isAsyncComplete(), isOptional(), getDefaultExclusiveJoinTask(), + getOnStateChange(), + getCacheConfig(), + isPermissive(), getRetryCount()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java index 26a8b5598..866d01a5e 100644 --- a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java +++ b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java @@ -126,8 +126,47 @@ public boolean isSuccessful() { @ProtoField(id = 25) private Set failedTaskNames = new HashSet<>(); + @ProtoField(id = 26) + private List history = new LinkedList<>(); + + private String idempotencyKey; + private String rateLimitKey; + private boolean rateLimited; + public Workflow() {} + public String getIdempotencyKey() { + return idempotencyKey; + } + + public void setIdempotencyKey(String idempotencyKey) { + this.idempotencyKey = idempotencyKey; + } + + public String getRateLimitKey() { + return rateLimitKey; + } + + public void setRateLimitKey(String rateLimitKey) { + this.rateLimitKey = rateLimitKey; + } + + public boolean isRateLimited() { + return rateLimited; + } + + public void setRateLimited(boolean rateLimited) { + this.rateLimited = rateLimited; + } + + public List getHistory() { + return history; + } + + public void setHistory(List history) { + this.history = history; + } + /** * @return the status */ @@ -326,14 +365,6 @@ public void setFailedReferenceTaskNames(Set failedReferenceTaskNames) { this.failedReferenceTaskNames = failedReferenceTaskNames; } - public Set getFailedTaskNames() { - return failedTaskNames; - } - - public void setFailedTaskNames(Set 
failedTaskNames) { - this.failedTaskNames = failedTaskNames; - } - public WorkflowDef getWorkflowDefinition() { return workflowDefinition; } @@ -447,6 +478,14 @@ public boolean hasParent() { return StringUtils.isNotEmpty(parentWorkflowId); } + public Set getFailedTaskNames() { + return failedTaskNames; + } + + public void setFailedTaskNames(Set failedTaskNames) { + this.failedTaskNames = failedTaskNames; + } + public Task getTaskByRefName(String refName) { if (refName == null) { throw new RuntimeException( @@ -495,7 +534,6 @@ public Workflow copy() { copy.setLastRetriedTime(lastRetriedTime); copy.setTaskToDomain(taskToDomain); copy.setFailedReferenceTaskNames(failedReferenceTaskNames); - copy.setFailedTaskNames(failedTaskNames); copy.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath); copy.setExternalOutputPayloadStoragePath(externalOutputPayloadStoragePath); return copy; @@ -527,61 +565,11 @@ public boolean equals(Object o) { return false; } Workflow workflow = (Workflow) o; - return getEndTime() == workflow.getEndTime() - && getWorkflowVersion() == workflow.getWorkflowVersion() - && getStatus() == workflow.getStatus() - && Objects.equals(getWorkflowId(), workflow.getWorkflowId()) - && Objects.equals(getParentWorkflowId(), workflow.getParentWorkflowId()) - && Objects.equals(getParentWorkflowTaskId(), workflow.getParentWorkflowTaskId()) - && Objects.equals(getTasks(), workflow.getTasks()) - && Objects.equals(getInput(), workflow.getInput()) - && Objects.equals(getOutput(), workflow.getOutput()) - && Objects.equals(getWorkflowName(), workflow.getWorkflowName()) - && Objects.equals(getCorrelationId(), workflow.getCorrelationId()) - && Objects.equals(getReRunFromWorkflowId(), workflow.getReRunFromWorkflowId()) - && Objects.equals(getReasonForIncompletion(), workflow.getReasonForIncompletion()) - && Objects.equals(getEvent(), workflow.getEvent()) - && Objects.equals(getTaskToDomain(), workflow.getTaskToDomain()) - && Objects.equals( - getFailedReferenceTaskNames(), workflow.getFailedReferenceTaskNames()) - && Objects.equals(getFailedTaskNames(), workflow.getFailedTaskNames()) - && Objects.equals( - getExternalInputPayloadStoragePath(), - workflow.getExternalInputPayloadStoragePath()) - && Objects.equals( - getExternalOutputPayloadStoragePath(), - workflow.getExternalOutputPayloadStoragePath()) - && Objects.equals(getPriority(), workflow.getPriority()) - && Objects.equals(getWorkflowDefinition(), workflow.getWorkflowDefinition()) - && Objects.equals(getVariables(), workflow.getVariables()) - && Objects.equals(getLastRetriedTime(), workflow.getLastRetriedTime()); + return Objects.equals(getWorkflowId(), workflow.getWorkflowId()); } @Override public int hashCode() { - return Objects.hash( - getStatus(), - getEndTime(), - getWorkflowId(), - getParentWorkflowId(), - getParentWorkflowTaskId(), - getTasks(), - getInput(), - getOutput(), - getWorkflowName(), - getWorkflowVersion(), - getCorrelationId(), - getReRunFromWorkflowId(), - getReasonForIncompletion(), - getEvent(), - getTaskToDomain(), - getFailedReferenceTaskNames(), - getFailedTaskNames(), - getWorkflowDefinition(), - getExternalInputPayloadStoragePath(), - getExternalOutputPayloadStoragePath(), - getPriority(), - getVariables(), - getLastRetriedTime()); + return Objects.hash(getWorkflowId()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java b/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java index 9be8d7df1..c41a8f69c 100644 --- 
a/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java +++ b/common/src/main/java/com/netflix/conductor/common/run/WorkflowSummary.java @@ -88,6 +88,9 @@ public class WorkflowSummary { @ProtoField(id = 18) private Set failedTaskNames = new HashSet<>(); + @ProtoField(id = 19) + private String createdBy; + public WorkflowSummary() {} public WorkflowSummary(Workflow workflow) { @@ -346,6 +349,14 @@ public void setPriority(int priority) { this.priority = priority; } + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -366,7 +377,8 @@ && getWorkflowId().equals(that.getWorkflowId()) && StringUtils.equals(getEndTime(), that.getEndTime()) && getStatus() == that.getStatus() && Objects.equals(getReasonForIncompletion(), that.getReasonForIncompletion()) - && Objects.equals(getEvent(), that.getEvent()); + && Objects.equals(getEvent(), that.getEvent()) + && Objects.equals(getCreatedBy(), that.getCreatedBy()); } @Override @@ -383,6 +395,7 @@ public int hashCode() { getReasonForIncompletion(), getExecutionTime(), getEvent(), - getPriority()); + getPriority(), + getCreatedBy()); } } diff --git a/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java b/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java index 6ba1f11ba..7bb6ab7ff 100644 --- a/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java +++ b/common/src/main/java/com/netflix/conductor/common/utils/TaskUtils.java @@ -12,8 +12,21 @@ */ package com.netflix.conductor.common.utils; +import com.netflix.conductor.common.config.ObjectMapperProvider; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + public class TaskUtils { + private static final ObjectMapper objectMapper; + + static { + ObjectMapperProvider provider = new ObjectMapperProvider(); + objectMapper = provider.getObjectMapper(); + } + private static final String LOOP_TASK_DELIMITER = "__"; public static String appendIteration(String name, int iteration) { @@ -28,4 +41,8 @@ public static String removeIterationFromTaskRefName(String referenceTaskName) { String[] tokens = referenceTaskName.split(TaskUtils.LOOP_TASK_DELIMITER); return tokens.length > 0 ? 
tokens[0] : referenceTaskName; } + + public static WorkflowDef convertToWorkflowDef(Object workflowDef) { + return objectMapper.convertValue(workflowDef, new TypeReference() {}); + } } diff --git a/common/src/main/java/com/netflix/conductor/common/validation/ErrorResponse.java b/common/src/main/java/com/netflix/conductor/common/validation/ErrorResponse.java index f9183c928..a43a91197 100644 --- a/common/src/main/java/com/netflix/conductor/common/validation/ErrorResponse.java +++ b/common/src/main/java/com/netflix/conductor/common/validation/ErrorResponse.java @@ -13,6 +13,7 @@ package com.netflix.conductor.common.validation; import java.util.List; +import java.util.Map; public class ErrorResponse { @@ -23,6 +24,16 @@ public class ErrorResponse { private boolean retryable; private List validationErrors; + private Map metadata; + + public Map getMetadata() { + return metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } + public int getStatus() { return status; } diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java index f370138fc..a46cf7d5c 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskDefTest.java @@ -74,24 +74,6 @@ public void testTaskDef() { assertTrue(validationErrors.contains("ownerEmail cannot be empty")); } - @Test - public void testTaskDefNameAndOwnerNotSet() { - TaskDef taskDef = new TaskDef(); - taskDef.setRetryCount(-1); - taskDef.setTimeoutSeconds(1000); - taskDef.setResponseTimeoutSeconds(1); - - Set> result = validator.validate(taskDef); - assertEquals(3, result.size()); - - List validationErrors = new ArrayList<>(); - result.forEach(e -> validationErrors.add(e.getMessage())); - - assertTrue(validationErrors.contains("TaskDef retryCount: 0 must be >= 0")); - assertTrue(validationErrors.contains("TaskDef name cannot be null or empty")); - assertTrue(validationErrors.contains("ownerEmail cannot be empty")); - } - @Test public void testTaskDefInvalidEmail() { TaskDef taskDef = new TaskDef(); @@ -99,7 +81,6 @@ public void testTaskDefInvalidEmail() { taskDef.setRetryCount(1); taskDef.setTimeoutSeconds(1000); taskDef.setResponseTimeoutSeconds(1); - taskDef.setOwnerEmail("owner"); Set> result = validator.validate(taskDef); assertEquals(1, result.size()); @@ -107,7 +88,9 @@ public void testTaskDefInvalidEmail() { List validationErrors = new ArrayList<>(); result.forEach(e -> validationErrors.add(e.getMessage())); - assertTrue(validationErrors.contains("ownerEmail should be valid email address")); + assertTrue( + validationErrors.toString(), + validationErrors.contains("ownerEmail cannot be empty")); } @Test diff --git a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java index 255108170..402fcfcb0 100644 --- a/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java +++ b/common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java @@ -98,7 +98,7 @@ public void testDeepCopyTask() { final Task task = new Task(); // In order to avoid forgetting putting inside the copy method the newly added fields check // the number of declared fields. 
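Tying together the SubWorkflowParams.setWorkflowDefinition change earlier in this patch with the TaskUtils.convertToWorkflowDef helper added just above, a rough sketch of the inputs the setter now accepts; the workflow names and the DSL expression are made-up examples.

import java.util.LinkedHashMap;

import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;

public class SubWorkflowParamsSketch {

    static void accepted() {
        SubWorkflowParams params = new SubWorkflowParams();

        // A concrete WorkflowDef is stored as-is.
        WorkflowDef def = new WorkflowDef();
        def.setName("payment_flow");                     // made-up name
        def.setVersion(1);
        params.setWorkflowDefinition(def);

        // A String is accepted only when it looks like a DSL expression: ${...}
        params.setWorkflowDefinition("${workflow.input.subWorkflowDefinition}");

        // A LinkedHashMap (e.g. the result of raw JSON deserialization) is converted
        // to a WorkflowDef through TaskUtils.convertToWorkflowDef.
        LinkedHashMap<String, Object> raw = new LinkedHashMap<>();
        raw.put("name", "payment_flow");
        raw.put("version", 1);
        params.setWorkflowDefinition(raw);

        // Any other value (for example a plain, non-DSL string) now fails with
        // IllegalArgumentException instead of being stored silently.
    }
}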
-        final int expectedTaskFieldsNumber = 40;
+        final int expectedTaskFieldsNumber = 41;
         final int declaredFieldsNumber = task.getClass().getDeclaredFields().length;
 
         assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber);
diff --git a/common/src/test/java/com/netflix/conductor/common/workflow/SubWorkflowParamsTest.java b/common/src/test/java/com/netflix/conductor/common/workflow/SubWorkflowParamsTest.java
index d32afc5f6..5d9222d62 100644
--- a/common/src/test/java/com/netflix/conductor/common/workflow/SubWorkflowParamsTest.java
+++ b/common/src/test/java/com/netflix/conductor/common/workflow/SubWorkflowParamsTest.java
@@ -12,11 +12,9 @@
  */
 package com.netflix.conductor.common.workflow;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -25,6 +23,8 @@
 import org.springframework.test.context.junit4.SpringRunner;
 
 import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.common.metadata.tasks.TaskType;
 import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams;
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
 import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
@@ -32,13 +32,8 @@
 import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import jakarta.validation.ConstraintViolation;
-import jakarta.validation.Validation;
-import jakarta.validation.Validator;
-import jakarta.validation.ValidatorFactory;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 @ContextConfiguration(classes = {TestObjectMapperConfiguration.class})
 @RunWith(SpringRunner.class)
@@ -46,22 +41,6 @@ public class SubWorkflowParamsTest {
 
     @Autowired private ObjectMapper objectMapper;
 
-    @Test
-    public void testWorkflowTaskName() {
-        SubWorkflowParams subWorkflowParams = new SubWorkflowParams(); // name is null
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        Validator validator = factory.getValidator();
-
-        Set<ConstraintViolation<SubWorkflowParams>> result = validator.validate(subWorkflowParams);
-        assertEquals(2, result.size());
-
-        List<String> validationErrors = new ArrayList<>();
-        result.forEach(e -> validationErrors.add(e.getMessage()));
-
-        assertTrue(validationErrors.contains("SubWorkflowParams name cannot be null"));
-        assertTrue(validationErrors.contains("SubWorkflowParams name cannot be empty"));
-    }
-
     @Test
     public void testWorkflowSetTaskToDomain() {
         SubWorkflowParams subWorkflowParams = new SubWorkflowParams();
@@ -91,7 +70,6 @@ public void testGetWorkflowDef() {
         def.getTasks().add(task);
         subWorkflowParams.setWorkflowDefinition(def);
         assertEquals(def, subWorkflowParams.getWorkflowDefinition());
-        assertEquals(def, subWorkflowParams.getWorkflowDef());
     }
 
     @Test
@@ -115,7 +93,41 @@ public void testWorkflowDefJson() throws Exception {
                 objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(subWorkflowParams);
         SubWorkflowParams deserializedParams =
                 objectMapper.readValue(serializedParams, SubWorkflowParams.class);
-        assertEquals(def, deserializedParams.getWorkflowDefinition());
-        assertEquals(def, deserializedParams.getWorkflowDef());
+        var x = (WorkflowDef) deserializedParams.getWorkflowDefinition();
+        assertEquals(def, x);
+
+        var taskName = "taskName";
+        var subWorkflowName =
"subwf"; + TaskDef taskDef = new TaskDef(taskName); + taskDef.setRetryCount(0); + taskDef.setOwnerEmail("test@orkes.io"); + + WorkflowTask inline = new WorkflowTask(); + inline.setTaskReferenceName(taskName); + inline.setName(taskName); + inline.setTaskDefinition(taskDef); + inline.setWorkflowTaskType(TaskType.SIMPLE); + inline.setInputParameters(Map.of("evaluatorType", "graaljs", "expression", "true;")); + + WorkflowDef subworkflowDef = new WorkflowDef(); + subworkflowDef.setName(subWorkflowName); + subworkflowDef.setOwnerEmail("test@orkes.io"); + subworkflowDef.setInputParameters(Arrays.asList("value", "inlineValue")); + subworkflowDef.setDescription("Sub Workflow to test retry"); + subworkflowDef.setTimeoutSeconds(600); + subworkflowDef.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.TIME_OUT_WF); + subworkflowDef.setTasks(Arrays.asList(inline)); + + // autowired + var serializedSubWorkflowDef1 = objectMapper.writeValueAsString(subworkflowDef); + var deserializedSubWorkflowDef1 = + objectMapper.readValue(serializedSubWorkflowDef1, WorkflowDef.class); + assertEquals(deserializedSubWorkflowDef1, subworkflowDef); + // default + ObjectMapper mapper = new ObjectMapper(); + var serializedSubWorkflowDef2 = mapper.writeValueAsString(subworkflowDef); + var deserializedSubWorkflowDef2 = + mapper.readValue(serializedSubWorkflowDef2, WorkflowDef.class); + assertEquals(deserializedSubWorkflowDef2, subworkflowDef); } } diff --git a/common/src/test/java/com/netflix/conductor/common/workflow/WorkflowDefValidatorTest.java b/common/src/test/java/com/netflix/conductor/common/workflow/WorkflowDefValidatorTest.java index d08acdc77..132e33d99 100644 --- a/common/src/test/java/com/netflix/conductor/common/workflow/WorkflowDefValidatorTest.java +++ b/common/src/test/java/com/netflix/conductor/common/workflow/WorkflowDefValidatorTest.java @@ -327,12 +327,7 @@ public void testWorkflowOwnerInvalidEmail() { ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); Validator validator = factory.getValidator(); Set> result = validator.validate(workflowDef); - assertEquals(1, result.size()); - - List validationErrors = new ArrayList<>(); - result.forEach(e -> validationErrors.add(e.getMessage())); - - assertTrue(validationErrors.contains("ownerEmail should be valid email address")); + assertEquals(0, result.size()); } @Test diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index 1fa3f1990..5277f93e0 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -377,7 +377,7 @@ public void testRegisterWorkflowDefInvalidName() { workflowDef.setOwnerEmail("inavlid-email"); metadataService.registerWorkflowDef(workflowDef); } catch (ConstraintViolationException ex) { - assertEquals(3, ex.getConstraintViolations().size()); + assertEquals(2, ex.getConstraintViolations().size()); Set messages = getConstraintViolationMessages(ex.getConstraintViolations()); assertTrue(messages.contains("WorkflowTask list cannot be empty")); assertTrue( @@ -397,7 +397,7 @@ public void testValidateWorkflowDefInvalidName() { workflowDef.setOwnerEmail("inavlid-email"); metadataService.validateWorkflowDef(workflowDef); } catch (ConstraintViolationException ex) { - assertEquals(3, ex.getConstraintViolations().size()); + assertEquals(2, ex.getConstraintViolations().size()); Set messages = 
getConstraintViolationMessages(ex.getConstraintViolations()); assertTrue(messages.contains("WorkflowTask list cannot be empty")); assertTrue( diff --git a/core/src/test/java/com/netflix/conductor/validations/WorkflowTaskTypeConstraintTest.java b/core/src/test/java/com/netflix/conductor/validations/WorkflowTaskTypeConstraintTest.java index eb1d88dd3..e58ed97e0 100644 --- a/core/src/test/java/com/netflix/conductor/validations/WorkflowTaskTypeConstraintTest.java +++ b/core/src/test/java/com/netflix/conductor/validations/WorkflowTaskTypeConstraintTest.java @@ -29,7 +29,6 @@ import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskType; -import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.execution.tasks.Terminate; import com.netflix.conductor.dao.MetadataDAO; @@ -401,25 +400,6 @@ public void testWorkflowTaskTypeSubworkflowMissingSubworkflowParam() { "subWorkflowParam field is required for taskType: SUB_WORKFLOW taskName: encode")); } - @Test - public void testWorkflowTaskTypeSubworkflow() { - WorkflowTask workflowTask = createSampleWorkflowTask(); - workflowTask.setType("SUB_WORKFLOW"); - - SubWorkflowParams subWorkflowTask = new SubWorkflowParams(); - workflowTask.setSubWorkflowParam(subWorkflowTask); - - Set> result = validator.validate(workflowTask); - assertEquals(2, result.size()); - - List validationErrors = new ArrayList<>(); - - result.forEach(e -> validationErrors.add(e.getMessage())); - - assertTrue(validationErrors.contains("SubWorkflowParams name cannot be null")); - assertTrue(validationErrors.contains("SubWorkflowParams name cannot be empty")); - } - @Test public void testWorkflowTaskTypeTerminateWithoutTerminationStatus() { WorkflowTask workflowTask = createSampleWorkflowTask(); From 498e1daf5c31e25dad4051187418bdda9422b7ff Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 30 Jun 2024 14:11:52 -0700 Subject: [PATCH 02/10] clean up --- .../dao/CassandraExecutionDAOSpec.groovy | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy index 13e79e238..e438f4a88 100644 --- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy +++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy @@ -403,44 +403,6 @@ class CassandraExecutionDAOSpec extends CassandraSpec { eventExecutionList != null && eventExecutionList.empty } - def "verify workflow serialization"() { - given: 'define a workflow' - String workflowId = new IDGenerator().generate() - WorkflowTask workflowTask = new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2)) - WorkflowDef workflowDef = new WorkflowDef(name: UUID.randomUUID().toString(), version: 1, tasks: [workflowTask]) - WorkflowModel workflow = new WorkflowModel(workflowDefinition: workflowDef, workflowId: workflowId, status: WorkflowModel.Status.RUNNING, createTime: System.currentTimeMillis()) - - when: 'serialize workflow' - def workflowJson = objectMapper.writeValueAsString(workflow) - - then: - !workflowJson.contains('failedReferenceTaskNames') - // workflowTask - !workflowJson.contains('decisionCases') - 
!workflowJson.contains('defaultCase') - !workflowJson.contains('forkTasks') - !workflowJson.contains('joinOn') - !workflowJson.contains('defaultExclusiveJoinTask') - !workflowJson.contains('loopOver') - } - - def "verify task serialization"() { - given: 'define a workflow and tasks for this workflow' - String workflowId = new IDGenerator().generate() - WorkflowTask workflowTask = new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2)) - TaskModel task = new TaskModel(workflowInstanceId: workflowId, taskType: UUID.randomUUID().toString(), referenceTaskName: UUID.randomUUID().toString(), status: TaskModel.Status.SCHEDULED, taskId: new IDGenerator().generate(), workflowTask: workflowTask) - - when: 'serialize task' - def taskJson = objectMapper.writeValueAsString(task) - - then: - !taskJson.contains('decisionCases') - !taskJson.contains('defaultCase') - !taskJson.contains('forkTasks') - !taskJson.contains('joinOn') - !taskJson.contains('defaultExclusiveJoinTask') - } - def "serde of workflow with large number of tasks"() { given: 'create a workflow and tasks for this workflow' String workflowId = new IDGenerator().generate() From e78e569dc2d351a017c09d3c7b0dae6b494724b1 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 30 Jun 2024 14:14:57 -0700 Subject: [PATCH 03/10] update tests --- .../java/com/netflix/conductor/service/MetadataServiceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index 5277f93e0..e4f827340 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -383,7 +383,6 @@ public void testRegisterWorkflowDefInvalidName() { assertTrue( messages.contains( "Workflow name cannot contain the following set of characters: ':'")); - assertTrue(messages.contains("ownerEmail should be valid email address")); throw ex; } fail("metadataService.registerWorkflowDef did not throw ConstraintViolationException !"); @@ -403,7 +402,6 @@ public void testValidateWorkflowDefInvalidName() { assertTrue( messages.contains( "Workflow name cannot contain the following set of characters: ':'")); - assertTrue(messages.contains("ownerEmail should be valid email address")); throw ex; } fail("metadataService.validateWorkflowDef did not throw ConstraintViolationException !"); From ce423299d306810a38b26109f4ddc904406045fa Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 30 Jun 2024 23:24:57 -0700 Subject: [PATCH 04/10] updates to the classes --- .../conductor/common/metadata/SchemaDef.java | 19 +- .../metadata/workflow/RateLimitConfig.java | 6 + .../metadata/workflow/StateChangeEvent.java | 9 +- .../common/metadata/workflow/WorkflowDef.java | 40 +--- .../metadata/workflow/WorkflowTask.java | 129 ++++------- .../conductor/grpc/AbstractProtoMapper.java | 204 ++++++++++++++++++ grpc/src/main/proto/model/eventhandler.proto | 13 ++ .../main/proto/model/ratelimitconfig.proto | 12 ++ grpc/src/main/proto/model/schemadef.proto | 18 ++ .../proto/model/startworkflowrequest.proto | 1 + grpc/src/main/proto/model/taskdef.proto | 1 + .../proto/model/upgradeworkflowrequest.proto | 15 ++ grpc/src/main/proto/model/workflow.proto | 1 + grpc/src/main/proto/model/workflowdef.proto | 6 + .../main/proto/model/workflowsummary.proto | 1 + grpc/src/main/proto/model/workflowtask.proto | 3 +- 16 files changed, 357 insertions(+), 121 deletions(-) 
create mode 100644 grpc/src/main/proto/model/ratelimitconfig.proto
create mode 100644 grpc/src/main/proto/model/schemadef.proto
create mode 100644 grpc/src/main/proto/model/upgradeworkflowrequest.proto

diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java
index a90f87b3c..5d8b80bbf 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/SchemaDef.java
@@ -14,6 +14,10 @@
 
 import java.util.Map;
 
+import com.netflix.conductor.annotations.protogen.ProtoEnum;
+import com.netflix.conductor.annotations.protogen.ProtoField;
+import com.netflix.conductor.annotations.protogen.ProtoMessage;
+
 import jakarta.validation.constraints.NotNull;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,19 +30,28 @@
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
+@ProtoMessage
 public class SchemaDef extends Auditable {
 
+    @ProtoEnum
     public enum Type {
         JSON,
         AVRO,
         PROTOBUF
     }
 
-    @NotNull private String name;
+    @ProtoField(id = 1)
+    @NotNull
+    private String name;
 
-    @NotNull @Builder.Default private int version = 1;
+    @ProtoField(id = 2)
+    @NotNull
+    @Builder.Default
+    private int version = 1;
 
-    @NotNull private Type type;
+    @ProtoField(id = 3)
+    @NotNull
+    private Type type;
 
     // Schema definition stored here
     private Map<String, Object> data;
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java
index fba785f9c..966880f68 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java
@@ -12,15 +12,21 @@
  */
 package com.netflix.conductor.common.metadata.workflow;
 
+import com.netflix.conductor.annotations.protogen.ProtoField;
+import com.netflix.conductor.annotations.protogen.ProtoMessage;
+
 /** Rate limit configuration for workflows */
+@ProtoMessage
 public class RateLimitConfig {
     /**
      * Key that defines the rate limit. Rate limit key is a combination of workflow payload such as
      * name, or correlationId etc.
*/ + @ProtoField(id = 1) private String rateLimitKey; /** Number of concurrently running workflows that are allowed per key */ + @ProtoField(id = 2) private int concurrentExecLimit; public String getRateLimitKey() { diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java index 8d0d6a01d..fc0275a5e 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StateChangeEvent.java @@ -14,14 +14,21 @@ import java.util.Map; +import com.netflix.conductor.annotations.protogen.ProtoField; +import com.netflix.conductor.annotations.protogen.ProtoMessage; + import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; @Valid +@ProtoMessage public class StateChangeEvent { - @NotNull private String type; + @ProtoField(id = 1) + @NotNull + private String type; + @ProtoField(id = 2) private Map payload; public String getType() { diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index 6b56c61a3..2569294b8 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -92,18 +92,21 @@ public enum TimeoutPolicy { @ProtoField(id = 15) private Map inputTemplate = new HashMap<>(); - @ProtoField(id = 16) + @ProtoField(id = 17) private String workflowStatusListenerSink; - @ProtoField(id = 17) + @ProtoField(id = 18) private RateLimitConfig rateLimitConfig; - @ProtoField(id = 18) + @ProtoField(id = 19) private SchemaDef inputSchema; - @ProtoField(id = 19) + @ProtoField(id = 20) private SchemaDef outputSchema; + @ProtoField(id = 21) + private boolean enforceSchema = true; + public boolean isEnforceSchema() { return enforceSchema; } @@ -112,8 +115,6 @@ public void setEnforceSchema(boolean enforceSchema) { this.enforceSchema = enforceSchema; } - private boolean enforceSchema = true; - /** * @return the name */ @@ -429,35 +430,12 @@ public boolean equals(Object o) { return false; } WorkflowDef that = (WorkflowDef) o; - return getVersion() == that.getVersion() - && getSchemaVersion() == that.getSchemaVersion() - && Objects.equals(getName(), that.getName()) - && Objects.equals(getDescription(), that.getDescription()) - && Objects.equals(getTasks(), that.getTasks()) - && Objects.equals(getInputParameters(), that.getInputParameters()) - && Objects.equals(getOutputParameters(), that.getOutputParameters()) - && Objects.equals(getFailureWorkflow(), that.getFailureWorkflow()) - && Objects.equals(getOwnerEmail(), that.getOwnerEmail()) - && Objects.equals(getTimeoutSeconds(), that.getTimeoutSeconds()) - && Objects.equals(getInputSchema(), that.getInputSchema()) - && Objects.equals(getOutputSchema(), that.getOutputSchema()); + return version == that.version && Objects.equals(name, that.name); } @Override public int hashCode() { - return Objects.hash( - getName(), - getDescription(), - getVersion(), - getTasks(), - getInputParameters(), - getOutputParameters(), - getFailureWorkflow(), - getSchemaVersion(), - getOwnerEmail(), - getTimeoutSeconds(), - getInputSchema(), - getOutputSchema()); + return Objects.hash(name, version); } @Override diff --git 
a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index 88c83f364..43ad71ce8 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -38,6 +38,28 @@ @ProtoMessage public class WorkflowTask { + public static class CacheConfig { + + private String key; + private int ttlInSecond; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public int getTtlInSecond() { + return ttlInSecond; + } + + public void setTtlInSecond(int ttlInSecond) { + this.ttlInSecond = ttlInSecond; + } + } + @ProtoField(id = 1) @NotEmpty(message = "WorkflowTask name cannot be empty or null") private String name; @@ -146,41 +168,36 @@ public void setTasks(List tasks) { @ProtoField(id = 28) private String expression; + /* + Map of events to be emitted when the task status changed. + key can be comma separated values of the status changes prefixed with "on" + */ @ProtoField(id = 29) - private String joinStatus; - - @ProtoField(id = 30) - private boolean permissive; - - public static class CacheConfig { - - private String key; - private int ttlInSecond; + private @Valid Map> onStateChange = new HashMap<>(); - public String getKey() { - return key; - } + @ProtoMessage(wrapper = true) + public static class StateChangeEventList { - public void setKey(String key) { - this.key = key; + public List getTasks() { + return events; } - public int getTtlInSecond() { - return ttlInSecond; + public void setTasks(List events) { + this.events = events; } - public void setTtlInSecond(int ttlInSecond) { - this.ttlInSecond = ttlInSecond; - } + @ProtoField(id = 1) + private List events; } + @ProtoField(id = 30) + private String joinStatus; + + @ProtoField(id = 31) private CacheConfig cacheConfig; - /* - Map of events to be emitted when the task status changed. 
- key can be comma separated values of the status changes prefixed with "on" - */ - private @Valid Map> onStateChange = new HashMap<>(); + @ProtoField(id = 32) + private boolean permissive; /** * @return the name @@ -772,70 +789,12 @@ public boolean equals(Object o) { return false; } WorkflowTask that = (WorkflowTask) o; - return getStartDelay() == that.getStartDelay() - && isOptional() == that.isOptional() - && Objects.equals(getName(), that.getName()) - && Objects.equals(getTaskReferenceName(), that.getTaskReferenceName()) - && Objects.equals(getDescription(), that.getDescription()) - && Objects.equals(getInputParameters(), that.getInputParameters()) - && Objects.equals(getType(), that.getType()) - && Objects.equals(getDynamicTaskNameParam(), that.getDynamicTaskNameParam()) - && Objects.equals(getCaseValueParam(), that.getCaseValueParam()) - && Objects.equals(getEvaluatorType(), that.getEvaluatorType()) - && Objects.equals(getExpression(), that.getExpression()) - && Objects.equals(getCaseExpression(), that.getCaseExpression()) - && Objects.equals(getDecisionCases(), that.getDecisionCases()) - && Objects.equals( - getDynamicForkJoinTasksParam(), that.getDynamicForkJoinTasksParam()) - && Objects.equals(getDynamicForkTasksParam(), that.getDynamicForkTasksParam()) - && Objects.equals( - getDynamicForkTasksInputParamName(), - that.getDynamicForkTasksInputParamName()) - && Objects.equals(getDefaultCase(), that.getDefaultCase()) - && Objects.equals(getForkTasks(), that.getForkTasks()) - && Objects.equals(getSubWorkflowParam(), that.getSubWorkflowParam()) - && Objects.equals(getJoinOn(), that.getJoinOn()) - && Objects.equals(getJoinStatus(), that.getJoinStatus()) - && Objects.equals(getSink(), that.getSink()) - && Objects.equals(isAsyncComplete(), that.isAsyncComplete()) - && Objects.equals(getDefaultExclusiveJoinTask(), that.getDefaultExclusiveJoinTask()) - && Objects.equals(getRetryCount(), that.getRetryCount()) - && Objects.equals(getCacheConfig(), that.getCacheConfig()) - && Objects.equals(isPermissive(), that.isPermissive()) - && Objects.equals(getOnStateChange(), that.getOnStateChange()); + return Objects.equals(name, that.name) + && Objects.equals(taskReferenceName, that.taskReferenceName); } @Override public int hashCode() { - - return Objects.hash( - getName(), - getTaskReferenceName(), - getDescription(), - getInputParameters(), - getType(), - getDynamicTaskNameParam(), - getCaseValueParam(), - getCaseExpression(), - getEvaluatorType(), - getExpression(), - getDecisionCases(), - getDynamicForkJoinTasksParam(), - getDynamicForkTasksParam(), - getDynamicForkTasksInputParamName(), - getDefaultCase(), - getForkTasks(), - getStartDelay(), - getSubWorkflowParam(), - getJoinOn(), - getJoinStatus(), - getSink(), - isAsyncComplete(), - isOptional(), - getDefaultExclusiveJoinTask(), - getOnStateChange(), - getCacheConfig(), - isPermissive(), - getRetryCount()); + return Objects.hash(name, taskReferenceName); } } diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index b6d3a3447..424da891e 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -2,6 +2,7 @@ import com.google.protobuf.Any; import com.google.protobuf.Value; +import com.netflix.conductor.common.metadata.SchemaDef; import com.netflix.conductor.common.metadata.events.EventExecution; import 
com.netflix.conductor.common.metadata.events.EventHandler; import com.netflix.conductor.common.metadata.tasks.PollData; @@ -11,10 +12,12 @@ import com.netflix.conductor.common.metadata.tasks.TaskResult; import com.netflix.conductor.common.metadata.workflow.DynamicForkJoinTask; import com.netflix.conductor.common.metadata.workflow.DynamicForkJoinTaskList; +import com.netflix.conductor.common.metadata.workflow.RateLimitConfig; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; +import com.netflix.conductor.common.metadata.workflow.UpgradeWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowDefSummary; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; @@ -26,7 +29,9 @@ import com.netflix.conductor.proto.EventExecutionPb; import com.netflix.conductor.proto.EventHandlerPb; import com.netflix.conductor.proto.PollDataPb; +import com.netflix.conductor.proto.RateLimitConfigPb; import com.netflix.conductor.proto.RerunWorkflowRequestPb; +import com.netflix.conductor.proto.SchemaDefPb; import com.netflix.conductor.proto.SkipTaskRequestPb; import com.netflix.conductor.proto.StartWorkflowRequestPb; import com.netflix.conductor.proto.SubWorkflowParamsPb; @@ -35,6 +40,7 @@ import com.netflix.conductor.proto.TaskPb; import com.netflix.conductor.proto.TaskResultPb; import com.netflix.conductor.proto.TaskSummaryPb; +import com.netflix.conductor.proto.UpgradeWorkflowRequestPb; import com.netflix.conductor.proto.WorkflowDefPb; import com.netflix.conductor.proto.WorkflowDefSummaryPb; import com.netflix.conductor.proto.WorkflowPb; @@ -202,6 +208,54 @@ public EventHandler fromProto(EventHandlerPb.EventHandler from) { return to; } + public EventHandlerPb.EventHandler.UpdateWorkflowVariables toProto( + EventHandler.UpdateWorkflowVariables from) { + EventHandlerPb.EventHandler.UpdateWorkflowVariables.Builder to = EventHandlerPb.EventHandler.UpdateWorkflowVariables.newBuilder(); + if (from.getWorkflowId() != null) { + to.setWorkflowId( from.getWorkflowId() ); + } + for (Map.Entry pair : from.getVariables().entrySet()) { + to.putVariables( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.isAppendArray() != null) { + to.setAppendArray( from.isAppendArray() ); + } + return to.build(); + } + + public EventHandler.UpdateWorkflowVariables fromProto( + EventHandlerPb.EventHandler.UpdateWorkflowVariables from) { + EventHandler.UpdateWorkflowVariables to = new EventHandler.UpdateWorkflowVariables(); + to.setWorkflowId( from.getWorkflowId() ); + Map variablesMap = new HashMap(); + for (Map.Entry pair : from.getVariablesMap().entrySet()) { + variablesMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setVariables(variablesMap); + to.setAppendArray( from.getAppendArray() ); + return to; + } + + public EventHandlerPb.EventHandler.TerminateWorkflow toProto( + EventHandler.TerminateWorkflow from) { + EventHandlerPb.EventHandler.TerminateWorkflow.Builder to = EventHandlerPb.EventHandler.TerminateWorkflow.newBuilder(); + if (from.getWorkflowId() != null) { + to.setWorkflowId( from.getWorkflowId() ); + } + if (from.getTerminationReason() != null) { + to.setTerminationReason( from.getTerminationReason() ); + } + return to.build(); + } + + public 
EventHandler.TerminateWorkflow fromProto( + EventHandlerPb.EventHandler.TerminateWorkflow from) { + EventHandler.TerminateWorkflow to = new EventHandler.TerminateWorkflow(); + to.setWorkflowId( from.getWorkflowId() ); + to.setTerminationReason( from.getTerminationReason() ); + return to; + } + public EventHandlerPb.EventHandler.StartWorkflow toProto(EventHandler.StartWorkflow from) { EventHandlerPb.EventHandler.StartWorkflow.Builder to = EventHandlerPb.EventHandler.StartWorkflow.newBuilder(); if (from.getName() != null) { @@ -291,6 +345,12 @@ public EventHandlerPb.EventHandler.Action toProto(EventHandler.Action from) { to.setFailTask( toProto( from.getFail_task() ) ); } to.setExpandInlineJson( from.isExpandInlineJSON() ); + if (from.getTerminate_workflow() != null) { + to.setTerminateWorkflow( toProto( from.getTerminate_workflow() ) ); + } + if (from.getUpdate_workflow_variables() != null) { + to.setUpdateWorkflowVariables( toProto( from.getUpdate_workflow_variables() ) ); + } return to.build(); } @@ -307,6 +367,12 @@ public EventHandler.Action fromProto(EventHandlerPb.EventHandler.Action from) { to.setFail_task( fromProto( from.getFailTask() ) ); } to.setExpandInlineJSON( from.getExpandInlineJson() ); + if (from.hasTerminateWorkflow()) { + to.setTerminate_workflow( fromProto( from.getTerminateWorkflow() ) ); + } + if (from.hasUpdateWorkflowVariables()) { + to.setUpdate_workflow_variables( fromProto( from.getUpdateWorkflowVariables() ) ); + } return to; } @@ -316,6 +382,8 @@ public EventHandlerPb.EventHandler.Action.Type toProto(EventHandler.Action.Type case start_workflow: to = EventHandlerPb.EventHandler.Action.Type.START_WORKFLOW; break; case complete_task: to = EventHandlerPb.EventHandler.Action.Type.COMPLETE_TASK; break; case fail_task: to = EventHandlerPb.EventHandler.Action.Type.FAIL_TASK; break; + case terminate_workflow: to = EventHandlerPb.EventHandler.Action.Type.TERMINATE_WORKFLOW; break; + case update_workflow_variables: to = EventHandlerPb.EventHandler.Action.Type.UPDATE_WORKFLOW_VARIABLES; break; default: throw new IllegalArgumentException("Unexpected enum constant: " + from); } return to; @@ -327,6 +395,8 @@ public EventHandler.Action.Type fromProto(EventHandlerPb.EventHandler.Action.Typ case START_WORKFLOW: to = EventHandler.Action.Type.start_workflow; break; case COMPLETE_TASK: to = EventHandler.Action.Type.complete_task; break; case FAIL_TASK: to = EventHandler.Action.Type.fail_task; break; + case TERMINATE_WORKFLOW: to = EventHandler.Action.Type.terminate_workflow; break; + case UPDATE_WORKFLOW_VARIABLES: to = EventHandler.Action.Type.update_workflow_variables; break; default: throw new IllegalArgumentException("Unexpected enum constant: " + from); } return to; @@ -356,6 +426,22 @@ public PollData fromProto(PollDataPb.PollData from) { return to; } + public RateLimitConfigPb.RateLimitConfig toProto(RateLimitConfig from) { + RateLimitConfigPb.RateLimitConfig.Builder to = RateLimitConfigPb.RateLimitConfig.newBuilder(); + if (from.getRateLimitKey() != null) { + to.setRateLimitKey( from.getRateLimitKey() ); + } + to.setConcurrentExecLimit( from.getConcurrentExecLimit() ); + return to.build(); + } + + public RateLimitConfig fromProto(RateLimitConfigPb.RateLimitConfig from) { + RateLimitConfig to = new RateLimitConfig(); + to.setRateLimitKey( from.getRateLimitKey() ); + to.setConcurrentExecLimit( from.getConcurrentExecLimit() ); + return to; + } + public RerunWorkflowRequestPb.RerunWorkflowRequest toProto(RerunWorkflowRequest from) { 
RerunWorkflowRequestPb.RerunWorkflowRequest.Builder to = RerunWorkflowRequestPb.RerunWorkflowRequest.newBuilder(); if (from.getReRunFromWorkflowId() != null) { @@ -394,6 +480,48 @@ public RerunWorkflowRequest fromProto(RerunWorkflowRequestPb.RerunWorkflowReques return to; } + public SchemaDefPb.SchemaDef toProto(SchemaDef from) { + SchemaDefPb.SchemaDef.Builder to = SchemaDefPb.SchemaDef.newBuilder(); + if (from.getName() != null) { + to.setName( from.getName() ); + } + to.setVersion( from.getVersion() ); + if (from.getType() != null) { + to.setType( toProto( from.getType() ) ); + } + return to.build(); + } + + public SchemaDef fromProto(SchemaDefPb.SchemaDef from) { + SchemaDef to = new SchemaDef(); + to.setName( from.getName() ); + to.setVersion( from.getVersion() ); + to.setType( fromProto( from.getType() ) ); + return to; + } + + public SchemaDefPb.SchemaDef.Type toProto(SchemaDef.Type from) { + SchemaDefPb.SchemaDef.Type to; + switch (from) { + case JSON: to = SchemaDefPb.SchemaDef.Type.JSON; break; + case AVRO: to = SchemaDefPb.SchemaDef.Type.AVRO; break; + case PROTOBUF: to = SchemaDefPb.SchemaDef.Type.PROTOBUF; break; + default: throw new IllegalArgumentException("Unexpected enum constant: " + from); + } + return to; + } + + public SchemaDef.Type fromProto(SchemaDefPb.SchemaDef.Type from) { + SchemaDef.Type to; + switch (from) { + case JSON: to = SchemaDef.Type.JSON; break; + case AVRO: to = SchemaDef.Type.AVRO; break; + case PROTOBUF: to = SchemaDef.Type.PROTOBUF; break; + default: throw new IllegalArgumentException("Unexpected enum constant: " + from); + } + return to; + } + public SkipTaskRequest fromProto(SkipTaskRequestPb.SkipTaskRequest from) { SkipTaskRequest to = new SkipTaskRequest(); Map taskInputMap = new HashMap(); @@ -439,6 +567,9 @@ public StartWorkflowRequestPb.StartWorkflowRequest toProto(StartWorkflowRequest if (from.getPriority() != null) { to.setPriority( from.getPriority() ); } + if (from.getCreatedBy() != null) { + to.setCreatedBy( from.getCreatedBy() ); + } return to.build(); } @@ -458,6 +589,7 @@ public StartWorkflowRequest fromProto(StartWorkflowRequestPb.StartWorkflowReques } to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); to.setPriority( from.getPriority() ); + to.setCreatedBy( from.getCreatedBy() ); return to; } @@ -716,6 +848,9 @@ public TaskDefPb.TaskDef toProto(TaskDef from) { if (from.getBackoffScaleFactor() != null) { to.setBackoffScaleFactor( from.getBackoffScaleFactor() ); } + if (from.getBaseType() != null) { + to.setBaseType( from.getBaseType() ); + } return to.build(); } @@ -744,6 +879,7 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) { to.setOwnerEmail( from.getOwnerEmail() ); to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() ); to.setBackoffScaleFactor( from.getBackoffScaleFactor() ); + to.setBaseType( from.getBaseType() ); return to; } @@ -965,6 +1101,40 @@ public TaskSummary fromProto(TaskSummaryPb.TaskSummary from) { return to; } + public UpgradeWorkflowRequestPb.UpgradeWorkflowRequest toProto(UpgradeWorkflowRequest from) { + UpgradeWorkflowRequestPb.UpgradeWorkflowRequest.Builder to = UpgradeWorkflowRequestPb.UpgradeWorkflowRequest.newBuilder(); + for (Map.Entry pair : from.getTaskOutput().entrySet()) { + to.putTaskOutput( pair.getKey(), toProto( pair.getValue() ) ); + } + for (Map.Entry pair : from.getWorkflowInput().entrySet()) { + to.putWorkflowInput( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getVersion() != null) { + to.setVersion( from.getVersion() ); + } + if 
(from.getName() != null) { + to.setName( from.getName() ); + } + return to.build(); + } + + public UpgradeWorkflowRequest fromProto(UpgradeWorkflowRequestPb.UpgradeWorkflowRequest from) { + UpgradeWorkflowRequest to = new UpgradeWorkflowRequest(); + Map taskOutputMap = new HashMap(); + for (Map.Entry pair : from.getTaskOutputMap().entrySet()) { + taskOutputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setTaskOutput(taskOutputMap); + Map workflowInputMap = new HashMap(); + for (Map.Entry pair : from.getWorkflowInputMap().entrySet()) { + workflowInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setWorkflowInput(workflowInputMap); + to.setVersion( from.getVersion() ); + to.setName( from.getName() ); + return to; + } + public WorkflowPb.Workflow toProto(Workflow from) { WorkflowPb.Workflow.Builder to = WorkflowPb.Workflow.newBuilder(); if (from.getStatus() != null) { @@ -1018,6 +1188,9 @@ public WorkflowPb.Workflow toProto(Workflow from) { } to.setLastRetriedTime( from.getLastRetriedTime() ); to.addAllFailedTaskNames( from.getFailedTaskNames() ); + for (Workflow elem : from.getHistory()) { + to.addHistory( toProto(elem) ); + } return to.build(); } @@ -1058,6 +1231,7 @@ public Workflow fromProto(WorkflowPb.Workflow from) { to.setVariables(variablesMap); to.setLastRetriedTime( from.getLastRetriedTime() ); to.setFailedTaskNames( from.getFailedTaskNamesList().stream().collect(Collectors.toCollection(HashSet::new)) ); + to.setHistory( from.getHistoryList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); return to; } @@ -1124,6 +1298,18 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { for (Map.Entry pair : from.getInputTemplate().entrySet()) { to.putInputTemplate( pair.getKey(), toProto( pair.getValue() ) ); } + if (from.getWorkflowStatusListenerSink() != null) { + to.setWorkflowStatusListenerSink( from.getWorkflowStatusListenerSink() ); + } + if (from.getRateLimitConfig() != null) { + to.setRateLimitConfig( toProto( from.getRateLimitConfig() ) ); + } + if (from.getInputSchema() != null) { + to.setInputSchema( toProto( from.getInputSchema() ) ); + } + if (from.getOutputSchema() != null) { + to.setOutputSchema( toProto( from.getOutputSchema() ) ); + } return to.build(); } @@ -1156,6 +1342,16 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { inputTemplateMap.put( pair.getKey(), fromProto( pair.getValue() ) ); } to.setInputTemplate(inputTemplateMap); + to.setWorkflowStatusListenerSink( from.getWorkflowStatusListenerSink() ); + if (from.hasRateLimitConfig()) { + to.setRateLimitConfig( fromProto( from.getRateLimitConfig() ) ); + } + if (from.hasInputSchema()) { + to.setInputSchema( fromProto( from.getInputSchema() ) ); + } + if (from.hasOutputSchema()) { + to.setOutputSchema( fromProto( from.getOutputSchema() ) ); + } return to; } @@ -1247,6 +1443,9 @@ public WorkflowSummaryPb.WorkflowSummary toProto(WorkflowSummary from) { } to.setPriority( from.getPriority() ); to.addAllFailedTaskNames( from.getFailedTaskNames() ); + if (from.getCreatedBy() != null) { + to.setCreatedBy( from.getCreatedBy() ); + } return to.build(); } @@ -1270,6 +1469,7 @@ public WorkflowSummary fromProto(WorkflowSummaryPb.WorkflowSummary from) { to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() ); to.setPriority( from.getPriority() ); to.setFailedTaskNames( from.getFailedTaskNamesList().stream().collect(Collectors.toCollection(HashSet::new)) ); + to.setCreatedBy( from.getCreatedBy() ); return to; } @@ 
-1351,6 +1551,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { if (from.getExpression() != null) { to.setExpression( from.getExpression() ); } + if (from.getJoinStatus() != null) { + to.setJoinStatus( from.getJoinStatus() ); + } to.setPermissive( from.isPermissive() ); return to.build(); } @@ -1397,6 +1600,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { to.setRetryCount( from.getRetryCount() ); to.setEvaluatorType( from.getEvaluatorType() ); to.setExpression( from.getExpression() ); + to.setJoinStatus( from.getJoinStatus() ); to.setPermissive( from.getPermissive() ); return to; } diff --git a/grpc/src/main/proto/model/eventhandler.proto b/grpc/src/main/proto/model/eventhandler.proto index cfc623b53..8806bb6ca 100644 --- a/grpc/src/main/proto/model/eventhandler.proto +++ b/grpc/src/main/proto/model/eventhandler.proto @@ -9,6 +9,15 @@ option java_outer_classname = "EventHandlerPb"; option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"; message EventHandler { + message UpdateWorkflowVariables { + string workflow_id = 1; + map variables = 2; + bool append_array = 3; + } + message TerminateWorkflow { + string workflow_id = 1; + string termination_reason = 2; + } message StartWorkflow { string name = 1; int32 version = 2; @@ -29,12 +38,16 @@ message EventHandler { START_WORKFLOW = 0; COMPLETE_TASK = 1; FAIL_TASK = 2; + TERMINATE_WORKFLOW = 3; + UPDATE_WORKFLOW_VARIABLES = 4; } EventHandler.Action.Type action = 1; EventHandler.StartWorkflow start_workflow = 2; EventHandler.TaskDetails complete_task = 3; EventHandler.TaskDetails fail_task = 4; bool expand_inline_json = 5; + EventHandler.TerminateWorkflow terminate_workflow = 6; + EventHandler.UpdateWorkflowVariables update_workflow_variables = 7; } string name = 1; string event = 2; diff --git a/grpc/src/main/proto/model/ratelimitconfig.proto b/grpc/src/main/proto/model/ratelimitconfig.proto new file mode 100644 index 000000000..96a6ab357 --- /dev/null +++ b/grpc/src/main/proto/model/ratelimitconfig.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package conductor.proto; + + +option java_package = "com.netflix.conductor.proto"; +option java_outer_classname = "RateLimitConfigPb"; +option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"; + +message RateLimitConfig { + string rate_limit_key = 1; + int32 concurrent_exec_limit = 2; +} diff --git a/grpc/src/main/proto/model/schemadef.proto b/grpc/src/main/proto/model/schemadef.proto new file mode 100644 index 000000000..58583bdc2 --- /dev/null +++ b/grpc/src/main/proto/model/schemadef.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +package conductor.proto; + + +option java_package = "com.netflix.conductor.proto"; +option java_outer_classname = "SchemaDefPb"; +option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"; + +message SchemaDef { + enum Type { + JSON = 0; + AVRO = 1; + PROTOBUF = 2; + } + string name = 1; + int32 version = 2; + SchemaDef.Type type = 3; +} diff --git a/grpc/src/main/proto/model/startworkflowrequest.proto b/grpc/src/main/proto/model/startworkflowrequest.proto index 4a71f28ed..73d8d3c23 100644 --- a/grpc/src/main/proto/model/startworkflowrequest.proto +++ b/grpc/src/main/proto/model/startworkflowrequest.proto @@ -17,4 +17,5 @@ message StartWorkflowRequest { WorkflowDef workflow_def = 6; string external_input_payload_storage_path = 7; int32 priority = 8; + string created_by = 9; } diff --git a/grpc/src/main/proto/model/taskdef.proto 
b/grpc/src/main/proto/model/taskdef.proto index 43c086c9e..e531bcfec 100644 --- a/grpc/src/main/proto/model/taskdef.proto +++ b/grpc/src/main/proto/model/taskdef.proto @@ -37,4 +37,5 @@ message TaskDef { string owner_email = 18; int32 poll_timeout_seconds = 19; int32 backoff_scale_factor = 20; + string base_type = 21; } diff --git a/grpc/src/main/proto/model/upgradeworkflowrequest.proto b/grpc/src/main/proto/model/upgradeworkflowrequest.proto new file mode 100644 index 000000000..f9ebcf890 --- /dev/null +++ b/grpc/src/main/proto/model/upgradeworkflowrequest.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package conductor.proto; + +import "google/protobuf/struct.proto"; + +option java_package = "com.netflix.conductor.proto"; +option java_outer_classname = "UpgradeWorkflowRequestPb"; +option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"; + +message UpgradeWorkflowRequest { + map task_output = 4; + map workflow_input = 3; + int32 version = 2; + string name = 1; +} diff --git a/grpc/src/main/proto/model/workflow.proto b/grpc/src/main/proto/model/workflow.proto index 4c1488aa3..d623a2dce 100644 --- a/grpc/src/main/proto/model/workflow.proto +++ b/grpc/src/main/proto/model/workflow.proto @@ -39,4 +39,5 @@ message Workflow { map variables = 23; int64 last_retried_time = 24; repeated string failed_task_names = 25; + repeated Workflow history = 26; } diff --git a/grpc/src/main/proto/model/workflowdef.proto b/grpc/src/main/proto/model/workflowdef.proto index ddf75e38a..8f496fc34 100644 --- a/grpc/src/main/proto/model/workflowdef.proto +++ b/grpc/src/main/proto/model/workflowdef.proto @@ -1,8 +1,10 @@ syntax = "proto3"; package conductor.proto; +import "model/ratelimitconfig.proto"; import "model/workflowtask.proto"; import "google/protobuf/struct.proto"; +import "model/schemadef.proto"; option java_package = "com.netflix.conductor.proto"; option java_outer_classname = "WorkflowDefPb"; @@ -28,4 +30,8 @@ message WorkflowDef { int64 timeout_seconds = 13; map variables = 14; map input_template = 15; + string workflow_status_listener_sink = 16; + RateLimitConfig rate_limit_config = 17; + SchemaDef input_schema = 18; + SchemaDef output_schema = 19; } diff --git a/grpc/src/main/proto/model/workflowsummary.proto b/grpc/src/main/proto/model/workflowsummary.proto index 7b0e3f652..c48338066 100644 --- a/grpc/src/main/proto/model/workflowsummary.proto +++ b/grpc/src/main/proto/model/workflowsummary.proto @@ -26,4 +26,5 @@ message WorkflowSummary { string external_output_payload_storage_path = 16; int32 priority = 17; repeated string failed_task_names = 18; + string created_by = 19; } diff --git a/grpc/src/main/proto/model/workflowtask.proto b/grpc/src/main/proto/model/workflowtask.proto index 2c35d56dd..501f0e214 100644 --- a/grpc/src/main/proto/model/workflowtask.proto +++ b/grpc/src/main/proto/model/workflowtask.proto @@ -41,5 +41,6 @@ message WorkflowTask { int32 retry_count = 26; string evaluator_type = 27; string expression = 28; - bool permissive = 29; + string join_status = 29; + bool permissive = 30; } From 52f0ddb77acd07c63df51106bd04a8e4b8484fdb Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 1 Jul 2024 01:16:44 -0700 Subject: [PATCH 05/10] updates --- .../metadata/workflow/WorkflowTask.java | 21 ++------ .../conductor/grpc/AbstractProtoMapper.java | 48 +++++++++++++++++++ .../main/proto/model/statechangeevent.proto | 13 +++++ grpc/src/main/proto/model/workflowdef.proto | 9 ++-- grpc/src/main/proto/model/workflowtask.proto | 9 +++- 5 files changed, 78 insertions(+), 22 
deletions(-) create mode 100644 grpc/src/main/proto/model/statechangeevent.proto diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index 43ad71ce8..2e42e7319 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -38,9 +38,13 @@ @ProtoMessage public class WorkflowTask { + @ProtoMessage public static class CacheConfig { + @ProtoField(id = 1) private String key; + + @ProtoField(id = 2) private int ttlInSecond; public String getKey() { @@ -172,24 +176,9 @@ public void setTasks(List tasks) { Map of events to be emitted when the task status changed. key can be comma separated values of the status changes prefixed with "on" */ - @ProtoField(id = 29) + // @ProtoField(id = 29) private @Valid Map> onStateChange = new HashMap<>(); - @ProtoMessage(wrapper = true) - public static class StateChangeEventList { - - public List getTasks() { - return events; - } - - public void setTasks(List events) { - this.events = events; - } - - @ProtoField(id = 1) - private List events; - } - @ProtoField(id = 30) private String joinStatus; diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 424da891e..47ccc9d8b 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -16,6 +16,7 @@ import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.metadata.workflow.StateChangeEvent; import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; import com.netflix.conductor.common.metadata.workflow.UpgradeWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; @@ -34,6 +35,7 @@ import com.netflix.conductor.proto.SchemaDefPb; import com.netflix.conductor.proto.SkipTaskRequestPb; import com.netflix.conductor.proto.StartWorkflowRequestPb; +import com.netflix.conductor.proto.StateChangeEventPb; import com.netflix.conductor.proto.SubWorkflowParamsPb; import com.netflix.conductor.proto.TaskDefPb; import com.netflix.conductor.proto.TaskExecLogPb; @@ -593,6 +595,28 @@ public StartWorkflowRequest fromProto(StartWorkflowRequestPb.StartWorkflowReques return to; } + public StateChangeEventPb.StateChangeEvent toProto(StateChangeEvent from) { + StateChangeEventPb.StateChangeEvent.Builder to = StateChangeEventPb.StateChangeEvent.newBuilder(); + if (from.getType() != null) { + to.setType( from.getType() ); + } + for (Map.Entry pair : from.getPayload().entrySet()) { + to.putPayload( pair.getKey(), toProto( pair.getValue() ) ); + } + return to.build(); + } + + public StateChangeEvent fromProto(StateChangeEventPb.StateChangeEvent from) { + StateChangeEvent to = new StateChangeEvent(); + to.setType( from.getType() ); + Map payloadMap = new HashMap(); + for (Map.Entry pair : from.getPayloadMap().entrySet()) { + payloadMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setPayload(payloadMap); + return to; + } + public SubWorkflowParamsPb.SubWorkflowParams toProto(SubWorkflowParams from) { 
SubWorkflowParamsPb.SubWorkflowParams.Builder to = SubWorkflowParamsPb.SubWorkflowParams.newBuilder(); if (from.getName() != null) { @@ -1310,6 +1334,7 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { if (from.getOutputSchema() != null) { to.setOutputSchema( toProto( from.getOutputSchema() ) ); } + to.setEnforceSchema( from.isEnforceSchema() ); return to.build(); } @@ -1352,6 +1377,7 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { if (from.hasOutputSchema()) { to.setOutputSchema( fromProto( from.getOutputSchema() ) ); } + to.setEnforceSchema( from.getEnforceSchema() ); return to; } @@ -1554,6 +1580,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { if (from.getJoinStatus() != null) { to.setJoinStatus( from.getJoinStatus() ); } + if (from.getCacheConfig() != null) { + to.setCacheConfig( toProto( from.getCacheConfig() ) ); + } to.setPermissive( from.isPermissive() ); return to.build(); } @@ -1601,10 +1630,29 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { to.setEvaluatorType( from.getEvaluatorType() ); to.setExpression( from.getExpression() ); to.setJoinStatus( from.getJoinStatus() ); + if (from.hasCacheConfig()) { + to.setCacheConfig( fromProto( from.getCacheConfig() ) ); + } to.setPermissive( from.getPermissive() ); return to; } + public WorkflowTaskPb.WorkflowTask.CacheConfig toProto(WorkflowTask.CacheConfig from) { + WorkflowTaskPb.WorkflowTask.CacheConfig.Builder to = WorkflowTaskPb.WorkflowTask.CacheConfig.newBuilder(); + if (from.getKey() != null) { + to.setKey( from.getKey() ); + } + to.setTtlInSecond( from.getTtlInSecond() ); + return to.build(); + } + + public WorkflowTask.CacheConfig fromProto(WorkflowTaskPb.WorkflowTask.CacheConfig from) { + WorkflowTask.CacheConfig to = new WorkflowTask.CacheConfig(); + to.setKey( from.getKey() ); + to.setTtlInSecond( from.getTtlInSecond() ); + return to; + } + public abstract WorkflowTaskPb.WorkflowTask.WorkflowTaskList toProto(List in); public abstract List fromProto(WorkflowTaskPb.WorkflowTask.WorkflowTaskList in); diff --git a/grpc/src/main/proto/model/statechangeevent.proto b/grpc/src/main/proto/model/statechangeevent.proto new file mode 100644 index 000000000..57660ea7b --- /dev/null +++ b/grpc/src/main/proto/model/statechangeevent.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +package conductor.proto; + +import "google/protobuf/struct.proto"; + +option java_package = "com.netflix.conductor.proto"; +option java_outer_classname = "StateChangeEventPb"; +option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model"; + +message StateChangeEvent { + string type = 1; + map payload = 2; +} diff --git a/grpc/src/main/proto/model/workflowdef.proto b/grpc/src/main/proto/model/workflowdef.proto index 8f496fc34..c98c6cb25 100644 --- a/grpc/src/main/proto/model/workflowdef.proto +++ b/grpc/src/main/proto/model/workflowdef.proto @@ -30,8 +30,9 @@ message WorkflowDef { int64 timeout_seconds = 13; map variables = 14; map input_template = 15; - string workflow_status_listener_sink = 16; - RateLimitConfig rate_limit_config = 17; - SchemaDef input_schema = 18; - SchemaDef output_schema = 19; + string workflow_status_listener_sink = 17; + RateLimitConfig rate_limit_config = 18; + SchemaDef input_schema = 19; + SchemaDef output_schema = 20; + bool enforce_schema = 21; } diff --git a/grpc/src/main/proto/model/workflowtask.proto b/grpc/src/main/proto/model/workflowtask.proto index 501f0e214..0bee4ce44 100644 --- a/grpc/src/main/proto/model/workflowtask.proto +++ 
b/grpc/src/main/proto/model/workflowtask.proto
@@ -10,6 +10,10 @@ option java_outer_classname = "WorkflowTaskPb";
 option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model";
 
 message WorkflowTask {
+    message CacheConfig {
+        string key = 1;
+        int32 ttl_in_second = 2;
+    }
     message WorkflowTaskList {
         repeated WorkflowTask tasks = 1;
     }
@@ -41,6 +45,7 @@ message WorkflowTask {
     int32 retry_count = 26;
     string evaluator_type = 27;
     string expression = 28;
-    string join_status = 29;
-    bool permissive = 30;
+    string join_status = 30;
+    WorkflowTask.CacheConfig cache_config = 31;
+    bool permissive = 32;
 }

From 8947e6732cf30c6451c953499071d85dd5218a43 Mon Sep 17 00:00:00 2001
From: Viren Baraiya
Date: Mon, 1 Jul 2024 01:31:26 -0700
Subject: [PATCH 06/10] back to async

---
 .../java/com/netflix/conductor/core/execution/tasks/Join.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
index 23c9acd63..5b0db258b 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
@@ -126,6 +126,6 @@ public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffse
     }
 
     public boolean isAsync() {
-        return false;
+        return true;
     }
 }

From b0efce7c0e95101e283860484ba5682cc72a7a84 Mon Sep 17 00:00:00 2001
From: Viren Baraiya
Date: Mon, 1 Jul 2024 17:32:19 -0700
Subject: [PATCH 07/10] update tests

---
 .../netflix/conductor/test/integration/ForkJoinSpec.groovy | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
index 5c17e816c..7600a648e 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
@@ -331,13 +331,13 @@ class ForkJoinSpec extends AbstractSpecification {
 
         and: "the workflow is in the failed state"
         with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
-            status == Workflow.WorkflowStatus.FAILED
+            status == Workflow.WorkflowStatus.RUNNING
             tasks.size() == 4
             tasks[1].status == Task.Status.FAILED
             tasks[1].taskType == 'integration_task_1'
             tasks[2].status == Task.Status.COMPLETED
             tasks[2].taskType == 'integration_task_2'
-            tasks[3].status == Task.Status.FAILED
+            tasks[3].status == Task.Status.IN_PROGRESS
             tasks[3].taskType == 'JOIN'
         }
 
From 81ab5ee8b15622466c4a53b488dc3b8cdbb2ace4 Mon Sep 17 00:00:00 2001
From: Viren Baraiya
Date: Tue, 2 Jul 2024 11:23:02 -0700
Subject: [PATCH 08/10] add AfterburnerModule

---
 .../netflix/conductor/common/config/ObjectMapperProvider.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java
index 7f2b64d96..6b976a30c 100644
--- a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java
+++ b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java
@@ -12,6 +12,7 @@
  */
 package com.netflix.conductor.common.config;
 
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.netflix.conductor.common.jackson.JsonProtoModule;
import com.fasterxml.jackson.annotation.JsonInclude; @@ -56,6 +57,7 @@ private static ObjectMapper _getObjectMapper() { objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.registerModule(new JsonProtoModule()); objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule(new AfterburnerModule()); return objectMapper; } } From ac6f13f735156f10999e054ad2787fbcc82c62d6 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Tue, 2 Jul 2024 11:36:56 -0700 Subject: [PATCH 09/10] formatting --- .../netflix/conductor/common/config/ObjectMapperProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java index 6b976a30c..5e3a5562c 100644 --- a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java +++ b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java @@ -12,7 +12,6 @@ */ package com.netflix.conductor.common.config; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import com.netflix.conductor.common.jackson.JsonProtoModule; import com.fasterxml.jackson.annotation.JsonInclude; @@ -20,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; /** * A Factory class for creating a customized {@link ObjectMapper}. This is only used by the From bf024b8f272b60eb0e69d99a54f21912ea321915 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Tue, 2 Jul 2024 12:20:19 -0700 Subject: [PATCH 10/10] fix tests --- .../integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 6152ce355..c1c93f304 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -81,6 +81,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { correlationId, input, null) then: "verify that the workflow is in a RUNNING state" + workflowExecutor.decide(rootWorkflowId) with(workflowExecutionService.getExecutionStatus(rootWorkflowId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 4 @@ -115,6 +116,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { and: "verify that the mid-level workflow is RUNNING, and first task is in SCHEDULED state" midLevelWorkflowId = rootWorkflowInstance.tasks[1].subWorkflowId + workflowExecutor.decide(midLevelWorkflowId) with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 4