RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

作者: 时间: 2018-11-28 分类: 技术文章 | 0条评论 |

Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息,这个广播模式场景,适用于分布式服务器更新缓存或配置等场景。代码实现上其实很简单,就是在消费端添加:

 Java Code By WuleBa.COM
1
consumer.setMessageModel(MessageModel.BROADCASTING);


就可以了。我们看实验步骤:

一、启动ConsumerBroadCastMember1

二、启动ConsumerBroadCastMember2

三、运行ProducerBraodCast

四、我们可以看到两个Consumer都收到了同样的消息。

Producer端:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.hope.lee.producer;



import com.alibaba.rocketmq.client.exception.MQBrokerException;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendCallback;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.remoting.exception.RemotingException;



public class ProducerBroadCast {

    
public static void main(String[] args) {

        DefaultMQProducer producer = 
new DefaultMQProducer(“push_consumer”);

        producer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
try {

            
// 设置实例名称

            producer.setInstanceName(“producer_broadcast”);

            
// 设置重试次数

            producer.setRetryTimesWhenSendFailed(3);

            
// 开启生产者

            producer.start();

            
// 创建一条消息

            Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

            SendResult send = producer.send(msg);

            System.out.println(
“id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

            

        } 
catch (MQClientException e) {

            e.printStackTrace();

        } 
catch (RemotingException e) {

            e.printStackTrace();

        } 
catch (MQBrokerException e) {

            e.printStackTrace();

        } 
catch (InterruptedException e) {

            e.printStackTrace();

        } 

        producer.shutdown();

    }

}


Consumer端:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package org.hope.lee.consumer;



import java.util.List;



import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



public class ConsumerBroadCastMember1 {

    
public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer(“consumer_broadcast”);

        consumer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
// 批量消费,每次拉取10条

        consumer.setConsumeMessageBatchMaxSize(10);

        
//设置广播消费

        consumer.setMessageModel(MessageModel.BROADCASTING);

        
//设置集群消费

//        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 如果非第一次启动,那么按照上次消费的位置继续消费

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        
// 订阅PushTopic下Tag为push的消息

        consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

        consumer.registerMessageListener(
new MqBroadCastListener());

        consumer.start();

        System.out.println(
“Consumer1 Started.”);



    }

}

class MqBroadCastListener implements MessageListenerConcurrently{

    
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        
try {

            MessageExt msg = msgs.get(
0);

            
String msgBody = new String(msg.getBody(), “utf-8”);

            System.out.println(
“msgBody:” + msgBody);

        } 
catch(Exception e) {

            e.printStackTrace();

            
return ConsumeConcurrentlyStatus.RECONSUME_LATER;

        }

        
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

    

}


 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.hope.lee.consumer;



import java.util.List;



import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



public class ConsumerBroadCastMember2 {

    
public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer(“consumer_broadcast”);

        consumer.setNamesrvAddr(
“192.168.31.176:9876;192.168.31.165:9876”);

        
// 批量消费,每次拉取10条

        consumer.setConsumeMessageBatchMaxSize(10);

        
//设置广播消费

        consumer.setMessageModel(MessageModel.BROADCASTING);

        
//设置集群消费

//        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 如果非第一次启动,那么按照上次消费的位置继续消费

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        
// 订阅PushTopic下Tag为push的消息

        consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

        consumer.registerMessageListener(
new MqBroadCastListener());

        consumer.start();

        System.out.println(
“Consumer2 Started.”);



    }

}


结果:

RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

吾乐吧软件站提醒大家:

上面的代码是转载的,下面才是吾乐吧要说的重点,小编按照上面代码整合到自己的项目之后发现了几个大坑,一直没跑起来,现在分享下解决方法:

1、上面的Lisener部分,可以改成这样的写法:

 Java Code By WuleBa.COM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//设置一个Listener,主要进行消息的逻辑处理  

consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override  

    
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                    ConsumeConcurrentlyContext context) {  

        
for (MessageExt messageExt : msgs) {    

           
String messageBody = new String(messageExt.getBody());   

            System.out.println(
new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                 
new Date())+“消费响应:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//输出消息内容    

        }    

          

        
//返回消费状态  

        //CONSUME_SUCCESS 消费成功  

        //RECONSUME_LATER 消费失败,需要稍后重新消费  

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

    }  

}); 


2、RocketMQ的消费者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解决方法:

一开始,还以为是fastjson版本不正确,换了最新版也是不行,后面调试进去mq代码才发现,广播模式会在本地生成一个一些文件,然后里面的文件出问题了(内容为空,然后强制转JSON。。。你懂的。。。),你把里面的东西删除,就可以正常了,删除以下2个文件:
C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json
C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

3、广播模式下,RocketMQ不会更新已消费的状态,依然是 NOT_ONLINE 状态。

RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

所以如果你发现没有变成CONSUMED状态,完全不用担心,详情请看这里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

All by flydoos 2018-11-28

本文采用 CC协议 发布,转载请注明:转载自 吾乐吧软件站

本文链接地址:http://www.wuleba.com/?p=29968

发表评论


微软MSDN资源免费订阅,MSDN 我告诉你