从上一篇文章的基本消息模型案例中,我们发现消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
-
自动ACK:消息一旦被接收,消费者自动发送ACK
-
手动ACK:消息接收后,不会发送ACK,需要手动调用
在基本消息模型案例的代码中有一段
// 监听队列,第二个参数:是否自动进行消息确认。 channel.basicConsume(QUEUE_NAME, true, consumer);
当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息,rabbitmq就删除了该条消息
自动ack存在的问题:
生产者不做任何修改,直接运行,可以看到一条消息存在队列中:
消费者代码,模拟了一个异常:
/** * 消费者 */@Slf4jpublic class ConsumerError { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建一个信道,意味着每个线程单独一个信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); //异常模拟 int i = 1/0; log.debug("消费消息:{}",msg); } }; // 监听队列,第二个参数:是否自动进行消息确认。 channel.basicConsume(QUEUE_NAME, true, consumer); }}
运行消费者
实际上并没有消费成功消息,队列中的消息却删除了。
手动确认消息
/** * 消费者 */@Slf4jpublic class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建一个信道,意味着每个线程单独一个信道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); int i = 1/0; log.debug("消费消息:{}",msg); // 手动进行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数:是否自动进行消息确认。 channel.basicConsume(QUEUE_NAME, false, consumer); }}
运行生产者,生产一条消息。
运行消费者,出现异常
查看管理界面,发现该条消息还存在。
将异常去掉,运行。
控制台打印出日志
查看管理界面,发现消息已经被消费。
详细源码地址