Skip to content

Commit

Permalink
updated nats and nats streaming libraries (Netflix#39)
Browse files Browse the repository at this point in the history
fixed default queues maps key type

Co-authored-by: astelmashenko <[email protected]>
  • Loading branch information
astelmashenko and astelmashenko authored May 10, 2022
1 parent 7b70f53 commit 8c77227
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 76 deletions.
3 changes: 2 additions & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ ext {
revKafka = '2.6.0'
revMicrometer = '1.6.2'
revMockServerClient = '5.12.0'
revNatsStreaming = '0.5.0'
revNatsStreaming = '2.2.3'
revNats = '2.6.5'
revOpenapi = '1.6.+'
revPowerMock = '2.0.9'
revPrometheus = '0.9.0'
Expand Down
1 change: 1 addition & 0 deletions event-queue/nats/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ dependencies {
implementation "com.netflix.conductor:conductor-core:${revConductor}"

implementation "io.nats:java-nats-streaming:${revNatsStreaming}"
implementation "io.nats:jnats:${revNats}"

implementation "org.apache.commons:commons-lang3:"
implementation "com.google.guava:guava:${revGuava}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,37 @@
*/
package com.netflix.conductor.contribs.queue.nats;

import io.nats.client.Nats;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Subscription;
import rx.Scheduler;

/** @author Oleksiy Lysak */
public class NATSObservableQueue extends NATSAbstractQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(NATSObservableQueue.class);
private final ConnectionFactory fact;
private Subscription subs;
private Connection conn;

public NATSObservableQueue(ConnectionFactory factory, String queueURI, Scheduler scheduler) {
public NATSObservableQueue(String queueURI, Scheduler scheduler) {
super(queueURI, "nats", scheduler);
this.fact = factory;
open();
}

@Override
public boolean isConnected() {
return (conn != null && conn.isConnected());
return (conn != null && Connection.Status.CONNECTED.equals(conn.getStatus()));
}

@Override
public void connect() {
try {
Connection temp = fact.createConnection();
Connection temp = Nats.connect();
LOGGER.info("Successfully connected for " + queueURI);
temp.setReconnectedCallback(
(event) -> LOGGER.warn("onReconnect. Reconnected back for " + queueURI));
temp.setDisconnectedCallback(
(event -> LOGGER.warn("onDisconnect. Disconnected for " + queueURI)));
conn = temp;
} catch (Exception e) {
LOGGER.error("Unable to establish nats connection for " + queueURI, e);
Expand All @@ -70,13 +64,13 @@ public void subscribe() {
"No subscription. Creating a queue subscription. subject={}, queue={}",
subject,
queue);
subs =
conn.subscribe(
subject, queue, msg -> onMessage(msg.getSubject(), msg.getData()));
conn.createDispatcher(msg -> onMessage(msg.getSubject(), msg.getData()));
subs = conn.subscribe(subject, queue);
} else {
LOGGER.info(
"No subscription. Creating a pub/sub subscription. subject={}", subject);
subs = conn.subscribe(subject, msg -> onMessage(msg.getSubject(), msg.getData()));
conn.createDispatcher(msg -> onMessage(msg.getSubject(), msg.getData()));
subs = conn.subscribe(subject);
}
} catch (Exception ex) {
LOGGER.error(
Expand All @@ -95,7 +89,7 @@ public void publish(String subject, byte[] data) throws Exception {
public void closeSubs() {
if (subs != null) {
try {
subs.close();
subs.unsubscribe();
} catch (Exception ex) {
LOGGER.error("closeSubs failed with " + ex.getMessage() + " for " + queueURI, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@
*/
package com.netflix.conductor.contribs.queue.nats;

import java.util.UUID;

import io.nats.client.Connection;
import io.nats.streaming.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import rx.Scheduler;

import java.util.UUID;

/** @author Oleksiy Lysak */
public class NATSStreamObservableQueue extends NATSAbstractQueue {

Expand All @@ -39,10 +36,11 @@ public NATSStreamObservableQueue(
String queueURI,
Scheduler scheduler) {
super(queueURI, "nats_stream", scheduler);
this.fact = new StreamingConnectionFactory();
this.fact.setClusterId(clusterId);
this.fact.setClientId(UUID.randomUUID().toString());
this.fact.setNatsUrl(natsUrl);
Options.Builder options = new Options.Builder();
options.clusterId(clusterId);
options.clientId(UUID.randomUUID().toString());
options.natsUrl(natsUrl);
this.fact = new StreamingConnectionFactory(options.build());
this.durableName = durableName;
open();
}
Expand All @@ -51,21 +49,14 @@ public NATSStreamObservableQueue(
public boolean isConnected() {
return (conn != null
&& conn.getNatsConnection() != null
&& conn.getNatsConnection().isConnected());
&& Connection.Status.CONNECTED.equals(conn.getNatsConnection().getStatus()));
}

@Override
public void connect() {
try {
StreamingConnection temp = fact.createConnection();
LOGGER.info("Successfully connected for " + queueURI);
temp.getNatsConnection()
.setReconnectedCallback(
(event) ->
LOGGER.warn("onReconnect. Reconnected back for " + queueURI));
temp.getNatsConnection()
.setDisconnectedCallback(
(event -> LOGGER.warn("onDisconnect. Disconnected for " + queueURI)));
conn = temp;
} catch (Exception e) {
LOGGER.error("Unable to establish nats streaming connection for " + queueURI, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;

import io.nats.client.ConnectionFactory;
import rx.Scheduler;

/** @author Oleksiy Lysak */
Expand All @@ -33,31 +32,10 @@ public class NATSEventQueueProvider implements EventQueueProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(NATSEventQueueProvider.class);

protected Map<String, NATSObservableQueue> queues = new ConcurrentHashMap<>();
private final ConnectionFactory factory;
private final Scheduler scheduler;

public NATSEventQueueProvider(Environment environment, Scheduler scheduler) {
this.scheduler = scheduler;
LOGGER.info("NATS Event Queue Provider init");

// Init NATS API. Handle "io_nats" and "io.nats" ways to specify parameters
Properties props = new Properties();
Properties temp = new Properties();
temp.putAll(System.getenv());
temp.putAll(System.getProperties());
temp.forEach(
(k, v) -> {
String key = k.toString();
String val = v.toString();

if (key.startsWith("io_nats")) {
key = key.replace("_", ".");
}
props.put(key, environment.getProperty(key, val));
});

// Init NATS API
factory = new ConnectionFactory(props);
LOGGER.info("NATS Event Queue Provider initialized...");
}

Expand All @@ -70,8 +48,7 @@ public String getQueueType() {
@NonNull
public ObservableQueue getQueue(String queueURI) {
NATSObservableQueue queue =
queues.computeIfAbsent(
queueURI, q -> new NATSObservableQueue(factory, queueURI, scheduler));
queues.computeIfAbsent(queueURI, q -> new NATSObservableQueue(queueURI, scheduler));
if (queue.isClosed()) {
queue.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@
*/
package com.netflix.conductor.contribs.queue.nats.config;

import java.util.HashMap;
import java.util.Map;

import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.model.TaskModel;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.contribs.queue.nats.NATSStreamObservableQueue;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;

import rx.Scheduler;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(NATSStreamProperties.class)
@ConditionalOnProperty(name = "conductor.event-queues.nats-stream.enabled", havingValue = "true")
Expand All @@ -41,17 +39,17 @@ public EventQueueProvider natsEventQueueProvider(

@ConditionalOnProperty(name = "conductor.default-event-queue.type", havingValue = "nats_stream")
@Bean
public Map<Task.Status, ObservableQueue> getQueues(
public Map<TaskModel.Status, ObservableQueue> getQueues(
ConductorProperties conductorProperties,
NATSStreamProperties properties,
Scheduler scheduler) {
String stack = "";
if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
stack = conductorProperties.getStack() + "_";
}
Task.Status[] statuses = new Task.Status[] {Task.Status.COMPLETED, Task.Status.FAILED};
Map<Task.Status, ObservableQueue> queues = new HashMap<>();
for (Task.Status status : statuses) {
TaskModel.Status[] statuses = new TaskModel.Status[] {TaskModel.Status.COMPLETED, TaskModel.Status.FAILED};
Map<TaskModel.Status, ObservableQueue> queues = new HashMap<>();
for (TaskModel.Status status : statuses) {
String queuePrefix =
StringUtils.isBlank(properties.getListenerQueuePrefix())
? conductorProperties.getAppId() + "_nats_stream_notify_" + stack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
*/
package com.netflix.conductor.contribs.queue.nats.config;

import io.nats.client.Options;
import org.springframework.boot.context.properties.ConfigurationProperties;

import io.nats.client.Nats;

@ConfigurationProperties("conductor.event-queues.nats-stream")
public class NATSStreamProperties {

Expand All @@ -25,7 +24,7 @@ public class NATSStreamProperties {
private String durableName = null;

/** The NATS connection url */
private String url = Nats.DEFAULT_URL;
private String url = Options.DEFAULT_URL;

/** The prefix to be used for the default listener queues */
private String listenerQueuePrefix = "";
Expand Down

0 comments on commit 8c77227

Please sign in to comment.