Skip to content

Commit

Permalink
enable always to publish workflow status
Browse files Browse the repository at this point in the history
  • Loading branch information
darkobuvac committed Jan 4, 2024
1 parent a581ab9 commit 4c86a69
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.netflix.conductor.contribs.queue.amqp.AMQPConnection;
import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.rabbitmq.listener.TaskStatusPublisherRabbitMQ;
import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ;
import com.netflix.conductor.rabbitmq.services.RabbitMQService;
Expand Down Expand Up @@ -62,7 +64,7 @@ public RabbitMQService rabbitMQService(
havingValue = "rabbitmq",
matchIfMissing = false)
@Bean
public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ(
public WorkflowStatusListener workflowStatusListenerRabbitMQ(
RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties);
}
Expand All @@ -72,7 +74,7 @@ public WorkflowStatusPublisherRabbitMQ workflowStatusListenerRabbitMQ(
havingValue = "rabbitmq",
matchIfMissing = false)
@Bean
public TaskStatusPublisherRabbitMQ taskStatusPublisherRabbitMQ(
public TaskStatusListener taskStatusPublisherRabbitMQ(
RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@ConfigurationProperties("conductor.message-publisher.rabbitmq")
public class RabbitMQProperties {
private String hosts = ConnectionFactory.DEFAULT_HOST;
private String username = ConnectionFactory.DEFAULT_HOST;
private String username = ConnectionFactory.DEFAULT_USER;
private String password = ConnectionFactory.DEFAULT_PASS;

private int port = ConnectionFactory.DEFAULT_AMQP_PORT;
Expand All @@ -37,6 +37,8 @@ public class RabbitMQProperties {
private String workflowStatusExchange;
private String taskStatusExchange;

private boolean alwaysPublishWorkflowStatusEnabled = true;

public String getHosts() {
return hosts;
}
Expand Down Expand Up @@ -124,4 +126,12 @@ public List<String> getAllowedTaskStatuses() {
public void setAllowedTaskStatuses(String allowedTaskStatuses) {
this.allowedTaskStatuses = allowedTaskStatuses;
}

public boolean isAlwaysPublishWorkflowStatusEnabled() {
return alwaysPublishWorkflowStatusEnabled;
}

public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) {
this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,36 @@ public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener {

private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class);
private final RabbitMQService rabbitMQService;
private final String EXCHANGE_NAME;
private final RabbitMQProperties rabbitMQProperties;

public WorkflowStatusPublisherRabbitMQ(
RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
this.rabbitMQService = rabbitMQService;
this.EXCHANGE_NAME = rabbitMQProperties.getWorkflowStatusExchange();
this.rabbitMQProperties = rabbitMQProperties;
}

@Override
public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) {
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
|| rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
onWorkflowCompleted(workflow);
}
}

@Override
public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) {
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
|| rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
onWorkflowTerminated(workflow);
}
}

@Override
public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) {
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
|| rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
onWorkflowFinalized(workflow);
}
}

@Override
Expand All @@ -42,12 +66,20 @@ public void onWorkflowTerminated(WorkflowModel workflow) {
publishMessage(workflow);
}

@Override
public void onWorkflowFinalized(WorkflowModel workflow) {
publishMessage(workflow);
}

private void publishMessage(WorkflowModel workflow) {
try {
rabbitMQService.publishMessage(EXCHANGE_NAME, workflow);
rabbitMQService.publishMessage(
rabbitMQProperties.getWorkflowStatusExchange(), workflow);
} catch (Exception e) {
LOGGER.error(
"Failed to publish message to exchange: {}. Exception: {}", EXCHANGE_NAME, e);
"Failed to publish message to exchange: {}. Exception: {}",
rabbitMQProperties.getWorkflowStatusExchange(),
e);
throw new RuntimeException(e);
}
}
Expand Down

0 comments on commit 4c86a69

Please sign in to comment.