Java advanced topics
dennis zhuang edited this page Jun 6, 2013 · 10 revisions
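This guide is written against version 1.4.5 of the Java client.
- Simple example
- Messages
- Client configuration
- Session factory MessageSessionFactory
- Sending messages with MessageProducer
- Subscribing to messages with MessageConsumer
- Browsing messages with TopicBrowser
- Spring framework support
- Advanced topics
This page covers some advanced uses of MetaQ: sending messages through a log4j appender so that MetaQ serves as a logging backend, integrating with Twitter Storm, and sending ordered messages.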
#Sending messages with the log4j extension
See "Sending messages with Log4j".
#Using MetaQ as a Twitter Storm data source
Add the MetaQ Storm spout as a Maven dependency:
<dependency>
    <groupId>com.taobao.metamorphosis</groupId>
    <artifactId>metamorphosis-storm-spout</artifactId>
    <version>1.4.5</version>
</dependency>
A sample topology (found in the example project):
package com.taobao.metamorphosis.example.storm;

import static com.taobao.metamorphosis.example.Help.initMetaConfig;

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.storm.scheme.StringScheme;
import com.taobao.metamorphosis.storm.spout.MetaSpout;

public class TestTopology {

    /** Test bolt that acks odd-numbered tuples and fails even-numbered ones. */
    public static class FailEveryOther extends BaseRichBolt {
        OutputCollector _collector;
        int i = 0;

        @Override
        public void prepare(Map map, TopologyContext tc, OutputCollector collector) {
            this._collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            this.i++;
            if (this.i % 2 == 0) {
                this._collector.fail(tuple);
            }
            else {
                this._collector.ack(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt emits nothing, so there are no output fields to declare.
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Spout with a parallelism hint of 10, consuming as group "storm-spout".
        // initMetaConfig() is the example project's helper (statically imported above).
        builder.setSpout("spout",
            new MetaSpout(initMetaConfig(), new ConsumerConfig("storm-spout"), new StringScheme()), 10);
        builder.setBolt("bolt", new FailEveryOther()).shuffleGrouping("spout");

        Config conf = new Config();
        // Set the topic to consume; it must exist on the broker.
        conf.put(MetaSpout.TOPIC, "meta-test");
        // Set the maximum buffer size, in bytes, for each fetch of messages.
        conf.put(MetaSpout.FETCH_MAX_SIZE, 1024 * 1024);

        // Run the topology in an in-process local cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
    }
}
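To see which tuples the FailEveryOther bolt acknowledges without starting Storm or a MetaQ broker, the counter logic from execute can be isolated in plain Java. This is only a rough sketch: the class AckPattern and its methods decide and run are hypothetical names, not part of MetaQ or Storm, and the sketch assumes a single bolt task, since each task would keep its own counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of MetaQ or Storm: it mirrors only the
// "i % 2" decision made in FailEveryOther.execute.
public class AckPattern {
    private int i = 0;

    /** Returns true if the next tuple would be acked, false if it would be failed. */
    public boolean decide() {
        i++;
        return i % 2 != 0;
    }

    /** Lists the ack/fail decision for the first {@code tuples} tuples, in order. */
    public static List<String> run(int tuples) {
        AckPattern pattern = new AckPattern();
        List<String> decisions = new ArrayList<String>();
        for (int n = 0; n < tuples; n++) {
            decisions.add(pattern.decide() ? "ack" : "fail");
        }
        return decisions;
    }

    public static void main(String[] args) {
        System.out.println(run(4)); // prints [ack, fail, ack, fail]
    }
}
```

The first tuple is acked and the second is failed, so half of the messages take the failure path. That makes the bolt useful for exercising the spout's failure handling rather than for real processing.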
#Sending ordered messages