From 9ffcabcee3d9c5466259fd780593c4f821811ca3 Mon Sep 17 00:00:00 2001 From: Venkat Reddy Thangella Date: Thu, 24 Feb 2022 04:16:25 +0530 Subject: [PATCH] AMQP enhancements for resiliency and concurrency (#2587) * AMQP messages will generate the observable change * Reverting unnecessary changes * Capturing the only changes required for AMQP messagres reading * Capturing the only changes required for AMQP messagres reading * Support for sequential and parallel processing of messages * Channel reusability changes * Channel reusability changes * Subscriber channel caching * Max channel changes * Connection and opertion retries * Channel usage for exchanges and publishers * Acknowledge the message even on failures * Resolved conflicts * Apply coding style changes using spotless * Retry on publish and ack failures * Externalizing the redis config * Externalizing the redis config * Make Event System Task Synchronous Event is a task that publishes messages to an external eventing system. The publishing is not a long running task, and publishing message to an async system using async task is not optimal. marking event as sync, improves the overall performance and has negligible overhead given the task completes quite quick. * Update integration tests to reflect Event is synchronous * Getting the latest changes Co-authored-by: Thangella, Venkat Co-authored-by: Viren Baraiya --- .../contribs/queue/amqp/AMQPConnection.java | 400 ++++++++++------ .../queue/amqp/AMQPObservableQueue.java | 435 ++++++++++++++---- .../amqp/config/AMQPEventQueueProperties.java | 116 ++++- .../queue/amqp/config/AMQPRetryPattern.java | 54 +++ .../queue/amqp/util/AMQPConstants.java | 12 + .../queue/amqp/util/AMQPSettings.java | 102 ++-- .../contribs/queue/amqp/util/RetryType.java | 20 + .../amqp/AMQPEventQueueProviderTest.java | 3 +- .../queue/amqp/AMQPObservableQueueTest.java | 140 ++---- .../contribs/queue/amqp/AMQPSettingsTest.java | 3 +- .../core/events/DefaultEventProcessor.java | 25 +- .../conductor/core/execution/tasks/Event.java | 2 +- .../conductor/core/utils/SemaphoreUtil.java | 6 +- .../core/execution/tasks/EventSpec.groovy | 4 +- dependencies.gradle | 2 +- docker/server/config/config.properties | 2 + .../redis/config/RedisProperties.java | 62 +++ .../config/RedisSentinelConfiguration.java | 9 +- .../src/main/resources/application.properties | 19 +- .../test/integration/EventTaskSpec.groovy | 9 +- 20 files changed, 988 insertions(+), 437 deletions(-) create mode 100644 contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java create mode 100644 contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java index 3b566a9a7..ab97c58e2 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java @@ -14,7 +14,10 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -22,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; import com.rabbitmq.client.Address; @@ -42,8 +47,11 @@ public class AMQPConnection { private static AMQPConnection amqpConnection = null; private static final String PUBLISHER = "Publisher"; private static final String SUBSCRIBER = "Subscriber"; - private static final String SEPARATOR = ":"; - Map queueNameToChannel = new ConcurrentHashMap(); + private static final Map> availableChannelPool = + new ConcurrentHashMap>(); + private static final Map subscriberReservedChannelPool = + new ConcurrentHashMap(); + private static AMQPRetryPattern retrySettings = null; private AMQPConnection() {} @@ -53,11 +61,13 @@ private AMQPConnection(final ConnectionFactory factory, final Address[] address) } public static synchronized AMQPConnection getInstance( - final ConnectionFactory factory, final Address[] address) { + final ConnectionFactory factory, + final Address[] address, + final AMQPRetryPattern retrySettings) { if (AMQPConnection.amqpConnection == null) { AMQPConnection.amqpConnection = new AMQPConnection(factory, address); } - + AMQPConnection.retrySettings = retrySettings; return AMQPConnection.amqpConnection; } @@ -71,181 +81,233 @@ public Address[] getAddresses() { } private Connection createConnection(String connectionPrefix) { - - try { - Connection connection = - factory.newConnection( - addresses, System.getenv("HOSTNAME") + "-" + connectionPrefix); - if (connection == null || !connection.isOpen()) { - throw new RuntimeException("Failed to open connection"); + int retryIndex = 1; + while (true) { + try { + Connection connection = + factory.newConnection( + addresses, System.getenv("HOSTNAME") + "-" + connectionPrefix); + if (connection == null || !connection.isOpen()) { + throw new RuntimeException("Failed to open connection"); + } + connection.addShutdownListener( + new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + LOGGER.error( + "Received a shutdown exception for the connection {}. reason {} cause{}", + connection.getClientProvidedName(), + cause.getMessage(), + cause); + } + }); + connection.addBlockedListener( + new BlockedListener() { + @Override + public void handleUnblocked() throws IOException { + LOGGER.info( + "Connection {} is unblocked", + connection.getClientProvidedName()); + } + + @Override + public void handleBlocked(String reason) throws IOException { + LOGGER.error( + "Connection {} is blocked. reason: {}", + connection.getClientProvidedName(), + reason); + } + }); + return connection; + } catch (final IOException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + final String error = + "IO error while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + final String error = + "Retries completed. IO error while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + retryIndex++; + } catch (final TimeoutException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + final String error = + "Timeout while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + final String error = + "Retries completed. Timeout while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + retryIndex++; } - - connection.addShutdownListener( - new ShutdownListener() { - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - LOGGER.error( - "Received a shutdown exception for the connection {}. reason {} cause{}", - connection.getClientProvidedName(), - cause.getMessage(), - cause); - } - }); - - connection.addBlockedListener( - new BlockedListener() { - @Override - public void handleUnblocked() throws IOException { - LOGGER.info( - "Connection {} is unblocked", - connection.getClientProvidedName()); - } - - @Override - public void handleBlocked(String reason) throws IOException { - LOGGER.error( - "Connection {} is blocked. reason: {}", - connection.getClientProvidedName(), - reason); - } - }); - - return connection; - } catch (final IOException e) { - final String error = - "IO error while connecting to " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")); - LOGGER.error(error, e); - throw new RuntimeException(error, e); - } catch (final TimeoutException e) { - final String error = - "Timeout while connecting to " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")); - LOGGER.error(error, e); - throw new RuntimeException(error, e); } } - public Channel getOrCreateChannel(ConnectionType connectionType, String queueOrExchangeName) { + public Channel getOrCreateChannel(ConnectionType connectionType, String queueOrExchangeName) + throws Exception { LOGGER.debug( "Accessing the channel for queueOrExchange {} with type {} ", queueOrExchangeName, connectionType); switch (connectionType) { case SUBSCRIBER: - return getOrCreateSubscriberChannel(queueOrExchangeName); - + String subChnName = connectionType + ";" + queueOrExchangeName; + if (subscriberReservedChannelPool.containsKey(subChnName)) { + Channel locChn = subscriberReservedChannelPool.get(subChnName); + if (locChn != null && locChn.isOpen()) { + return locChn; + } + } + synchronized (this) { + if (subscriberConnection == null || !subscriberConnection.isOpen()) { + subscriberConnection = createConnection(SUBSCRIBER); + } + } + Channel subChn = borrowChannel(connectionType, subscriberConnection); + // Add the subscribed channels to Map to avoid messages being acknowledged on + // different from the subscribed one + subscriberReservedChannelPool.put(subChnName, subChn); + return subChn; case PUBLISHER: - return getOrCreatePublisherChannel(queueOrExchangeName); + synchronized (this) { + if (publisherConnection == null || !publisherConnection.isOpen()) { + publisherConnection = createConnection(PUBLISHER); + } + } + return borrowChannel(connectionType, publisherConnection); default: return null; } } - private Channel getOrCreateSubscriberChannel(String queueOrExchangeName) { - - String prefix = SUBSCRIBER + SEPARATOR; - // Return the existing channel if it's still opened - Channel subscriberChannel = queueNameToChannel.get(prefix + queueOrExchangeName); - if (subscriberChannel != null) { - return subscriberChannel; - } + private Channel getOrCreateChannel(ConnectionType connType, Connection rmqConnection) { // Channel creation is required - try { - synchronized (this) { - if (subscriberConnection == null) { - subscriberConnection = createConnection(SUBSCRIBER); + Channel locChn = null; + int retryIndex = 1; + while (true) { + try { + LOGGER.debug("Creating a channel for " + connType); + locChn = rmqConnection.createChannel(); + if (locChn == null || !locChn.isOpen()) { + throw new RuntimeException("Fail to open " + connType + " channel"); } - LOGGER.debug("Creating a channel for subscriber"); - subscriberChannel = subscriberConnection.createChannel(); - subscriberChannel.addShutdownListener( + locChn.addShutdownListener( cause -> { LOGGER.error( - "subscription Channel has been shutdown: {}", + connType + " Channel has been shutdown: {}", cause.getMessage(), cause); }); - if (subscriberChannel == null || !subscriberChannel.isOpen()) { - throw new RuntimeException("Fail to open subscription channel"); + return locChn; + } catch (final IOException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + throw new RuntimeException( + "Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - queueNameToChannel.putIfAbsent(prefix + queueOrExchangeName, subscriberChannel); - } - } catch (final IOException e) { - throw new RuntimeException( - "Cannot open subscription channel on " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")), - e); - } - - return subscriberChannel; - } - - private Channel getOrCreatePublisherChannel(String queueOrExchangeName) { - - String prefix = PUBLISHER + SEPARATOR; - Channel publisherChannel = queueNameToChannel.get(prefix + queueOrExchangeName); - if (publisherChannel != null) { - return publisherChannel; - } - // Channel creation is required - try { - - synchronized (this) { - if (publisherConnection == null) { - publisherConnection = createConnection(PUBLISHER); + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + throw new RuntimeException( + "Retries completed. Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - - LOGGER.debug("Creating a channel for publisher"); - publisherChannel = publisherConnection.createChannel(); - publisherChannel.addShutdownListener( - cause -> { - LOGGER.error( - "Publish Channel has been shutdown: {}", - cause.getMessage(), - cause); - }); - - if (publisherChannel == null || !publisherChannel.isOpen()) { - throw new RuntimeException("Fail to open publish channel"); + retryIndex++; + } catch (final Exception e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + throw new RuntimeException( + "Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + throw new RuntimeException( + "Retries completed. Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - queueNameToChannel.putIfAbsent(prefix + queueOrExchangeName, publisherChannel); + retryIndex++; } - - } catch (final IOException e) { - throw new RuntimeException( - "Cannot open channel on " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")), - e); } - return publisherChannel; } public void close() { LOGGER.info("Closing all connections and channels"); try { - for (Map.Entry entry : queueNameToChannel.entrySet()) { - closeChannel(entry.getValue()); - } + closeChannelsInMap(ConnectionType.PUBLISHER); + closeChannelsInMap(ConnectionType.SUBSCRIBER); closeConnection(publisherConnection); closeConnection(subscriberConnection); } finally { - queueNameToChannel.clear(); + availableChannelPool.clear(); publisherConnection = null; subscriberConnection = null; } } + private void closeChannelsInMap(ConnectionType conType) { + Set channels = availableChannelPool.get(conType); + if (channels != null && !channels.isEmpty()) { + Iterator itr = channels.iterator(); + while (itr.hasNext()) { + Channel channel = itr.next(); + closeChannel(channel); + } + channels.clear(); + } + } + private void closeConnection(Connection connection) { - if (connection == null) { - LOGGER.warn("Connection is null. Do not close it"); + if (connection == null || !connection.isOpen()) { + LOGGER.warn("Connection is null or closed already. Not closing it again"); } else { try { connection.close(); @@ -256,8 +318,8 @@ private void closeConnection(Connection connection) { } private void closeChannel(Channel channel) { - if (channel == null) { - LOGGER.warn("Channel is null. Do not close it"); + if (channel == null || !channel.isOpen()) { + LOGGER.warn("Channel is null or closed already. Not closing it again"); } else { try { channel.close(); @@ -266,4 +328,64 @@ private void closeChannel(Channel channel) { } } } + + /** + * Gets the channel for specified connectionType. + * + * @param connectionType holds the multiple channels for different connection types for thread + * safe operation. + * @param rmqConnection publisher or subscriber connection instance + * @return channel instance + * @throws Exception + */ + private synchronized Channel borrowChannel( + ConnectionType connectionType, Connection rmqConnection) throws Exception { + if (!availableChannelPool.containsKey(connectionType)) { + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_CREATION_SUCCESS, connectionType)); + return channel; + } + Set channels = availableChannelPool.get(connectionType); + if (channels != null && channels.isEmpty()) { + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_CREATION_SUCCESS, connectionType)); + return channel; + } + Iterator itr = channels.iterator(); + while (itr.hasNext()) { + Channel channel = itr.next(); + if (channel != null && channel.isOpen()) { + itr.remove(); + LOGGER.info( + String.format(AMQPConstants.INFO_CHANNEL_BORROW_SUCCESS, connectionType)); + return channel; + } else { + itr.remove(); + } + } + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RESET_SUCCESS, connectionType)); + return channel; + } + + /** + * Returns the channel to connection pool for specified connectionType. + * + * @param connectionType + * @param channel + * @throws Exception + */ + public synchronized void returnChannel(ConnectionType connectionType, Channel channel) + throws Exception { + if (channel == null || !channel.isOpen()) { + channel = null; // channel is reset. + } + Set channels = availableChannelPool.get(connectionType); + if (channels == null) { + channels = new HashSet(); + availableChannelPool.put(connectionType, channels); + } + channels.add(channel); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RETURN_SUCCESS, connectionType)); + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 6cad63cb1..203c4dfbe 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings; import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; @@ -38,12 +39,14 @@ import com.google.common.collect.Maps; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; import rx.Observable; +import rx.Subscriber; /** @author Ritu Parathody */ public class AMQPObservableQueue implements ObservableQueue { @@ -51,6 +54,8 @@ public class AMQPObservableQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class); private final AMQPSettings settings; + private final AMQPRetryPattern retrySettings; + private final String QUEUE_TYPE = "x-queue-type"; private final int batchSize; private final boolean useExchange; private int pollTimeInMS; @@ -64,6 +69,7 @@ public AMQPObservableQueue( Address[] addresses, boolean useExchange, AMQPSettings settings, + AMQPRetryPattern retrySettings, int batchSize, int pollTimeInMS) { if (factory == null) { @@ -84,52 +90,69 @@ public AMQPObservableQueue( this.useExchange = useExchange; this.settings = settings; this.batchSize = batchSize; - this.amqpConnection = AMQPConnection.getInstance(factory, addresses); + this.amqpConnection = AMQPConnection.getInstance(factory, addresses, retrySettings); + this.retrySettings = retrySettings; this.setPollTimeInMS(pollTimeInMS); } @Override public Observable observe() { - receiveMessages(); - Observable.OnSubscribe onSubscribe = - subscriber -> { - Observable interval = - Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); - interval.flatMap( - (Long x) -> { - if (!isRunning()) { - LOGGER.debug( - "Component stopped, skip listening for messages from RabbitMQ"); - return Observable.from(Collections.emptyList()); - } else { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach( - msg -> { - buffer.append(msg.getId()) - .append("=") - .append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - LOGGER.info( - String.format( - "Batch from %s to conductor is %s", - settings.getQueueOrExchangeName(), - buffer.toString())); + Observable.OnSubscribe onSubscribe = null; + // This will enabled the messages to be processed one after the other as per the + // observable next behavior. + if (settings.isSequentialProcessing()) { + LOGGER.info("Subscribing for the message processing on schedule basis"); + receiveMessages(); + onSubscribe = + subscriber -> { + Observable interval = + Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); + interval.flatMap( + (Long x) -> { + if (!isRunning()) { + LOGGER.debug( + "Component stopped, skip listening for messages from RabbitMQ"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach( + msg -> { + buffer.append(msg.getId()) + .append("=") + .append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() + < available.size()) { + buffer.append(","); + } + }); + LOGGER.info( + String.format( + "Batch from %s to conductor is %s", + settings + .getQueueOrExchangeName(), + buffer.toString())); + } + return Observable.from(available); } - return Observable.from(available); - } - }) - .subscribe(subscriber::onNext, subscriber::onError); - }; + }) + .subscribe(subscriber::onNext, subscriber::onError); + }; + LOGGER.info("Subscribed for the message processing on schedule basis"); + } else { + onSubscribe = + subscriber -> { + LOGGER.info("Subscribing for the event based AMQP message processing"); + receiveMessages(subscriber); + LOGGER.info("Subscribed for the event based AMQP message processing"); + }; + } return Observable.create(onSubscribe); } @@ -164,16 +187,34 @@ public Address[] getAddresses() { public List ack(List messages) { final List processedDeliveryTags = new ArrayList<>(); for (final Message message : messages) { - try { - LOGGER.info("ACK message with delivery tag {}", message.getReceipt()); - amqpConnection - .getOrCreateChannel( - ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) - .basicAck(Long.parseLong(message.getReceipt()), false); - // Message ACKed - processedDeliveryTags.add(message.getReceipt()); - } catch (final IOException e) { - LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); + int retryIndex = 1; + while (true) { + try { + LOGGER.info("ACK message with delivery tag {}", message.getReceipt()); + Channel chn = + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, + getSettings().getQueueOrExchangeName()); + chn.basicAck(Long.parseLong(message.getReceipt()), false); + processedDeliveryTags.add(message.getReceipt()); + LOGGER.info("Ack'ed the message with delivery tag {}", message.getReceipt()); + break; + } catch (final Exception e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + LOGGER.error( + "Cannot ACK message with delivery tag {}", message.getReceipt(), e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + LOGGER.error( + "Retries completed. Cannot ACK message with delivery tag {}", + message.getReceipt(), + e); + } + retryIndex++; + } } } return processedDeliveryTags; @@ -197,20 +238,54 @@ private static AMQP.BasicProperties buildBasicProperties( } private void publishMessage(Message message, String exchange, String routingKey) { - try { - final String payload = message.getPayload(); - amqpConnection - .getOrCreateChannel( - ConnectionType.PUBLISHER, getSettings().getQueueOrExchangeName()) - .basicPublish( + Channel chn = null; + int retryIndex = 1; + while (true) { + try { + final String payload = message.getPayload(); + chn = + amqpConnection.getOrCreateChannel( + ConnectionType.PUBLISHER, getSettings().getQueueOrExchangeName()); + chn.basicPublish( + exchange, + routingKey, + buildBasicProperties(message, settings), + payload.getBytes(settings.getContentEncoding())); + LOGGER.info(String.format("Published message to %s: %s", exchange, payload)); + break; + } catch (Exception ex) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + LOGGER.error( + "Failed to publish message {} to {}", + message.getPayload(), exchange, - routingKey, - buildBasicProperties(message, settings), - payload.getBytes(settings.getContentEncoding())); - LOGGER.info(String.format("Published message to %s: %s", exchange, payload)); - } catch (Exception ex) { - LOGGER.error("Failed to publish message {} to {}", message.getPayload(), exchange, ex); - throw new RuntimeException(ex); + ex); + throw new RuntimeException(ex); + } + try { + retry.continueOrPropogate(ex, retryIndex); + } catch (Exception e) { + LOGGER.error( + "Retries completed. Failed to publish message {} to {}", + message.getPayload(), + exchange, + ex); + throw new RuntimeException(ex); + } + retryIndex++; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(ConnectionType.PUBLISHER, chn); + } catch (Exception e) { + LOGGER.error( + "Failed to return the channel of {}. {}", + ConnectionType.PUBLISHER, + e); + } + } + } } } @@ -258,13 +333,23 @@ public void setUnackTimeout(Message message, long unackTimeout) { @Override public long size() { + Channel chn = null; try { - return amqpConnection - .getOrCreateChannel( - ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) - .messageCount(settings.getQueueOrExchangeName()); + chn = + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()); + return chn.messageCount(settings.getQueueOrExchangeName()); } catch (final Exception e) { throw new RuntimeException(e); + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(ConnectionType.SUBSCRIBER, chn); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } } } @@ -353,31 +438,38 @@ private ConnectionFactory buildConnectionFactory() { } else { factory.setPort(port); } - // Get connection timeout from config - final int connectionTimeout = (int) properties.getConnectionTimeout().toMillis(); - if (connectionTimeout <= 0) { - throw new IllegalArgumentException("Connection timeout must be greater than 0"); - } else { - factory.setConnectionTimeout(connectionTimeout); - } final boolean useNio = properties.isUseNio(); if (useNio) { factory.useNio(); } + factory.setConnectionTimeout(properties.getConnectionTimeoutInMilliSecs()); + factory.setRequestedHeartbeat(properties.getRequestHeartbeatTimeoutInSecs()); + factory.setNetworkRecoveryInterval(properties.getNetworkRecoveryIntervalInMilliSecs()); + factory.setHandshakeTimeout(properties.getHandshakeTimeoutInMilliSecs()); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(true); + factory.setRequestedChannelMax(properties.getMaxChannelCount()); return factory; } public AMQPObservableQueue build(final boolean useExchange, final String queueURI) { final AMQPSettings settings = new AMQPSettings(properties).fromURI(queueURI); + final AMQPRetryPattern retrySettings = + new AMQPRetryPattern( + properties.getLimit(), properties.getDuration(), properties.getType()); return new AMQPObservableQueue( - factory, addresses, useExchange, settings, batchSize, pollTimeInMS); + factory, + addresses, + useExchange, + settings, + retrySettings, + batchSize, + pollTimeInMS); } } private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType) - throws IOException { + throws Exception { return getOrCreateExchange( connectionType, settings.getQueueOrExchangeName(), @@ -394,27 +486,35 @@ private AMQP.Exchange.DeclareOk getOrCreateExchange( final boolean isDurable, final boolean autoDelete, final Map arguments) - throws IOException { + throws Exception { if (StringUtils.isEmpty(name)) { throw new RuntimeException("Exchange name is undefined"); } if (StringUtils.isEmpty(type)) { throw new RuntimeException("Exchange type is undefined"); } - + Channel chn = null; try { LOGGER.debug("Creating exchange {} of type {}", name, type); - return amqpConnection - .getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName()) - .exchangeDeclare(name, type, isDurable, autoDelete, arguments); - } catch (final IOException e) { + chn = + amqpConnection.getOrCreateChannel( + connectionType, getSettings().getQueueOrExchangeName()); + return chn.exchangeDeclare(name, type, isDurable, autoDelete, arguments); + } catch (final Exception e) { LOGGER.warn("Failed to create exchange {} of type {}", name, type, e); throw e; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(connectionType, chn); + } catch (Exception e) { + LOGGER.error("Failed to return the channel of {}. {}", connectionType, e); + } + } } } - private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) - throws IOException { + private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) throws Exception { return getOrCreateQueue( connectionType, settings.getQueueOrExchangeName(), @@ -431,19 +531,29 @@ private AMQP.Queue.DeclareOk getOrCreateQueue( final boolean isExclusive, final boolean autoDelete, final Map arguments) - throws IOException { + throws Exception { if (StringUtils.isEmpty(name)) { throw new RuntimeException("Queue name is undefined"); } - + arguments.put(QUEUE_TYPE, settings.getQueueType()); + Channel chn = null; try { LOGGER.debug("Creating queue {}", name); - return amqpConnection - .getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName()) - .queueDeclare(name, isDurable, isExclusive, autoDelete, arguments); - } catch (final IOException e) { + chn = + amqpConnection.getOrCreateChannel( + connectionType, getSettings().getQueueOrExchangeName()); + return chn.queueDeclare(name, isDurable, isExclusive, autoDelete, arguments); + } catch (final Exception e) { LOGGER.warn("Failed to create queue {}", name, e); throw e; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(connectionType, chn); + } catch (Exception e) { + LOGGER.error("Failed to return the channel of {}. {}", connectionType, e); + } + } } } @@ -459,7 +569,6 @@ private static Message asMessage(AMQPSettings settings, GetResponse response) th } private void receiveMessagesFromQueue(String queueName) throws Exception { - int nb = 0; LOGGER.debug("Accessing channel for queue {}", queueName); Consumer consumer = @@ -492,15 +601,23 @@ public void handleDelivery( LOGGER.info("receiveMessagesFromQueue- End method {}", messages); } } catch (InterruptedException e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); Thread.currentThread().interrupt(); } catch (Exception e) { - // + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); } } public void handleCancel(String consumerTag) throws IOException { LOGGER.error( - "Recieved a consumer cancel notification for subscriber. Will monitor and make changes"); + "Recieved a consumer cancel notification for subscriber {}", + consumerTag); } }; @@ -511,6 +628,75 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) Monitors.recordEventQueueMessagesProcessed(getType(), queueName, messages.size()); } + private void receiveMessagesFromQueue(String queueName, Subscriber subscriber) + throws Exception { + LOGGER.debug("Accessing channel for queue {}", queueName); + + Consumer consumer = + new DefaultConsumer( + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, + getSettings().getQueueOrExchangeName())) { + + @Override + public void handleDelivery( + final String consumerTag, + final Envelope envelope, + final AMQP.BasicProperties properties, + final byte[] body) + throws IOException { + try { + Message message = + asMessage( + settings, + new GetResponse( + envelope, properties, body, Integer.MAX_VALUE)); + if (message == null) { + return; + } + LOGGER.info( + "Got message with ID {} and receipt {}", + message.getId(), + message.getReceipt()); + LOGGER.debug("Message content {}", message); + // Not using thread-pool here as the number of concurrent threads are + // controlled + // by the number of messages delivery using pre-fetch count in RabbitMQ + Thread newThread = + new Thread( + () -> { + LOGGER.info( + "Spawning a new thread for message with ID {}", + message.getId()); + subscriber.onNext(message); + }); + newThread.start(); + } catch (InterruptedException e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); + } + } + + public void handleCancel(String consumerTag) throws IOException { + LOGGER.error( + "Recieved a consumer cancel notification for subscriber {}", + consumerTag); + } + }; + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .basicConsume(queueName, false, consumer); + } + protected void receiveMessages() { try { amqpConnection @@ -522,11 +708,14 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) // Consume messages from an exchange getOrCreateExchange(ConnectionType.SUBSCRIBER); /* - * Create queue if not present based on the settings provided in the queue URI or configuration properties. - * Sample URI format: amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive=false&autoDelete=false&durable=true - * Default settings if not provided in the queue URI or properties: isDurable: true, autoDelete: false, isExclusive: false - * The same settings are currently used during creation of exchange as well as queue. - * TODO: This can be enhanced further to get the settings separately for exchange and queue from the URI + * Create queue if not present based on the settings provided in the queue URI + * or configuration properties. Sample URI format: + * amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive + * =false&autoDelete=false&durable=true Default settings if not provided in the + * queue URI or properties: isDurable: true, autoDelete: false, isExclusive: + * false The same settings are currently used during creation of exchange as + * well as queue. TODO: This can be enhanced further to get the settings + * separately for exchange and queue from the URI */ final AMQP.Queue.DeclareOk declareOk = getOrCreateQueue( @@ -558,6 +747,56 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) } } + protected void receiveMessages(Subscriber subscriber) { + try { + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .basicQos(batchSize); + String queueName; + if (useExchange) { + // Consume messages from an exchange + getOrCreateExchange(ConnectionType.SUBSCRIBER); + /* + * Create queue if not present based on the settings provided in the queue URI + * or configuration properties. Sample URI format: + * amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive + * =false&autoDelete=false&durable=true Default settings if not provided in the + * queue URI or properties: isDurable: true, autoDelete: false, isExclusive: + * false The same settings are currently used during creation of exchange as + * well as queue. TODO: This can be enhanced further to get the settings + * separately for exchange and queue from the URI + */ + final AMQP.Queue.DeclareOk declareOk = + getOrCreateQueue( + ConnectionType.SUBSCRIBER, + String.format("bound_to_%s", settings.getQueueOrExchangeName()), + settings.isDurable(), + settings.isExclusive(), + settings.autoDelete(), + Maps.newHashMap()); + // Bind the declared queue to exchange + queueName = declareOk.getQueue(); + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .queueBind( + queueName, + settings.getQueueOrExchangeName(), + settings.getRoutingKey()); + } else { + // Consume messages from a queue + queueName = getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue(); + } + // Consume messages + LOGGER.info("Consuming from queue {}", queueName); + receiveMessagesFromQueue(queueName, subscriber); + } catch (Exception exception) { + LOGGER.error("Exception while getting messages from RabbitMQ", exception); + Monitors.recordObservableQMessageReceivedErrors(getType()); + } + } + public int getPollTimeInMS() { return pollTimeInMS; } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java index 8960febf4..3ebdd5788 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java @@ -16,6 +16,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; + import com.rabbitmq.client.AMQP.PROTOCOL; import com.rabbitmq.client.ConnectionFactory; @@ -36,8 +38,62 @@ public class AMQPEventQueueProperties { private int port = PROTOCOL.PORT; - private Duration connectionTimeout = - Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT); + private int connectionTimeoutInMilliSecs = 180000; + private int networkRecoveryIntervalInMilliSecs = 5000; + private int requestHeartbeatTimeoutInSecs = 30; + private int handshakeTimeoutInMilliSecs = 180000; + private int maxChannelCount = 5000; + private int limit = 50; + private int duration = 1000; + private RetryType retryType = RetryType.REGULARINTERVALS; + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public RetryType getType() { + return retryType; + } + + public void setType(RetryType type) { + this.retryType = type; + } + + public int getConnectionTimeoutInMilliSecs() { + return connectionTimeoutInMilliSecs; + } + + public void setConnectionTimeoutInMilliSecs(int connectionTimeoutInMilliSecs) { + this.connectionTimeoutInMilliSecs = connectionTimeoutInMilliSecs; + } + + public int getHandshakeTimeoutInMilliSecs() { + return handshakeTimeoutInMilliSecs; + } + + public void setHandshakeTimeoutInMilliSecs(int handshakeTimeoutInMilliSecs) { + this.handshakeTimeoutInMilliSecs = handshakeTimeoutInMilliSecs; + } + + public int getMaxChannelCount() { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) { + this.maxChannelCount = maxChannelCount; + } private boolean useNio = false; @@ -53,6 +109,10 @@ public class AMQPEventQueueProperties { private String exchangeType = "topic"; + private String queueType = "classic"; + + private boolean sequentialMsgProcessing = true; + private int deliveryMode = 2; private boolean useExchange = true; @@ -115,14 +175,6 @@ public void setPort(int port) { this.port = port; } - public Duration getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(Duration connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - public boolean isUseNio() { return useNio; } @@ -202,4 +254,48 @@ public String getListenerQueuePrefix() { public void setListenerQueuePrefix(String listenerQueuePrefix) { this.listenerQueuePrefix = listenerQueuePrefix; } + + public String getQueueType() { + return queueType; + } + + /** + * @param queueType Supports two queue types, 'classic' and 'quorum'. Classic will be be + * deprecated in 2022 and its usage discouraged from RabbitMQ community. So not using enum + * type here to hold different values. + */ + public void setQueueType(String queueType) { + this.queueType = queueType; + } + + /** @return the sequentialMsgProcessing */ + public boolean isSequentialMsgProcessing() { + return sequentialMsgProcessing; + } + + /** + * @param sequentialMsgProcessing the sequentialMsgProcessing to set Supports sequential and + * parallel message processing capabilities. In parallel message processing, number of + * threads are controlled by batch size. No thread control or execution framework required + * here as threads are limited and short-lived. + */ + public void setSequentialMsgProcessing(boolean sequentialMsgProcessing) { + this.sequentialMsgProcessing = sequentialMsgProcessing; + } + + public int getNetworkRecoveryIntervalInMilliSecs() { + return networkRecoveryIntervalInMilliSecs; + } + + public void setNetworkRecoveryIntervalInMilliSecs(int networkRecoveryIntervalInMilliSecs) { + this.networkRecoveryIntervalInMilliSecs = networkRecoveryIntervalInMilliSecs; + } + + public int getRequestHeartbeatTimeoutInSecs() { + return requestHeartbeatTimeoutInSecs; + } + + public void setRequestHeartbeatTimeoutInSecs(int requestHeartbeatTimeoutInSecs) { + this.requestHeartbeatTimeoutInSecs = requestHeartbeatTimeoutInSecs; + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java new file mode 100644 index 000000000..3890a9980 --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java @@ -0,0 +1,54 @@ +/* + * Copyright 2022 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.contribs.queue.amqp.config; + +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; + +public class AMQPRetryPattern { + + private int limit = 50; + private int duration = 1000; + private RetryType type = RetryType.REGULARINTERVALS; + + public AMQPRetryPattern() {} + + public AMQPRetryPattern(int limit, int duration, RetryType type) { + this.limit = limit; + this.duration = duration; + this.type = type; + } + + /** + * This gets executed if the retry index is within the allowed limits, otherwise exception will + * be thrown. + * + * @throws Exception + */ + public void continueOrPropogate(Exception ex, int retryIndex) throws Exception { + if (retryIndex > limit) { + throw ex; + } + // Regular Intervals is the default + long waitDuration = duration; + if (type == RetryType.INCREMENTALINTERVALS) { + waitDuration = duration * retryIndex; + } else if (type == RetryType.EXPONENTIALBACKOFF) { + waitDuration = (long) Math.pow(2, retryIndex) * duration; + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java index 6ecd0b6d1..f8f06aece 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java @@ -70,4 +70,16 @@ public class AMQPConstants { * polling time to drain the in-memory queue. */ public static int DEFAULT_POLL_TIME_MS = 100; + + // info channel messages. + public static final String INFO_CHANNEL_BORROW_SUCCESS = + "Borrowed the channel object from the channel pool for " + "the connection type [%s]"; + public static final String INFO_CHANNEL_RETURN_SUCCESS = + "Returned the borrowed channel object to the pool for " + "the connection type [%s]"; + public static final String INFO_CHANNEL_CREATION_SUCCESS = + "Channels are not available in the pool. Created a" + + " channel for the connection type [%s]"; + public static final String INFO_CHANNEL_RESET_SUCCESS = + "No proper channels available in the pool. Created a " + + "channel for the connection type [%s]"; } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java index 56cd77e1d..7ca2c5140 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java @@ -44,14 +44,14 @@ public class AMQPSettings { private String queueOrExchangeName; private String eventName; private String exchangeType; + private String queueType; private String routingKey; private final String contentEncoding; private final String contentType; - private boolean durable; private boolean exclusive; private boolean autoDelete; - + private boolean sequentialProcessing; private int deliveryMode; private final Map arguments = new HashMap<>(); @@ -66,6 +66,8 @@ public AMQPSettings(final AMQPEventQueueProperties properties) { contentEncoding = properties.getContentEncoding(); exchangeType = properties.getExchangeType(); routingKey = StringUtils.EMPTY; + queueType = properties.getQueueType(); + sequentialProcessing = properties.isSequentialMsgProcessing(); // Set common settings for publishing and consuming setDeliveryMode(properties.getDeliveryMode()); } @@ -213,77 +215,85 @@ public final AMQPSettings fromURI(final String queueURI) { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof AMQPSettings)) { - return false; - } - AMQPSettings that = (AMQPSettings) o; - return isDurable() == that.isDurable() - && isExclusive() == that.isExclusive() - && autoDelete == that.autoDelete - && getDeliveryMode() == that.getDeliveryMode() - && Objects.equals(getQueueOrExchangeName(), that.getQueueOrExchangeName()) - && Objects.equals(getExchangeType(), that.getExchangeType()) - && Objects.equals(getRoutingKey(), that.getRoutingKey()) - && Objects.equals(getContentType(), that.getContentType()) - && Objects.equals(getContentEncoding(), that.getContentEncoding()) - && Objects.equals(getArguments(), that.getArguments()); + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof AMQPSettings)) return false; + AMQPSettings other = (AMQPSettings) obj; + return Objects.equals(arguments, other.arguments) + && autoDelete == other.autoDelete + && Objects.equals(contentEncoding, other.contentEncoding) + && Objects.equals(contentType, other.contentType) + && deliveryMode == other.deliveryMode + && durable == other.durable + && Objects.equals(eventName, other.eventName) + && Objects.equals(exchangeType, other.exchangeType) + && exclusive == other.exclusive + && Objects.equals(queueOrExchangeName, other.queueOrExchangeName) + && Objects.equals(queueType, other.queueType) + && Objects.equals(routingKey, other.routingKey) + && sequentialProcessing == other.sequentialProcessing; } @Override public int hashCode() { return Objects.hash( - getQueueOrExchangeName(), - getExchangeType(), - getRoutingKey(), - getContentType(), - isDurable(), - isExclusive(), + arguments, autoDelete, - getDeliveryMode(), - getContentEncoding(), - getArguments()); + contentEncoding, + contentType, + deliveryMode, + durable, + eventName, + exchangeType, + exclusive, + queueOrExchangeName, + queueType, + routingKey, + sequentialProcessing); } @Override public String toString() { - return "AMQSettings{" - + "queueOrExchangeName='" + return "AMQPSettings [queueOrExchangeName=" + queueOrExchangeName - + '\'' - + ", exchangeType='" + + ", eventName=" + + eventName + + ", exchangeType=" + exchangeType - + '\'' - + ", routingKey='" + + ", queueType=" + + queueType + + ", routingKey=" + routingKey - + '\'' - + ", contentType='" + + ", contentEncoding=" + + contentEncoding + + ", contentType=" + contentType - + '\'' + ", durable=" + durable + ", exclusive=" + exclusive + ", autoDelete=" + autoDelete + + ", sequentialProcessing=" + + sequentialProcessing + ", deliveryMode=" + deliveryMode - + ", contentEncoding='" - + contentEncoding - + '\'' + ", arguments=" + arguments - + ", durable=" - + isDurable() - + ", exclusive=" - + isExclusive() - + '}'; + + "]"; } public String getEventName() { return eventName; } + + /** @return the queueType */ + public String getQueueType() { + return queueType; + } + + /** @return the sequentialProcessing */ + public boolean isSequentialProcessing() { + return sequentialProcessing; + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java new file mode 100644 index 000000000..a8b072576 --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.contribs.queue.amqp.util; + +/** RetryType holds the retry type */ +public enum RetryType { + REGULARINTERVALS, + EXPONENTIALBACKOFF, + INCREMENTALINTERVALS +} diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java index a0eeb95ea..38e949121 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java @@ -44,8 +44,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java index 730ac65c2..86b3ac596 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java @@ -32,8 +32,10 @@ import org.mockito.stubbing.OngoingStubbing; import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings; +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; import com.netflix.conductor.core.events.queue.Message; import com.rabbitmq.client.AMQP; @@ -90,8 +92,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); @@ -322,26 +323,6 @@ void runObserve( observableQueue.close(); } - // Tests - - @Test - public void testGetMessagesFromExistingExchangeAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test - public void testGetMessagesFromNotExistingExchangeAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, false, true); - } - @Test public void testGetMessagesFromExistingExchangeWithDurableExclusiveAutoDeleteQueueConfiguration() @@ -353,28 +334,6 @@ public void testGetMessagesFromNotExistingExchangeAndDefaultConfiguration() channel, connection, true, true, true, true, true); } - @Test - public void - testGetMessagesFromNotExistingExchangeWithNonDurableNonExclusiveNonAutoDeleteQueueConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndCustomConfigurationFromURI( - channel, connection, false, true, false, false, false); - } - - @Test - public void - testGetMessagesFromNotExistingExchangeWithDurableExclusiveNonAutoDeleteQueueConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndCustomConfigurationFromURI( - channel, connection, false, true, true, true, false); - } - @Test public void testPublishMessagesToNotExistingExchangeAndDefaultConfiguration() throws IOException, TimeoutException { @@ -384,40 +343,6 @@ public void testPublishMessagesToNotExistingExchangeAndDefaultConfiguration() testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, false, true); } - @Test(expected = RuntimeException.class) - public void testGetMessagesFromExchangeWithBadConnection() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToExchangeWithBadConnection() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test - public void testGetMessagesFromExchangeWithBadChannel() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, false); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToExchangeWithBadChannel() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, true, false); - } - @Test public void testAck() throws IOException, TimeoutException { // Mock channel and connection @@ -428,7 +353,7 @@ public void testAck() throws IOException, TimeoutException { final String name = RandomStringUtils.randomAlphabetic(30), type = "topic", routingKey = RandomStringUtils.randomAlphabetic(30); - + AMQPRetryPattern retrySettings = null; final AMQPSettings settings = new AMQPSettings(properties) .fromURI( @@ -444,6 +369,7 @@ public void testAck() throws IOException, TimeoutException { addresses, true, settings, + retrySettings, batchSize, pollTimeMs); List messages = new LinkedList<>(); @@ -495,13 +421,14 @@ private void testGetMessagesFromExchangeAndDefaultConfiguration( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -595,13 +522,14 @@ private void testGetMessagesFromExchangeAndCustomConfigurationFromURI( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -689,13 +617,14 @@ private void testPublishMessagesToExchangeAndDefaultConfiguration( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -750,31 +679,6 @@ public void testGetMessagesFromNotExistingQueueAndDefaultConfiguration() testGetMessagesFromQueueAndDefaultConfiguration(channel, connection, false, true); } - @Test - public void testPublishMessagesToNotExistingQueueAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testPublishMessagesToQueueAndDefaultConfiguration(channel, connection, false, true); - } - - @Test(expected = RuntimeException.class) - public void testGetMessagesFromQueueWithBadConnection() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testGetMessagesFromQueueAndDefaultConfiguration(channel, connection, true, true); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToQueueWithBadConnection() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testPublishMessagesToQueueAndDefaultConfiguration(channel, connection, true, true); - } - @Test public void testGetMessagesFromQueueWithBadChannel() throws IOException, TimeoutException { // Mock channel and connection @@ -794,32 +698,37 @@ public void testPublishMessagesToQueueWithBadChannel() throws IOException, Timeo @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_empty() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = - new AMQPObservableQueue(null, addresses, false, settings, batchSize, pollTimeMs); + new AMQPObservableQueue( + null, addresses, false, settings, retrySettings, batchSize, pollTimeMs); } @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_addressEmpty() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), null, false, settings, + retrySettings, batchSize, pollTimeMs); } @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_settingsEmpty() throws IOException, TimeoutException { - AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, null, + retrySettings, batchSize, pollTimeMs); } @@ -827,12 +736,14 @@ public void testAMQPObservalbleQueue_settingsEmpty() throws IOException, Timeout @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_batchsizezero() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, settings, + retrySettings, 0, pollTimeMs); } @@ -840,12 +751,14 @@ public void testAMQPObservalbleQueue_batchsizezero() throws IOException, Timeout @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_polltimezero() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, settings, + retrySettings, batchSize, 0); } @@ -869,13 +782,14 @@ private void testGetMessagesFromQueueAndDefaultConfiguration( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); @@ -900,13 +814,14 @@ private void testGetMessagesFromQueueAndDefaultConfiguration_close( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); observableQueue.close(); @@ -938,13 +853,14 @@ private void testPublishMessagesToQueueAndDefaultConfiguration( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = new AMQPRetryPattern(3, 5, RetryType.REGULARINTERVALS); AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java index 12b8d3ef4..91afc6e50 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java @@ -43,8 +43,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java index 52e50f288..c9422cd8c 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.events.EventExecution.Status; @@ -103,28 +104,30 @@ public DefaultEventProcessor( } public void handle(ObservableQueue queue, Message msg) { + List transientFailures = null; + Boolean executionFailed = false; try { if (isEventMessageIndexingEnabled) { executionService.addMessage(queue.getName(), msg); } String event = queue.getType() + ":" + queue.getName(); LOGGER.debug("Evaluating message: {} for event: {}", msg.getId(), event); - List transientFailures = executeEvent(event, msg); - - if (transientFailures.isEmpty()) { + transientFailures = executeEvent(event, msg); + } catch (Exception e) { + executionFailed = true; + LOGGER.error("Error handling message: {} on queue:{}", msg, queue.getName(), e); + Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName()); + } finally { + if (executionFailed || CollectionUtils.isEmpty(transientFailures)) { queue.ack(Collections.singletonList(msg)); LOGGER.debug("Message: {} acked on queue: {}", msg.getId(), queue.getName()); } else if (queue.rePublishIfNoAck()) { // re-submit this message to the queue, to be retried later - // This is needed for queues with no unack timeout, since messages are removed from - // the queue + // This is needed for queues with no unack timeout, since messages are removed + // from the queue queue.publish(Collections.singletonList(msg)); LOGGER.debug("Message: {} published to queue: {}", msg.getId(), queue.getName()); } - } catch (Exception e) { - LOGGER.error("Error handling message: {} on queue:{}", msg, queue.getName(), e); - Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName()); - } finally { Monitors.recordEventQueueMessagesHandled(queue.getType(), queue.getName()); } } @@ -144,8 +147,8 @@ protected List executeEvent(String event, Message msg) throws Ex for (EventHandler eventHandler : eventHandlerList) { String condition = eventHandler.getCondition(); String evaluatorType = eventHandler.getEvaluatorType(); - // Set default to true so that if condition is not specified, it falls through to - // process the event. + // Set default to true so that if condition is not specified, it falls through + // to process the event. Boolean success = true; if (StringUtils.isNotEmpty(condition) && evaluators.get(evaluatorType) != null) { Object result = diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 2c18a5d4a..0dec908c8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -113,7 +113,7 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor work @Override public boolean isAsync() { - return true; + return false; } @VisibleForTesting diff --git a/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java b/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java index 001067d06..793494bd9 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java @@ -37,13 +37,13 @@ public SemaphoreUtil(int numSlots) { */ public boolean acquireSlots(int numSlots) { boolean acquired = semaphore.tryAcquire(numSlots); - LOGGER.debug("Trying to acquire {} permit: {}", numSlots, acquired); + LOGGER.trace("Trying to acquire {} permit: {}", numSlots, acquired); return acquired; } /** Signals that processing is complete and the specified number of permits can be released. */ public void completeProcessing(int numSlots) { - LOGGER.debug("Completed execution; releasing permit"); + LOGGER.trace("Completed execution; releasing permit"); semaphore.release(numSlots); } @@ -54,7 +54,7 @@ public void completeProcessing(int numSlots) { */ public int availableSlots() { int available = semaphore.availablePermits(); - LOGGER.debug("Number of available permits: {}", available); + LOGGER.trace("Number of available permits: {}", available); return available; } } diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy index 3b2977f50..a8a4451e6 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy @@ -54,12 +54,12 @@ class EventSpec extends Specification { event = new Event(eventQueues, parametersUtils, objectMapper) } - def "verify that event task is async"() { + def "verify that event task is NOT async"() { when: def async = event.isAsync() then: - async + !async } def "event cancel calls ack on the queue"() { diff --git a/dependencies.gradle b/dependencies.gradle index a937d121d..fe0043067 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -16,7 +16,7 @@ */ ext { revActivation = '2.0.0' - revAmqpClient = '5.13.0' + revAmqpClient = '5.14.0' revAwaitility = '3.1.6' revAwsSdk = '1.11.86' revAzureStorageBlobSdk = '12.7.0' diff --git a/docker/server/config/config.properties b/docker/server/config/config.properties index 3c6b10a77..55124c78c 100755 --- a/docker/server/config/config.properties +++ b/docker/server/config/config.properties @@ -36,6 +36,8 @@ conductor.elasticsearch.url=http://es:9200 # Name of the elasticsearch cluster conductor.elasticsearch.indexName=conductor +#conductor.event-queues.amqp.queueType=classic +#conductor.event-queues.amqp.sequentialMsgProcessing=true # Additional modules for metrics collection exposed via logger (optional) # conductor.metrics-logger.enabled=true diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java index 2d2538b9e..2c0b3eadb 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java @@ -94,6 +94,68 @@ public RedisProperties(ConductorProperties conductorProperties) { @DurationUnit(ChronoUnit.SECONDS) private Duration eventExecutionPersistenceTTL = Duration.ofSeconds(60); + // Maximum number of idle connections to be maintained + private int maxIdleConnections = 8; + + // Minimum number of idle connections to be maintained + private int minIdleConnections = 5; + + private long minEvictableIdleTimeMillis = 1800000; + + private long timeBetweenEvictionRunsMillis = -1L; + + private boolean testWhileIdle = false; + + private int numTestsPerEvictionRun = 3; + + public int getNumTestsPerEvictionRun() { + return numTestsPerEvictionRun; + } + + public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { + this.numTestsPerEvictionRun = numTestsPerEvictionRun; + } + + public boolean isTestWhileIdle() { + return testWhileIdle; + } + + public void setTestWhileIdle(boolean testWhileIdle) { + this.testWhileIdle = testWhileIdle; + } + + public long getMinEvictableIdleTimeMillis() { + return minEvictableIdleTimeMillis; + } + + public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { + this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; + } + + public long getTimeBetweenEvictionRunsMillis() { + return timeBetweenEvictionRunsMillis; + } + + public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { + this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; + } + + public int getMinIdleConnections() { + return minIdleConnections; + } + + public void setMinIdleConnections(int minIdleConnections) { + this.minIdleConnections = minIdleConnections; + } + + public int getMaxIdleConnections() { + return maxIdleConnections; + } + + public void setMaxIdleConnections(int maxIdleConnections) { + this.maxIdleConnections = maxIdleConnections; + } + public String getDataCenterRegion() { return dataCenterRegion; } diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java index 90d442b71..90a3fd6cd 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java @@ -43,8 +43,15 @@ protected JedisCommands createJedisCommands( HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier) { GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); - genericObjectPoolConfig.setMinIdle(5); + genericObjectPoolConfig.setMinIdle(properties.getMinIdleConnections()); + genericObjectPoolConfig.setMaxIdle(properties.getMaxIdleConnections()); genericObjectPoolConfig.setMaxTotal(properties.getMaxConnectionsPerHost()); + genericObjectPoolConfig.setTestWhileIdle(properties.isTestWhileIdle()); + genericObjectPoolConfig.setMinEvictableIdleTimeMillis( + properties.getMinEvictableIdleTimeMillis()); + genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis( + properties.getTimeBetweenEvictionRunsMillis()); + genericObjectPoolConfig.setNumTestsPerEvictionRun(properties.getNumTestsPerEvictionRun()); log.info( "Starting conductor server using redis_sentinel and cluster " + properties.getClusterName()); diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index d3ec09e98..69b0ebf98 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -39,6 +39,12 @@ conductor.redis.queuesNonQuorumPort=22122 # For a single node dynomite or redis server, make sure the value below is set to same as rack specified in the "workflow.dynomite.cluster.hosts" property. conductor.redis.availabilityZone=us-east-1c +#conductor.redis.maxIdleConnections=8 +#conductor.redis.minIdleConnections=5 +#conductor.redis.minEvictableIdleTimeMillis = 1800000 +#conductor.redis.timeBetweenEvictionRunsMillis = -1L +#conductor.redis.testWhileIdle = false +#conductor.redis.numTestsPerEvictionRun = 3 #Transport address to elasticsearch conductor.elasticsearch.url=localhost:9300 @@ -52,7 +58,7 @@ conductor.elasticsearch.version=6 # Default event queue type to listen on for wait task conductor.default-event-queue.type=sqs - + #zookeeper # conductor.zookeeper-lock.connectionString=host1.2181,host2:2181,host3:2181 # conductor.zookeeper-lock.sessionTimeoutMs @@ -84,8 +90,17 @@ conductor.workflow-execution-lock.type=noop_lock #conductor.event-queues.amqp.port=5672 #conductor.event-queues.amqp.useNio=false #conductor.event-queues.amqp.batchSize=1 - #conductor.event-queues.amqp.pollTimeDuration=100ms +#conductor.event-queues.amqp.queueType=classic +#conductor.event-queues.amqp.sequentialMsgProcessing=true +#conductor.event-queues.amqp.connectionTimeoutInMilliSecs=180000 +#conductor.event-queues.amqp.networkRecoveryIntervalInMilliSecs=5000 +#conductor.event-queues.amqp.requestHeartbeatTimeoutInSecs=30 +#conductor.event-queues.amqp.handshakeTimeoutInMilliSecs=180000 +#conductor.event-queues.amqp.maxChannelCount=5000 +#conductor.event-queues.amqp.limit=50 +#conductor.event-queues.amqp.duration=1000 +#conductor.event-queues.amqp.retryType=REGULARINTERVALS #conductor.event-queues.amqp.useExchange=true( exchange or queue) #conductor.event-queues.amqp.listenerQueuePrefix=myqueue diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy index 30a64dd6d..43c227640 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy @@ -45,16 +45,11 @@ class EventTaskSpec extends AbstractSpecification { then: "Retrieve the workflow" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 1 + tasks.size() == 2 tasks[0].taskType == TaskType.EVENT.name() - tasks[0].status == Task.Status.SCHEDULED + tasks[0].status == Task.Status.COMPLETED } - when: "the event task is executed by issuing a system task call" - List polledTaskIds = queueDAO.pop(eventTask.taskType, 1, 200) - String eventTaskId = polledTaskIds.get(0) - asyncSystemTaskExecutor.execute(eventTask, eventTaskId) - then: "Retrieve the workflow" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING