-
Notifications
You must be signed in to change notification settings - Fork 691
Java messageproducer
- 简单例子
- 消息
- 客户端配置
- 会话工厂MessageSessionFactory
- 发送消息MessageProducer
- 订阅消息MessageConsumer
- 遍历消息TopicBrowser
- Spring框架支持
- 高级主题
#消息生产者
MQ系统的三个角色:生产者,消费者和队列。生产者作为消息内容的提供商,是整个系统的驱动者。生产者发送消息到队列服务器,消费者从队列服务器获取消息并消费。三者分工明确,密切协作来完成异步任务。
#创建生产者
在上一节创建了消息会话工厂后,就可以通过工厂来创建消息生产者,这都是通过createProducer
方法:
final MessageProducer producer = sessionFactory.createProducer();
创建的生产者是MessageProducer
类的实例,有了生产者是不是就可以马上发送消息?很抱歉,不行。原因是这样,服务端注册它提供的topic到zookeeper,但是客户端链接到zookeeper后还不知道应该连接哪个服务器。它必须告诉zookeeper想去连接哪个topic的服务器,拿到服务器地址,然后才能建立socket连接,最后才可以发送消息。
因此创建生产者之后还需要一个发布Topic的过程:
// publish topic
final String topic = "meta-test";
producer.publish(topic);
假设我们要发送的消息的主题是meta-test
,那么只要调用publish(topic)
这个方法就可以告诉zookeeper我们想要查找提供topic为meta-test
的服务器,从zookeeper获取服务器地址后,会话工厂会连接服务器,生产者就可以使用这些连接来发送消息了。
publish(topic)
方法可以针对同一个topic调用多次,这跟调用一次的效果是一样的。
同一个生产者想要发送多个不同topic的消息,那么这些topic都需要被发布一次:
producer.publish("topic1");
producer.publish("topic2");
例如,这里我们发布了两个topic:topic1
和topic2
,接下来就可以尝试发送这些主题的消息。
##发布的topic没有服务器提供
我们发布了meta-test
的主题,然后希望zookeeper告诉我们哪里有提供meta-test
主题的队列服务的服务器,但是zookeeper可能找不到提供这个主题服务的服务器,那么在发送消息的时候就会看到这样的异常:
There is no aviable partition for topic meta-test,maybe you don't publish it at first?
,这种情况你必须检查你的服务器是否配置了meta-test
这个topic,并且客户端和服务器的zookeeper配置也保持一致。
#发送消息
在消息一节,我们已经介绍了怎么创建消息,现在我们要做的就是调用send
方法将创建的消息发送出去:
// send message
try{
final SendResult sendResult = producer.sendMessage(new Message(topic, "hello,MetaQ!".getBytes()));
// check result
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to " + sendResult.getPartition());
}
}catch(MetaClientException e){
e.printStackTrace(); //TODO log
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
发送的结果保存在返回的SendResult
实例。同时。你可能需要处理一些异常,如中断异常和客户端异常。通过SendResult.isSuccess()
返回的布尔值可以判断发送成功还是失败。
如果发送失败,通过SendResult.getErrorMessage()
方法可以获取发送失败的具体错误信息,方便调试。如果发送成功,则可以通过getPartition()
和getOffset()
方法获取消息被发送到哪个分区,以及消息在该分区数据文件中的绝对偏移量。
send(message)
是一个同步调用,默认使用3秒的超时时间,如果超时还没有返回则会抛出MetaOpeartionTimeoutException
异常,这个异常是MetaClientException
异常的子类。如果要捕捉超时异常,可以单独catch这个异常信息。
send
同时有一个同步的重载的方法用于设定发送超时时间,比如5秒发送超时:
producer.send(message, 5000, TimeUnit.MILLISECONDS);
#异步发送
同步发送会阻塞当前线程,直到服务端返回发送结果或者超时才能解除阻塞。现在应用都讲究异步化,因此