diff --git a/dependencies.gradle b/dependencies.gradle index 9ded83e5c..362d4d6a7 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -30,7 +30,8 @@ ext { revKafka = '2.6.0' revMicrometer = '1.6.2' revMockServerClient = '5.12.0' - revNatsStreaming = '0.5.0' + revNatsStreaming = '2.2.3' + revNats = '2.6.5' revOpenapi = '1.6.+' revPowerMock = '2.0.9' revPrometheus = '0.9.0' diff --git a/event-queue/nats/build.gradle b/event-queue/nats/build.gradle index d0c286802..a5e8f3fc2 100644 --- a/event-queue/nats/build.gradle +++ b/event-queue/nats/build.gradle @@ -3,6 +3,7 @@ dependencies { implementation "com.netflix.conductor:conductor-core:${revConductor}" implementation "io.nats:java-nats-streaming:${revNatsStreaming}" + implementation "io.nats:jnats:${revNats}" implementation "org.apache.commons:commons-lang3:" implementation "com.google.guava:guava:${revGuava}" diff --git a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSObservableQueue.java b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSObservableQueue.java index 9ad8c49b9..6b5affe8e 100644 --- a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSObservableQueue.java +++ b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSObservableQueue.java @@ -11,12 +11,12 @@ */ package com.netflix.conductor.contribs.queue.nats; +import io.nats.client.Nats; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.nats.client.Connection; -import io.nats.client.ConnectionFactory; import io.nats.client.Subscription; import rx.Scheduler; @@ -24,30 +24,24 @@ public class NATSObservableQueue extends NATSAbstractQueue { private static final Logger LOGGER = LoggerFactory.getLogger(NATSObservableQueue.class); - private final ConnectionFactory fact; private Subscription subs; private Connection conn; - public NATSObservableQueue(ConnectionFactory factory, String queueURI, Scheduler scheduler) { + public NATSObservableQueue(String queueURI, Scheduler scheduler) { super(queueURI, "nats", scheduler); - this.fact = factory; open(); } @Override public boolean isConnected() { - return (conn != null && conn.isConnected()); + return (conn != null && Connection.Status.CONNECTED.equals(conn.getStatus())); } @Override public void connect() { try { - Connection temp = fact.createConnection(); + Connection temp = Nats.connect(); LOGGER.info("Successfully connected for " + queueURI); - temp.setReconnectedCallback( - (event) -> LOGGER.warn("onReconnect. Reconnected back for " + queueURI)); - temp.setDisconnectedCallback( - (event -> LOGGER.warn("onDisconnect. Disconnected for " + queueURI))); conn = temp; } catch (Exception e) { LOGGER.error("Unable to establish nats connection for " + queueURI, e); @@ -70,13 +64,13 @@ public void subscribe() { "No subscription. Creating a queue subscription. subject={}, queue={}", subject, queue); - subs = - conn.subscribe( - subject, queue, msg -> onMessage(msg.getSubject(), msg.getData())); + conn.createDispatcher(msg -> onMessage(msg.getSubject(), msg.getData())); + subs = conn.subscribe(subject, queue); } else { LOGGER.info( "No subscription. Creating a pub/sub subscription. subject={}", subject); - subs = conn.subscribe(subject, msg -> onMessage(msg.getSubject(), msg.getData())); + conn.createDispatcher(msg -> onMessage(msg.getSubject(), msg.getData())); + subs = conn.subscribe(subject); } } catch (Exception ex) { LOGGER.error( @@ -95,7 +89,7 @@ public void publish(String subject, byte[] data) throws Exception { public void closeSubs() { if (subs != null) { try { - subs.close(); + subs.unsubscribe(); } catch (Exception ex) { LOGGER.error("closeSubs failed with " + ex.getMessage() + " for " + queueURI, ex); } diff --git a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSStreamObservableQueue.java b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSStreamObservableQueue.java index a6de0237c..22d8641ca 100644 --- a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSStreamObservableQueue.java +++ b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSStreamObservableQueue.java @@ -11,18 +11,15 @@ */ package com.netflix.conductor.contribs.queue.nats; -import java.util.UUID; - +import io.nats.client.Connection; +import io.nats.streaming.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import io.nats.streaming.StreamingConnection; -import io.nats.streaming.StreamingConnectionFactory; -import io.nats.streaming.Subscription; -import io.nats.streaming.SubscriptionOptions; import rx.Scheduler; +import java.util.UUID; + /** @author Oleksiy Lysak */ public class NATSStreamObservableQueue extends NATSAbstractQueue { @@ -39,10 +36,11 @@ public NATSStreamObservableQueue( String queueURI, Scheduler scheduler) { super(queueURI, "nats_stream", scheduler); - this.fact = new StreamingConnectionFactory(); - this.fact.setClusterId(clusterId); - this.fact.setClientId(UUID.randomUUID().toString()); - this.fact.setNatsUrl(natsUrl); + Options.Builder options = new Options.Builder(); + options.clusterId(clusterId); + options.clientId(UUID.randomUUID().toString()); + options.natsUrl(natsUrl); + this.fact = new StreamingConnectionFactory(options.build()); this.durableName = durableName; open(); } @@ -51,7 +49,7 @@ public NATSStreamObservableQueue( public boolean isConnected() { return (conn != null && conn.getNatsConnection() != null - && conn.getNatsConnection().isConnected()); + && Connection.Status.CONNECTED.equals(conn.getNatsConnection().getStatus())); } @Override @@ -59,13 +57,6 @@ public void connect() { try { StreamingConnection temp = fact.createConnection(); LOGGER.info("Successfully connected for " + queueURI); - temp.getNatsConnection() - .setReconnectedCallback( - (event) -> - LOGGER.warn("onReconnect. Reconnected back for " + queueURI)); - temp.getNatsConnection() - .setDisconnectedCallback( - (event -> LOGGER.warn("onDisconnect. Disconnected for " + queueURI))); conn = temp; } catch (Exception e) { LOGGER.error("Unable to establish nats streaming connection for " + queueURI, e); diff --git a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java index 6ae3b1670..59e26a14d 100644 --- a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java +++ b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSEventQueueProvider.java @@ -24,7 +24,6 @@ import com.netflix.conductor.core.events.EventQueueProvider; import com.netflix.conductor.core.events.queue.ObservableQueue; -import io.nats.client.ConnectionFactory; import rx.Scheduler; /** @author Oleksiy Lysak */ @@ -33,31 +32,10 @@ public class NATSEventQueueProvider implements EventQueueProvider { private static final Logger LOGGER = LoggerFactory.getLogger(NATSEventQueueProvider.class); protected Map queues = new ConcurrentHashMap<>(); - private final ConnectionFactory factory; private final Scheduler scheduler; public NATSEventQueueProvider(Environment environment, Scheduler scheduler) { this.scheduler = scheduler; - LOGGER.info("NATS Event Queue Provider init"); - - // Init NATS API. Handle "io_nats" and "io.nats" ways to specify parameters - Properties props = new Properties(); - Properties temp = new Properties(); - temp.putAll(System.getenv()); - temp.putAll(System.getProperties()); - temp.forEach( - (k, v) -> { - String key = k.toString(); - String val = v.toString(); - - if (key.startsWith("io_nats")) { - key = key.replace("_", "."); - } - props.put(key, environment.getProperty(key, val)); - }); - - // Init NATS API - factory = new ConnectionFactory(props); LOGGER.info("NATS Event Queue Provider initialized..."); } @@ -70,8 +48,7 @@ public String getQueueType() { @NonNull public ObservableQueue getQueue(String queueURI) { NATSObservableQueue queue = - queues.computeIfAbsent( - queueURI, q -> new NATSObservableQueue(factory, queueURI, scheduler)); + queues.computeIfAbsent(queueURI, q -> new NATSObservableQueue(queueURI, scheduler)); if (queue.isClosed()) { queue.open(); } diff --git a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java index 3538b91f8..bf1762018 100644 --- a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java +++ b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamConfiguration.java @@ -11,23 +11,21 @@ */ package com.netflix.conductor.contribs.queue.nats.config; -import java.util.HashMap; -import java.util.Map; - +import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue; +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.model.TaskModel; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - -import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue; -import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.events.EventQueueProvider; -import com.netflix.conductor.core.events.queue.ObservableQueue; - import rx.Scheduler; +import java.util.HashMap; +import java.util.Map; + @Configuration @EnableConfigurationProperties(NATSStreamProperties.class) @ConditionalOnProperty(name = "conductor.event-queues.nats-stream.enabled", havingValue = "true") @@ -41,7 +39,7 @@ public EventQueueProvider natsEventQueueProvider( @ConditionalOnProperty(name = "conductor.default-event-queue.type", havingValue = "nats_stream") @Bean - public Map getQueues( + public Map getQueues( ConductorProperties conductorProperties, NATSStreamProperties properties, Scheduler scheduler) { @@ -49,9 +47,9 @@ public Map getQueues( if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) { stack = conductorProperties.getStack() + "_"; } - Task.Status[] statuses = new Task.Status[] {Task.Status.COMPLETED, Task.Status.FAILED}; - Map queues = new HashMap<>(); - for (Task.Status status : statuses) { + TaskModel.Status[] statuses = new TaskModel.Status[] {TaskModel.Status.COMPLETED, TaskModel.Status.FAILED}; + Map queues = new HashMap<>(); + for (TaskModel.Status status : statuses) { String queuePrefix = StringUtils.isBlank(properties.getListenerQueuePrefix()) ? conductorProperties.getAppId() + "_nats_stream_notify_" + stack diff --git a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java index f9b2d9dac..e7810b185 100644 --- a/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java +++ b/event-queue/nats/src/main/java/com/netflix/conductor/contribs/queue/nats/config/NATSStreamProperties.java @@ -11,10 +11,9 @@ */ package com.netflix.conductor.contribs.queue.nats.config; +import io.nats.client.Options; import org.springframework.boot.context.properties.ConfigurationProperties; -import io.nats.client.Nats; - @ConfigurationProperties("conductor.event-queues.nats-stream") public class NATSStreamProperties { @@ -25,7 +24,7 @@ public class NATSStreamProperties { private String durableName = null; /** The NATS connection url */ - private String url = Nats.DEFAULT_URL; + private String url = Options.DEFAULT_URL; /** The prefix to be used for the default listener queues */ private String listenerQueuePrefix = "";