From 962bb28d187914986b63c1a7d3a95dac6630c8da Mon Sep 17 00:00:00 2001 From: supermonk Date: Thu, 24 Dec 2015 02:09:57 -0800 Subject: [PATCH 1/4] Added the Trident RabbitMQ Spout for review, followed TridentKafkaSpout --- .gitignore | 17 +- .../trident/rabbitmq/RabbitMQBatch.java | 10 + .../rabbitmq/RabbitMQBatchCoordinator.java | 41 ++++ .../rabbitmq/RabbitMQTridentEmitter.java | 222 ++++++++++++++++++ .../rabbitmq/TridentRabbitMqSpout.java | 61 +++++ .../storm/rabbitmq/RabbitMQMessageScheme.java | 47 ++-- 6 files changed, 380 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatch.java create mode 100644 src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatchCoordinator.java create mode 100644 src/main/java/com/narendra/trident/rabbitmq/RabbitMQTridentEmitter.java create mode 100644 src/main/java/com/narendra/trident/rabbitmq/TridentRabbitMqSpout.java diff --git a/.gitignore b/.gitignore index 5dcb4b2..16c9ccc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,18 @@ -target/ +# Eclipse +.classpath +.project +.settings/ +# Intellij +.idea/ *.iml +*.iws + +# Mac +.DS_Store + +# Maven +logs/ +target/ +dist/ +bin/ diff --git a/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatch.java b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatch.java new file mode 100644 index 0000000..7d0a185 --- /dev/null +++ b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatch.java @@ -0,0 +1,10 @@ +package com.narendra.trident.rabbitmq; + +/** + * Batch coordination metadata object for the TridentRabbitMQSpout. + * This implementation does not use batch metadata, so the object is empty. + * + */ +public class RabbitMQBatch { + +} \ No newline at end of file diff --git a/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatchCoordinator.java b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatchCoordinator.java new file mode 100644 index 0000000..99e297e --- /dev/null +++ b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatchCoordinator.java @@ -0,0 +1,41 @@ +package com.narendra.trident.rabbitmq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import storm.trident.spout.ITridentSpout.BatchCoordinator; + +/** + * Bare implementation of a BatchCoordinator, returning a null RabbitMQ object + * + */ +public class RabbitMQBatchCoordinator implements BatchCoordinator { + + private final String name; + + private final Logger LOG = LoggerFactory.getLogger(RabbitMQBatchCoordinator.class); + + public RabbitMQBatchCoordinator(String name) { + this.name = name; + LOG.info("Created batch coordinator for " + name); + } + + @Override + public RabbitMQBatch initializeTransaction(long txid, RabbitMQBatch prevMetadata, RabbitMQBatch curMetadata) { + LOG.debug("Initialise transaction " + txid + " for " + name); + return null; + } + + @Override + public void success(long txid) {} + + @Override + public boolean isReady(long txid) { + return true; + } + + @Override + public void close() {} + +} + diff --git a/src/main/java/com/narendra/trident/rabbitmq/RabbitMQTridentEmitter.java b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQTridentEmitter.java new file mode 100644 index 0000000..2f7438a --- /dev/null +++ b/src/main/java/com/narendra/trident/rabbitmq/RabbitMQTridentEmitter.java @@ -0,0 +1,222 @@ +package com.narendra.trident.rabbitmq; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.Config; +import backtype.storm.task.TopologyContext; +import backtype.storm.utils.RotatingMap; +import io.latent.storm.rabbitmq.Declarator; +import io.latent.storm.rabbitmq.ErrorReporter; +import io.latent.storm.rabbitmq.Message; +import io.latent.storm.rabbitmq.MessageScheme; +import io.latent.storm.rabbitmq.RabbitMQConsumer; +import io.latent.storm.rabbitmq.config.ConsumerConfig; +import storm.trident.operation.TridentCollector; +import storm.trident.spout.ITridentSpout.Emitter; +import storm.trident.topology.TransactionAttempt; + +/** + * The RabbitMQTridentEmitter class listens for incoming messages and stores them in a blocking + * queue. On each invocation of emit, the queued messages are emitted as a batch. + * + */ + +@SuppressWarnings("rawtypes") +public class RabbitMQTridentEmitter implements Emitter { + + private final Logger LOG; + + private int maxBatchSize = 3; + private MessageScheme scheme = null; + private transient RabbitMQConsumer consumer; + private boolean active; + private String streamId; + + private RotatingMap> batchMessageMap = null; // Maps transaction Ids + + private long rotateTimeMillis; + + private long lastRotate; + + public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; + + @SuppressWarnings("unchecked") + public RabbitMQTridentEmitter(MessageScheme scheme, final TopologyContext context, + Declarator declarator, String streamId, Map consumerMap) { + + batchMessageMap = new RotatingMap>(3); + ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(consumerMap); + ErrorReporter reporter = new ErrorReporter() { + @Override + public void reportError(Throwable error) { + + } + }; + this.scheme = scheme; + consumer = loadConsumer(declarator, reporter, consumerConfig); + scheme.open(consumerMap, context); + consumer.open(); + maxBatchSize = Integer.parseInt(consumerMap.get(MAX_BATCH_SIZE_CONF).toString()); + LOG = LoggerFactory.getLogger(RabbitMQTridentEmitter.class); + active = true; + rotateTimeMillis = + 1000L * Integer.parseInt(consumerMap.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString()); + lastRotate = System.currentTimeMillis(); + } + + + + protected RabbitMQConsumer loadConsumer(Declarator declarator, ErrorReporter reporter, + ConsumerConfig config) { + return new RabbitMQConsumer(config.getConnectionConfig(), config.getPrefetchCount(), + config.getQueueName(), config.isRequeueOnFail(), declarator, reporter); + } + + + @Override + public void success(TransactionAttempt tx) { + + @SuppressWarnings("unchecked") + List messages = (List) batchMessageMap.remove(tx.getTransactionId()); + + if (messages != null) { + if (!messages.isEmpty()) { + LOG.debug("Success for batch with transaction id " + tx.getTransactionId() + "/" + + tx.getAttemptId() + "StreamId " + streamId); + } + + for (Message msg : messages) { + Long messageId = Long.MIN_VALUE; + try { + messageId = getDeliveryTag(msg); + // acking is important + if (messageId instanceof Long) + consumer.ack((Long) messageId); + LOG.trace("Acknowledged message " + messageId); + } catch (Exception e) { + LOG.warn("Failed to acknowledge message " + messageId, e); + } + } + } else { + LOG.warn("No messages found in batch with transaction id " + tx.getTransactionId() + "/" + + tx.getAttemptId()); + } + } + + + + @Override + public void close() { + try { + LOG.info("Closing Consumer connection."); + consumer.close(); + scheme.close(); + } catch (Exception e) { + LOG.warn("Error closing Consumer connection.", e); + } + } + + @Override + public void emitBatch(TransactionAttempt tx, RabbitMQBatch coordinatorMeta, + TridentCollector collector) { + + + long now = System.currentTimeMillis(); + if (now - lastRotate > rotateTimeMillis) { + Map> failed = batchMessageMap.rotate(); + for (Long id : failed.keySet()) { + LOG.warn("TIMED OUT batch with transaction id " + id + " for " + streamId); + fail(id, failed.get(id)); + } + lastRotate = now; + } + + + if (batchMessageMap.containsKey(tx.getTransactionId())) { + LOG.warn("FAILED duplicate batch with transaction id " + tx.getTransactionId() + "/" + + tx.getAttemptId() + " for " + streamId); + fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId())); + } + + if (!active) + return; + int emitted = 0; + Message message; + + List batchMessages = new ArrayList(); + + while (emitted < maxBatchSize && (message = consumer.nextMessage()) != Message.NONE) { + List tuple = extractTuple(message, collector); + if (!tuple.isEmpty()) { + batchMessages.add(message); + collector.emit(tuple); + emitted += 1; + } + + } + + + + if (!batchMessages.isEmpty()) { + LOG.debug("Emitting batch with transaction id " + tx.getTransactionId() + "/" + + tx.getAttemptId() + " and size " + batchMessages.size() + " for " + streamId); + } else { + LOG.trace("No items to acknowledge for batch with transaction id " + tx.getTransactionId() + + "/" + tx.getAttemptId() + " for " + streamId); + } + batchMessageMap.put(tx.getTransactionId(), batchMessages); + } + + + /** + * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new + * batch with a matching transaction id is emitted. Note that the current implementation does + * nothing - i.e. it discards messages that have been failed. + * + * @param transactionId The transaction id of the failed batch + * @param messages The list of messages to fail. + */ + private void fail(Long transactionId, List messages) { + LOG.debug("Failure for batch with transaction id " + transactionId + " for " + streamId); + if (messages != null) { + for (Message msg : messages) { + try { + Long msgId = getDeliveryTag(msg); + if (msgId instanceof Long) + consumer.fail((Long) msgId); + LOG.trace("Failed message " + msgId); + } catch (Exception e) { + LOG.warn("Could not identify failed message ", e); + } + } + } else { + LOG.warn("Failed batch has no messages with transaction id " + transactionId); + } + } + + + + private List extractTuple(Message message, TridentCollector collector) { + try { + List tuple = scheme.deserialize(message); + if (tuple != null && !tuple.isEmpty()) { + return tuple; + } + } catch (Exception e) { + collector.reportError(e); + } + return Collections.emptyList(); + } + + protected long getDeliveryTag(Message message) { + return ((Message.DeliveredMessage) message).getDeliveryTag(); + } + + +} diff --git a/src/main/java/com/narendra/trident/rabbitmq/TridentRabbitMqSpout.java b/src/main/java/com/narendra/trident/rabbitmq/TridentRabbitMqSpout.java new file mode 100644 index 0000000..9a36213 --- /dev/null +++ b/src/main/java/com/narendra/trident/rabbitmq/TridentRabbitMqSpout.java @@ -0,0 +1,61 @@ +package com.narendra.trident.rabbitmq; + +import java.util.Map; + +import backtype.storm.spout.Scheme; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; +import io.latent.storm.rabbitmq.Declarator; +import io.latent.storm.rabbitmq.MessageScheme; +import storm.trident.spout.ITridentSpout; + +@SuppressWarnings("rawtypes") +public class TridentRabbitMqSpout implements ITridentSpout { + + public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; + public static final int DEFAULT_BATCH_SIZE = 1000; + private static final long serialVersionUID = 1L; + + private String name; + private static int nameIndex = 1; + private String streamId; + private Map consumerMap; + private Declarator declarator; + private MessageScheme scheme; + + public TridentRabbitMqSpout(Scheme scheme, String streamId,Map consumerMap){ + this(MessageScheme.Builder.from(scheme), null,new Declarator.NoOp(), streamId,consumerMap); + } + + public TridentRabbitMqSpout(MessageScheme scheme, final TopologyContext context, + Declarator declarator, String streamId, Map consumerMap) { + this.scheme = scheme; + this.declarator = declarator; + this.streamId = streamId; + this.consumerMap = consumerMap; + this.name = "RabbitMQSpout" + (nameIndex++); + } + + @Override + public Map getComponentConfiguration() { + return consumerMap; + } + + @Override + public storm.trident.spout.ITridentSpout.BatchCoordinator getCoordinator(String txStateId, + Map conf, TopologyContext context) { + return new RabbitMQBatchCoordinator(name); + } + + @Override + public storm.trident.spout.ITridentSpout.Emitter getEmitter(String txStateId, + Map conf, TopologyContext context) { + return new RabbitMQTridentEmitter(scheme, context, declarator, streamId, consumerMap); + } + + @Override + public Fields getOutputFields() { + return this.scheme.getOutputFields(); + } + +} diff --git a/src/main/java/io/latent/storm/rabbitmq/RabbitMQMessageScheme.java b/src/main/java/io/latent/storm/rabbitmq/RabbitMQMessageScheme.java index 36c8d49..1f4c344 100644 --- a/src/main/java/io/latent/storm/rabbitmq/RabbitMQMessageScheme.java +++ b/src/main/java/io/latent/storm/rabbitmq/RabbitMQMessageScheme.java @@ -6,6 +6,8 @@ import java.util.*; import backtype.storm.task.TopologyContext; +import com.rabbitmq.client.LongString; + import java.io.Serializable; @@ -70,7 +72,7 @@ private Properties createProperties(Message.DeliveredMessage dm) { dm.getCorrelationId(), dm.getDeliveryMode(), dm.getExpiration(), - serialiazableHeaders(dm.getHeaders()), + serializableHeaders(dm.getHeaders()), dm.getMessageId(), dm.getPriority(), dm.getReplyTo(), @@ -79,22 +81,33 @@ private Properties createProperties(Message.DeliveredMessage dm) { dm.getUserId()); } - private Map serialiazableHeaders(Map headers) { - if (headers == null) { - return new HashMap(); - } - Map serializableHeaders = new HashMap(headers.size()); - for (Map.Entry entry : headers.entrySet()) { - if (entry.getValue() instanceof Number || - entry.getValue() instanceof Boolean || - entry.getValue() instanceof Character || - entry.getValue() instanceof String || - entry.getValue() instanceof Date) { - serializableHeaders.put(entry.getKey(), entry.getValue()); - } + private Map serializableHeaders(Map headers) { + if (headers == null) { + return new HashMap(); + } + + Map headersSerializable = new HashMap(headers.size()); + for (Map.Entry entry : headers.entrySet()) { + if (entry.getValue() instanceof Number || + entry.getValue() instanceof Boolean || + entry.getValue() instanceof Character || + entry.getValue() instanceof String || + entry.getValue() instanceof Date) { + headersSerializable.put(entry.getKey(), entry.getValue()); + } else if (entry.getValue() instanceof LongString) { + headersSerializable.put(entry.getKey(), entry.getValue().toString()); + } else if (entry.getValue() instanceof ArrayList) { + ArrayList serializedList = new ArrayList(); + for (Object elm : ((ArrayList) entry.getValue())) { + if (elm instanceof HashMap) { + serializedList.add(serializableHeaders((HashMap) elm)); + } + } + headersSerializable.put(entry.getKey(), serializedList); + } + } + return headersSerializable; } - return serializableHeaders; - } public static class Envelope implements Serializable { private final boolean isRedelivery; @@ -102,7 +115,7 @@ public static class Envelope implements Serializable { private final String exchange; private final String routingKey; - Envelope(boolean isRedelivery, long deliveryTag, String exchange, String routingKey) { + public Envelope(boolean isRedelivery, long deliveryTag, String exchange, String routingKey) { this.isRedelivery = isRedelivery; this.deliveryTag = deliveryTag; this.exchange = exchange; From 9160591c94d89914d467e1066faf6375fdc3a96a Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Tue, 5 Apr 2016 14:17:42 -0700 Subject: [PATCH 2/4] Updated README --- .gitignore | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 16c9ccc..c8a90c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,18 +1,4 @@ -# Eclipse -.classpath -.project -.settings/ +target/ -# Intellij -.idea/ *.iml -*.iws - -# Mac -.DS_Store - -# Maven -logs/ -target/ -dist/ -bin/ +/bin/ From 7c358db8bc9274b57d3aba004c349bcb4de13886 Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Tue, 5 Apr 2016 14:24:34 -0700 Subject: [PATCH 3/4] Update .gitignore --- .gitignore | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index c8a90c4..16c9ccc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,18 @@ -target/ +# Eclipse +.classpath +.project +.settings/ +# Intellij +.idea/ *.iml -/bin/ +*.iws + +# Mac +.DS_Store + +# Maven +logs/ +target/ +dist/ +bin/ From 90fddaf23dc71acc9d9df4362cbf8900f9867867 Mon Sep 17 00:00:00 2001 From: supermonk Date: Tue, 5 Apr 2016 14:31:16 -0700 Subject: [PATCH 4/4] Updated ReadMe --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 5745e90..0fde7af 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,11 @@ While the standard ```RabbitMQSpout``` above will deliver messages on an anchore Scheme scheme = new YourCustomMessageScheme(); IRichSpout spout = new UnanchoredRabbitMQSpout(scheme); ``` +### Trident Spout +```java +Scheme scheme = new YourCustomMessageScheme(); +Config producerConf = new Config(); +TridentRabbitMqSpout spout = new TridentRabbitMqSpout(scheme, streamId, producerConf); ## MultiStream Spout