博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
2、Rabbitmq消费者确认机制
阅读量:6967 次
发布时间:2019-06-27

本文共 2764 字,大约阅读时间需要 9 分钟。

hot3.png

从上一篇文章的基本消息模型案例中,我们发现消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢?

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

在基本消息模型案例的代码中有一段

// 监听队列,第二个参数:是否自动进行消息确认。        channel.basicConsume(QUEUE_NAME, true, consumer);

e4416dd903a89cd396223bafccfbcb070a4.jpg

当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息,rabbitmq就删除了该条消息

 

自动ack存在的问题:

生产者不做任何修改,直接运行,可以看到一条消息存在队列中:

9ff664da69f1ec1cd0bf0c6595e032d4195.jpg

消费者代码,模拟了一个异常:

/** * 消费者 */@Slf4jpublic class ConsumerError {    private final static String QUEUE_NAME = "hello";    public static void main(String[] argv) throws Exception {        // 获取到连接        Connection connection = ConnectionUtil.getConnection();        // 创建一个信道,意味着每个线程单独一个信道        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 定义队列的消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用            @Override            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,                                       byte[] body) throws IOException {                // body 即消息体                String msg = new String(body);                //异常模拟                int i = 1/0;                log.debug("消费消息:{}",msg);            }        };        // 监听队列,第二个参数:是否自动进行消息确认。        channel.basicConsume(QUEUE_NAME, true, consumer);    }}

运行消费者

f1d14488595784a3395948558b69d74a680.jpg

实际上并没有消费成功消息,队列中的消息却删除了。

c7382dd122a3307fe25232dd36b0e7adf28.jpg

 

手动确认消息

/** * 消费者 */@Slf4jpublic class Consumer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] argv) throws Exception {        // 获取到连接        Connection connection = ConnectionUtil.getConnection();        // 创建一个信道,意味着每个线程单独一个信道        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 定义队列的消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用            @Override            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,                                       byte[] body) throws IOException {                // body 即消息体                String msg = new String(body);                int i = 1/0;                log.debug("消费消息:{}",msg);                // 手动进行ACK                channel.basicAck(envelope.getDeliveryTag(), false);            }        };        // 监听队列,第二个参数:是否自动进行消息确认。        channel.basicConsume(QUEUE_NAME, false, consumer);    }}

运行生产者,生产一条消息。

ccd953876022ac7d1a0bdf51ba346faa67d.jpg

运行消费者,出现异常

6fc8e70bf134383f6bba083e4fb7688c638.jpg

查看管理界面,发现该条消息还存在。

9fbccaf2ac71014de990d8b03fa44b8e4cf.jpg

将异常去掉,运行。

6f7efca9a2389e9e00e10a1557fdf0ffbca.jpg

控制台打印出日志

d44bd8ca55abdf21752366c4340cc9c520c.jpg

查看管理界面,发现消息已经被消费。

93f8bf3dfcb395437b1e3d933cd42b0b90d.jpg

详细源码地址

8d6d4cfaf36b9127309d40b706da14b6935.jpg

转载于:https://my.oschina.net/suzheworld/blog/3002275

你可能感兴趣的文章
漫谈vfp程序界面及设计观
查看>>
性能魔方mmTrix:应用性能与用户体验“石器时代”终结者
查看>>
1.大道至简的数据处理工具(Power Query)
查看>>
python第二阶段第五天额外增加
查看>>
开始写博客啦
查看>>
Cisco_IP_Communicator_软电话配置过程全解
查看>>
友元函数的产生和理解
查看>>
系统运维问题诊断集合
查看>>
springboot 2.0+elasticseach 5.5.0集群环境搭建示例(附源码)
查看>>
利用Hadoop提供的RPC API实现简单的RPC程序
查看>>
avg count null值计算
查看>>
SfB迁移CMS中央存储数据库镜像配置
查看>>
Gartner:2013-2014年全球MSS市场分析
查看>>
std::function源码分析
查看>>
Android的Activity组件启动,切换和值传递学习
查看>>
SQL Server中truncate、delete和drop的异同点
查看>>
我的友情链接
查看>>
工作节点配置
查看>>
fedora 20安装软件
查看>>
我的友情链接
查看>>