This repository was archived by the owner on Jan 13, 2023. It is now read-only.

Added TridentSpout for RabbitMq #43

Open · wants to merge 5 commits into master
17 changes: 16 additions & 1 deletion .gitignore
@@ -1,3 +1,18 @@
target/
# Eclipse
.classpath
.project
.settings/

# Intellij
.idea/
*.iml
*.iws

# Mac
.DS_Store

# Maven
logs/
target/
dist/
bin/
5 changes: 5 additions & 0 deletions README.md
@@ -51,6 +51,11 @@ While the standard ```RabbitMQSpout``` above will deliver messages on an anchore
Scheme scheme = new YourCustomMessageScheme();
IRichSpout spout = new UnanchoredRabbitMQSpout(scheme);
```
### Trident Spout
```java
Scheme scheme = new YourCustomMessageScheme();
Config consumerConf = new Config();
TridentRabbitMqSpout spout = new TridentRabbitMqSpout(scheme, streamId, consumerConf);
```
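
The map handed to the spout must carry the RabbitMQ consumer settings and a ```topology.message.timeout.secs``` value, both of which the Trident emitter reads when it opens; ```topology.spout.max.batch.size``` caps how many messages go into each batch. A minimal end-to-end sketch, assuming the connection details, queue name, and stream id below are placeholders for your own setup:

```java
// Sketch only: host, credentials, queue name, and stream id are illustrative placeholders.
ConnectionConfig connectionConfig =
    new ConnectionConfig("localhost", 5672, "guest", "guest", "/", 10); // host, port, user, password, vhost, heartbeat
ConsumerConfig consumerConfig = new ConsumerConfigBuilder()
    .connection(connectionConfig)
    .queue("your.rabbitmq.queue")
    .prefetch(200)
    .requeueOnFail()
    .build();

Config consumerConf = new Config();
consumerConf.putAll(consumerConfig.asMap());
consumerConf.put(TridentRabbitMqSpout.MAX_BATCH_SIZE_CONF, 100);  // messages per Trident batch
consumerConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);       // pending batches are failed after this timeout

Scheme scheme = new YourCustomMessageScheme();
TridentRabbitMqSpout spout = new TridentRabbitMqSpout(scheme, "rabbitmq-stream", consumerConf);

TridentTopology topology = new TridentTopology();
topology.newStream("rabbitmq-stream", spout)
        .each(spout.getOutputFields(), new Debug()); // Debug just logs each tuple; swap in your own operations
```

```ConnectionConfig```, ```ConsumerConfig```, and ```ConsumerConfigBuilder``` live in ```io.latent.storm.rabbitmq.config```; ```TridentTopology``` and ```Debug``` come from Storm's ```storm.trident``` packages.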

## MultiStream Spout

10 changes: 10 additions & 0 deletions src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatch.java
@@ -0,0 +1,10 @@
package com.narendra.trident.rabbitmq;

/**
* Batch coordination metadata object for the TridentRabbitMqSpout.
* This implementation does not use batch metadata, so the class is intentionally empty.
*/
public class RabbitMQBatch {

}
src/main/java/com/narendra/trident/rabbitmq/RabbitMQBatchCoordinator.java
@@ -0,0 +1,41 @@
package com.narendra.trident.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.spout.ITridentSpout.BatchCoordinator;

/**
* Bare-bones implementation of a BatchCoordinator. It tracks no batch metadata, so
* initializeTransaction always returns null.
*/
public class RabbitMQBatchCoordinator implements BatchCoordinator<RabbitMQBatch> {

private final String name;

private final Logger LOG = LoggerFactory.getLogger(RabbitMQBatchCoordinator.class);

public RabbitMQBatchCoordinator(String name) {
this.name = name;
LOG.info("Created batch coordinator for " + name);
}

@Override
public RabbitMQBatch initializeTransaction(long txid, RabbitMQBatch prevMetadata, RabbitMQBatch curMetadata) {
LOG.debug("Initialise transaction " + txid + " for " + name);
return null;
}

@Override
public void success(long txid) {}

@Override
public boolean isReady(long txid) {
return true;
}

@Override
public void close() {}

}

src/main/java/com/narendra/trident/rabbitmq/RabbitMQTridentEmitter.java
@@ -0,0 +1,222 @@
package com.narendra.trident.rabbitmq;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.RotatingMap;
import io.latent.storm.rabbitmq.Declarator;
import io.latent.storm.rabbitmq.ErrorReporter;
import io.latent.storm.rabbitmq.Message;
import io.latent.storm.rabbitmq.MessageScheme;
import io.latent.storm.rabbitmq.RabbitMQConsumer;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout.Emitter;
import storm.trident.topology.TransactionAttempt;

/**
* The RabbitMQTridentEmitter pulls messages buffered by the underlying RabbitMQConsumer and,
* on each invocation of emitBatch, emits up to maxBatchSize of them as a single batch. Emitted
* messages are tracked by transaction id so they can be acked on success or failed on timeout.
*/

@SuppressWarnings("rawtypes")
public class RabbitMQTridentEmitter implements Emitter<RabbitMQBatch> {

private final Logger LOG;

private int maxBatchSize = 3;
private MessageScheme scheme = null;
private transient RabbitMQConsumer consumer;
private boolean active;
private String streamId;

private RotatingMap<Long, List<Message>> batchMessageMap = null; // Maps transaction ids to the messages emitted for that batch so they can be acked or failed later

private long rotateTimeMillis;

private long lastRotate;

public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";

@SuppressWarnings("unchecked")
public RabbitMQTridentEmitter(MessageScheme scheme, final TopologyContext context,
Declarator declarator, String streamId, Map consumerMap) {

batchMessageMap = new RotatingMap<Long, List<Message>>(3);
ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(consumerMap);
ErrorReporter reporter = new ErrorReporter() {
@Override
public void reportError(Throwable error) {

}
};
this.scheme = scheme;
consumer = loadConsumer(declarator, reporter, consumerConfig);
scheme.open(consumerMap, context);
consumer.open();
Object batchSizeConf = consumerMap.get(MAX_BATCH_SIZE_CONF);
// Fall back to the spout's default when the batch size key is absent from the consumer map.
maxBatchSize = (batchSizeConf != null) ? Integer.parseInt(batchSizeConf.toString()) : TridentRabbitMqSpout.DEFAULT_BATCH_SIZE;
LOG = LoggerFactory.getLogger(RabbitMQTridentEmitter.class);
active = true;
rotateTimeMillis =
1000L * Integer.parseInt(consumerMap.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
lastRotate = System.currentTimeMillis();
}



protected RabbitMQConsumer loadConsumer(Declarator declarator, ErrorReporter reporter,
ConsumerConfig config) {
return new RabbitMQConsumer(config.getConnectionConfig(), config.getPrefetchCount(),
config.getQueueName(), config.isRequeueOnFail(), declarator, reporter);
}


@Override
public void success(TransactionAttempt tx) {

@SuppressWarnings("unchecked")
List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId());

if (messages != null) {
if (!messages.isEmpty()) {
LOG.debug("Success for batch with transaction id " + tx.getTransactionId() + "/"
+ tx.getAttemptId() + "StreamId " + streamId);
}

for (Message msg : messages) {
Long messageId = Long.MIN_VALUE;
try {
messageId = getDeliveryTag(msg);
// Ack on the consumer so RabbitMQ removes the delivery from the queue.
consumer.ack(messageId);
LOG.trace("Acknowledged message " + messageId);
} catch (Exception e) {
LOG.warn("Failed to acknowledge message " + messageId, e);
}
}
} else {
LOG.warn("No messages found in batch with transaction id " + tx.getTransactionId() + "/"
+ tx.getAttemptId());
}
}



@Override
public void close() {
try {
LOG.info("Closing Consumer connection.");
consumer.close();
scheme.close();
} catch (Exception e) {
LOG.warn("Error closing Consumer connection.", e);
}
}

@Override
public void emitBatch(TransactionAttempt tx, RabbitMQBatch coordinatorMeta,
TridentCollector collector) {


long now = System.currentTimeMillis();
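// Any batch still unacknowledged after the message timeout is rotated out of the map and failed.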
if (now - lastRotate > rotateTimeMillis) {
Map<Long, List<Message>> failed = batchMessageMap.rotate();
for (Long id : failed.keySet()) {
LOG.warn("TIMED OUT batch with transaction id " + id + " for " + streamId);
fail(id, failed.get(id));
}
lastRotate = now;
}


if (batchMessageMap.containsKey(tx.getTransactionId())) {
LOG.warn("FAILED duplicate batch with transaction id " + tx.getTransactionId() + "/"
+ tx.getAttemptId() + " for " + streamId);
fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId()));
}

if (!active)
return;
int emitted = 0;
Message message;

List<Message> batchMessages = new ArrayList<Message>();
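// Drain up to maxBatchSize messages from the consumer; Message.NONE signals that the queue is currently empty.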

while (emitted < maxBatchSize && (message = consumer.nextMessage()) != Message.NONE) {
List<Object> tuple = extractTuple(message, collector);
if (!tuple.isEmpty()) {
batchMessages.add(message);
collector.emit(tuple);
emitted += 1;
}

}



if (!batchMessages.isEmpty()) {
LOG.debug("Emitting batch with transaction id " + tx.getTransactionId() + "/"
+ tx.getAttemptId() + " and size " + batchMessages.size() + " for " + streamId);
} else {
LOG.trace("No items to acknowledge for batch with transaction id " + tx.getTransactionId()
+ "/" + tx.getAttemptId() + " for " + streamId);
}
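// Remember the emitted messages for this transaction so success() can ack them and fail() can reject them later.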
batchMessageMap.put(tx.getTransactionId(), batchMessages);
}


/**
* Fail a batch with the given transaction id. This is called when a batch times out, or when a new
* batch with a matching transaction id is emitted. Each message in the batch is failed on the
* underlying consumer, which requeues or rejects it according to the consumer's requeue-on-fail setting.
*
* @param transactionId The transaction id of the failed batch
* @param messages The list of messages to fail.
*/
private void fail(Long transactionId, List<Message> messages) {
LOG.debug("Failure for batch with transaction id " + transactionId + " for " + streamId);
if (messages != null) {
for (Message msg : messages) {
try {
long msgId = getDeliveryTag(msg);
// Reject the delivery on the consumer; requeue behaviour depends on the consumer's requeue-on-fail setting.
consumer.fail(msgId);
LOG.trace("Failed message " + msgId);
} catch (Exception e) {
LOG.warn("Could not fail message", e);
}
}
} else {
LOG.warn("Failed batch has no messages with transaction id " + transactionId);
}
}



private List<Object> extractTuple(Message message, TridentCollector collector) {
try {
List<Object> tuple = scheme.deserialize(message);
if (tuple != null && !tuple.isEmpty()) {
return tuple;
}
} catch (Exception e) {
collector.reportError(e);
}
return Collections.emptyList();
}

protected long getDeliveryTag(Message message) {
return ((Message.DeliveredMessage) message).getDeliveryTag();
}


}
src/main/java/com/narendra/trident/rabbitmq/TridentRabbitMqSpout.java
@@ -0,0 +1,61 @@
package com.narendra.trident.rabbitmq;

import java.util.Map;

import backtype.storm.spout.Scheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import io.latent.storm.rabbitmq.Declarator;
import io.latent.storm.rabbitmq.MessageScheme;
import storm.trident.spout.ITridentSpout;

@SuppressWarnings("rawtypes")
public class TridentRabbitMqSpout implements ITridentSpout<RabbitMQBatch> {

public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
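// Fallback batch size used by the emitter when topology.spout.max.batch.size is not set in the consumer map.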
public static final int DEFAULT_BATCH_SIZE = 1000;
private static final long serialVersionUID = 1L;

private String name;
private static int nameIndex = 1;
private String streamId;
private Map consumerMap;
private Declarator declarator;
private MessageScheme scheme;

public TridentRabbitMqSpout(Scheme scheme, String streamId, Map consumerMap) {
this(MessageScheme.Builder.from(scheme), null, new Declarator.NoOp(), streamId, consumerMap);
}

public TridentRabbitMqSpout(MessageScheme scheme, final TopologyContext context,
Declarator declarator, String streamId, Map consumerMap) {
this.scheme = scheme;
this.declarator = declarator;
this.streamId = streamId;
this.consumerMap = consumerMap;
this.name = "RabbitMQSpout" + (nameIndex++);
}

@Override
public Map getComponentConfiguration() {
return consumerMap;
}

@Override
public ITridentSpout.BatchCoordinator<RabbitMQBatch> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new RabbitMQBatchCoordinator(name);
}

@Override
public ITridentSpout.Emitter<RabbitMQBatch> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new RabbitMQTridentEmitter(scheme, context, declarator, streamId, consumerMap);
}

@Override
public Fields getOutputFields() {
return this.scheme.getOutputFields();
}

}