Skip to content

Java messageproducer

dennis zhuang edited this page May 28, 2013 · 13 revisions

#消息生产者

MQ系统的三个角色:生产者,消费者和队列。生产者作为消息内容的提供商,是整个系统的驱动者。生产者发送消息到队列服务器,消费者从队列服务器获取消息并消费。三者分工明确,密切协作来完成异步任务。

#创建生产者

上一节创建了消息会话工厂后,就可以通过工厂来创建消息生产者,这都是通过createProducer方法:

    final MessageProducer producer = sessionFactory.createProducer();

创建的生产者是MessageProducer类的实例,有了生产者是不是就可以马上发送消息?很抱歉,不行。原因是这样,服务端注册它提供的topic到zookeeper,但是客户端链接到zookeeper后还不知道应该连接哪个服务器。它必须告诉zookeeper想去连接哪个topic的服务器,拿到服务器地址,然后才能建立socket连接,最后才可以发送消息。

因此创建生产者之后还需要一个发布Topic的过程:

        // publish topic
        final String topic = "meta-test";
        producer.publish(topic);

假设我们要发送的消息的主题是meta-test,那么只要调用publish(topic)这个方法就可以告诉zookeeper我们想要查找提供topic为meta-test的服务器,从zookeeper获取服务器地址后,会话工厂会连接服务器,生产者就可以使用这些连接来发送消息了。

publish(topic)方法可以针对同一个topic调用多次,这跟调用一次的效果是一样的。

同一个生产者想要发送多个不同topic的消息,那么这些topic都需要被发布一次:

    producer.publish("topic1");
    producer.publish("topic2");

例如,这里我们发布了两个topic:topic1topic2,接下来就可以尝试发送这些主题的消息。

##发布的topic没有服务器提供

我们发布了meta-test的主题,然后希望zookeeper告诉我们哪里有提供meta-test主题的队列服务的服务器,但是zookeeper可能找不到提供这个主题服务的服务器,那么在发送消息的时候就会看到这样的异常: There is no aviable partition for topic meta-test,maybe you don't publish it at first?,这种情况你必须检查你的服务器是否配置了meta-test这个topic,并且客户端和服务器的zookeeper配置也保持一致。

#发送消息

消息一节,我们已经介绍了怎么创建消息,现在我们要做的就是调用send方法将创建的消息发送出去:

         // send message
         try{
            final SendResult sendResult = producer.sendMessage(new Message(topic, "hello,MetaQ!".getBytes()));
            // check result
            if (!sendResult.isSuccess()) {
                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
         }catch(MetaClientException e){
             e.printStackTrace(); //TODO log
         }catch(InterruptedException e){
             Thread.currentThread().interrupt();
         }

发送的结果保存在返回的SendResult实例。同时。你可能需要处理一些异常,如中断异常和客户端异常。通过SendResult.isSuccess()返回的布尔值可以判断发送成功还是失败。

如果发送失败,通过SendResult.getErrorMessage()方法可以获取发送失败的具体错误信息,方便调试。如果发送成功,则可以通过getPartition()getOffset()方法获取消息被发送到哪个分区,以及消息在该分区数据文件中的绝对偏移量。

send(message)是一个同步调用,默认使用3秒的超时时间,如果超时还没有返回则会抛出MetaOpeartionTimeoutException异常,这个异常是MetaClientException异常的子类。如果要捕捉超时异常,可以单独catch这个异常信息。

send同时有一个同步的重载的方法用于设定发送超时时间,比如5秒发送超时:

     producer.send(message, 5000, TimeUnit.MILLISECONDS);

#异步发送

同步发送会阻塞当前线程,直到服务端返回发送结果或者超时才能解除阻塞。现在应用都讲究异步化,因此

Clone this wiki locally