Skip to content

Commit

Permalink
Merge pull request #189 from astelmashenko/feature/nats-jetstream-con…
Browse files Browse the repository at this point in the history
…fig-props

added additional configuration properties, error listener, proper han…
  • Loading branch information
v1r3n authored Jun 21, 2024
2 parents 3e909ab + 9ccf28e commit aefeea5
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 54 deletions.
2 changes: 1 addition & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ ext {
revCodec = '1.15'
revAzureStorageBlobSdk = '12.7.0'
revNatsStreaming = '2.6.5'
revNats = '2.15.6'
revNats = '2.16.14'
revStan = '2.2.3'

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,20 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationEventPublisher;

import com.netflix.conductor.contribs.queue.nats.config.JetStreamProperties;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.*;
import io.nats.client.api.*;
import rx.Observable;
import rx.Scheduler;

Expand All @@ -58,31 +52,49 @@ public class JetStreamObservableQueue implements ObservableQueue {
private final JetStreamProperties properties;
private final Scheduler scheduler;
private final AtomicBoolean running = new AtomicBoolean(false);
private final ApplicationEventPublisher eventPublisher;
private Connection nc;
private JetStreamSubscription sub;
private Observable<Long> interval;
private final String queueGroup;

public JetStreamObservableQueue(
ConductorProperties conductorProperties,
JetStreamProperties properties,
String queueType,
String queueUri,
Scheduler scheduler) {
Scheduler scheduler,
ApplicationEventPublisher eventPublisher) {
LOG.debug("JSM obs queue create, qtype={}, quri={}", queueType, queueUri);

this.queueUri = queueUri;
// If queue specified (e.g. subject:queue) - split to subject & queue
if (queueUri.contains(":")) {
this.subject = queueUri.substring(0, queueUri.indexOf(':'));
this.subject =
getQueuePrefix(conductorProperties, properties)
+ queueUri.substring(0, queueUri.indexOf(':'));
queueGroup = queueUri.substring(queueUri.indexOf(':') + 1);
} else {
this.subject = queueUri;
this.subject = getQueuePrefix(conductorProperties, properties) + queueUri;
queueGroup = null;
}

this.queueType = queueType;
this.properties = properties;
this.scheduler = scheduler;
this.eventPublisher = eventPublisher;
}

public static String getQueuePrefix(
ConductorProperties conductorProperties, JetStreamProperties properties) {
String stack = "";
if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
stack = conductorProperties.getStack() + "_";
}

return StringUtils.isBlank(properties.getListenerQueuePrefix())
? conductorProperties.getAppId() + "_jsm_notify_" + stack
: properties.getListenerQueuePrefix();
}

@Override
Expand Down Expand Up @@ -211,11 +223,19 @@ private void natsConnect() {
.connectionListener(
(conn, type) -> {
LOG.info("Connection to JSM updated: {}", type);
if (ConnectionListener.Events.CLOSED.equals(type)) {
LOG.error(
"Could not reconnect to NATS! Changing liveness status to {}!",
LivenessState.BROKEN);
AvailabilityChangeEvent.publish(
eventPublisher, type, LivenessState.BROKEN);
}
this.nc = conn;
subscribeOnce(conn, type);
})
.errorListener(new LoggingNatsErrorListener())
.server(properties.getUrl())
.maxReconnects(-1)
.maxReconnects(properties.getMaxReconnects())
.build(),
true);
} catch (InterruptedException e) {
Expand All @@ -224,43 +244,71 @@ private void natsConnect() {
}
}

private void createStream(Connection nc) {
JetStreamManagement jsm;
try {
jsm = nc.jetStreamManagement();
} catch (IOException e) {
throw new NatsException("Failed to get jsm management", e);
}

private void createStream(JetStreamManagement jsm) {
StreamConfiguration streamConfig =
StreamConfiguration.builder()
.name(subject)
.retentionPolicy(RetentionPolicy.WorkQueue)
.replicas(properties.getReplicas())
.retentionPolicy(RetentionPolicy.Limits)
.maxBytes(properties.getStreamMaxBytes())
.storageType(StorageType.get(properties.getStreamStorageType()))
.build();

try {
StreamInfo streamInfo = jsm.addStream(streamConfig);
LOG.debug("Create stream, info: {}", streamInfo);
LOG.debug("Updated stream, info: {}", streamInfo);
} catch (IOException | JetStreamApiException e) {
LOG.error("Failed to add stream: " + streamConfig, e);
AvailabilityChangeEvent.publish(eventPublisher, e, LivenessState.BROKEN);
}
}

private void subscribeOnce(Connection nc, ConnectionListener.Events type) {
if (type.equals(ConnectionListener.Events.CONNECTED)
|| type.equals(ConnectionListener.Events.RECONNECTED)) {
createStream(nc);
subscribe(nc);
JetStreamManagement jsm;
try {
jsm = nc.jetStreamManagement();
} catch (IOException e) {
throw new NatsException("Failed to get jsm management", e);
}
createStream(jsm);
var consumerConfig = createConsumer(jsm);
subscribe(nc, consumerConfig);
}
}

private void subscribe(Connection nc) {
private ConsumerConfiguration createConsumer(JetStreamManagement jsm) {
ConsumerConfiguration consumerConfig =
ConsumerConfiguration.builder()
.name(properties.getDurableName())
.deliverGroup(queueGroup)
.durable(properties.getDurableName())
.ackWait(properties.getAckWait())
.maxDeliver(properties.getMaxDeliver())
.maxAckPending(properties.getMaxAckPending())
.ackPolicy(AckPolicy.Explicit)
.deliverSubject(subject + "-deliver")
.deliverPolicy(DeliverPolicy.New)
.build();

try {
jsm.addOrUpdateConsumer(subject, consumerConfig);
return consumerConfig;
} catch (IOException | JetStreamApiException e) {
throw new NatsException("Failed to add/update consumer", e);
}
}

private void subscribe(Connection nc, ConsumerConfiguration consumerConfig) {
try {
JetStream js = nc.jetStream();

PushSubscribeOptions pso =
PushSubscribeOptions.builder().durable(properties.getDurableName()).build();
PushSubscribeOptions.builder().configuration(consumerConfig).stream(subject)
.bind(true)
.build();

LOG.debug("Subscribing jsm, subject={}, options={}", subject, pso);
sub =
js.subscribe(
Expand All @@ -270,7 +318,7 @@ private void subscribe(Connection nc) {
msg -> {
var message = new JsmMessage();
message.setJsmMsg(msg);
message.setId(msg.getSID());
message.setId(NUID.nextGlobal());
message.setPayload(new String(msg.getData()));
messages.add(message);
},
Expand All @@ -279,7 +327,7 @@ private void subscribe(Connection nc) {
LOG.debug("Subscribed successfully {}", sub.getConsumerInfo());
this.running.set(true);
} catch (IOException | JetStreamApiException e) {
LOG.error("Failed to subscribe", e);
throw new NatsException("Failed to subscribe", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.queue.nats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.nats.client.Connection;
import io.nats.client.ErrorListener;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;

public class LoggingNatsErrorListener implements ErrorListener {
private static final Logger LOG = LoggerFactory.getLogger(LoggingNatsErrorListener.class);

@Override
public void errorOccurred(Connection conn, String error) {
LOG.error("Nats connection error occurred: {}", error);
}

@Override
public void exceptionOccurred(Connection conn, Exception exp) {
LOG.error("Nats connection exception occurred", exp);
}

@Override
public void messageDiscarded(Connection conn, Message msg) {
LOG.error("Nats message discarded, SID={}, ", msg.getSID());
}

@Override
public void heartbeatAlarm(
Connection conn,
JetStreamSubscription sub,
long lastStreamSequence,
long lastConsumerSequence) {
LOG.warn("Heartbit missed, subject={}", sub.getSubject());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import java.util.EnumMap;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -37,32 +37,25 @@
public class JetStreamConfiguration {
@Bean
public EventQueueProvider jsmEventQueueProvider(
JetStreamProperties properties, Scheduler scheduler) {
return new JetStreamEventQueueProvider(properties, scheduler);
JetStreamProperties properties,
Scheduler scheduler,
ConductorProperties conductorProperties,
ApplicationEventPublisher eventPublisher) {
return new JetStreamEventQueueProvider(
conductorProperties, properties, scheduler, eventPublisher);
}

@ConditionalOnProperty(name = "conductor.default-event-queue.type", havingValue = "jsm")
@Bean
public Map<TaskModel.Status, ObservableQueue> getQueues(
JetStreamEventQueueProvider provider,
ConductorProperties conductorProperties,
JetStreamProperties properties) {
String stack = "";
if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
stack = conductorProperties.getStack() + "_";
}
EventQueueProvider jsmEventQueueProvider, JetStreamProperties properties) {
TaskModel.Status[] statuses =
new TaskModel.Status[] {TaskModel.Status.COMPLETED, TaskModel.Status.FAILED};
Map<TaskModel.Status, ObservableQueue> queues = new EnumMap<>(TaskModel.Status.class);
for (TaskModel.Status status : statuses) {
String queuePrefix =
StringUtils.isBlank(properties.getListenerQueuePrefix())
? conductorProperties.getAppId() + "_jsm_notify_" + stack
: properties.getListenerQueuePrefix();

String queueName = queuePrefix + status.name() + getQueueGroup(properties);
String queueName = status.name() + getQueueGroup(properties);

ObservableQueue queue = provider.getQueue(queueName);
ObservableQueue queue = jsmEventQueueProvider.getQueue(queueName);
queues.put(status, queue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.lang.NonNull;

import com.netflix.conductor.contribs.queue.nats.JetStreamObservableQueue;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;

Expand All @@ -33,12 +35,20 @@ public class JetStreamEventQueueProvider implements EventQueueProvider {
private static final Logger LOG = LoggerFactory.getLogger(JetStreamEventQueueProvider.class);
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
private final JetStreamProperties properties;
private final ConductorProperties conductorProperties;
private final Scheduler scheduler;
private final ApplicationEventPublisher eventPublisher;

public JetStreamEventQueueProvider(JetStreamProperties properties, Scheduler scheduler) {
public JetStreamEventQueueProvider(
ConductorProperties conductorProperties,
JetStreamProperties properties,
Scheduler scheduler,
ApplicationEventPublisher eventPublisher) {
LOG.info("NATS Event Queue Provider initialized...");
this.properties = properties;
this.conductorProperties = conductorProperties;
this.scheduler = scheduler;
this.eventPublisher = eventPublisher;
}

@Override
Expand All @@ -49,9 +59,16 @@ public String getQueueType() {
@Override
@NonNull
public ObservableQueue getQueue(String queueURI) throws IllegalArgumentException {
LOG.debug("Getting obs queue, quri={}", queueURI);
LOG.info("Getting obs queue, quri={}", queueURI);
return queues.computeIfAbsent(
queueURI,
q -> new JetStreamObservableQueue(properties, getQueueType(), queueURI, scheduler));
q ->
new JetStreamObservableQueue(
conductorProperties,
properties,
getQueueType(),
queueURI,
scheduler,
eventPublisher));
}
}
Loading

0 comments on commit aefeea5

Please sign in to comment.