博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 消息中间件如何保证消费者customer能够成功处理消息?
阅读量:7120 次
发布时间:2019-06-28

本文共 2073 字,大约阅读时间需要 6 分钟。

hot3.png

一、确保消费者customer处理消息成功

默认情况下消费者C1接收到消息1无论是否正常接受和处理都会立即应答rabbit服务器,然后消息1就会从队列中被删除,假如C1突然出现异常状况导致消息1没有被处理完毕,那么消息1就处理失败了,也不会有其他消费者去处理消息1。事实上我们希望的是消息1如果没有被C1正确处理完毕,那么就发送给其他消费者处理,为了达到这个目的,只需要做两件事情,第一关闭rabbitMq的自动应答机制,第二消费者正确处理完消息后手动应答。

RabbitMQ应答机制:

  • 自动确认,默认是自动确认,即获取消息后,直接确认。
  • 手动确认,给当前消息设置状态,当手动ack后服务端才会删除该消息,如果返回nack,重新入队。

customer在监听队列接收消息的时候,申明取消自动应答,手动返回完成。

channel.basicConsume(QUEUE_NAME, false, consumer);

在完成消费操作时,返回确认状态。

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  • delivery.getEnvelope().getDeliveryTag(): 消息id
  • true: 这里true或者false都代表已经应答

二、存在两个大问题

问题: 1)重复消费 2) 消息堆积

1. 消息重发导致消息重复消费的问题(消息中间件幂等性)

如果在消费方customer在完成消费了之后,由于网络问题没有及时应答,就会存在大量消息堆积在MQ服务器。

由于RabbitMQ有消息重新发送的机制,如果没有及时回应那么就会继续重发,重发就会导致消息重复消费。

(1)在生产者producer产生消息的时候可以给消息一个唯一的id。

(2)在执行完毕之后,利用redis缓存消息id,判断时候消费过。

单个消费者:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	String value = redis.get("key_"+envelope.getDeliveryTag());	if (value != null){		//之前已经执行过了,所以直接应答		channel.basicAck(deliveryTag, true);		return ;	}	String exchange = envelope.getExchange();//交换	long deliveryTag = envelope.getDeliveryTag();//消息id	String routingKey  = envelope.getRoutingKey();//路由key	String message = new String(body, "utf-8");	System.out.println(message);	//先在redis中放入消息id,记得加上过期时间	// 返回确认状态}

多个消费者: 分布式锁解决

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {	try{		//在赋值锁的时候可以加上过期时间		boolean flag = redisTemplate.opsForValue().setIfAbsent("key_"+envelope.getDeliveryTag(),envelope.getDeliveryTag());		//如果没有被赋值则返回true		if(flag){			String exchange = envelope.getExchange();//交换			long deliveryTag = envelope.getDeliveryTag();//消息id			 String routingKey  = envelope.getRoutingKey();//路由key			String message = new String(body, "utf-8");			System.out.println(message);			// 返回确认状态		}	}catch(Exception e){	//打印日志	//删除redis中的记录	//直接return;不应答,等待再次重新发送。	}}

2. 消息堆积解决

1)加大rabbitMQ的内存空间

2)重启服务

转载于:https://my.oschina.net/edisonOnCall/blog/3044853

你可能感兴趣的文章
CISCO RIP动态协议路由回环的产生
查看>>
老板会因为你拼死写代码而感谢你吗?
查看>>
多媒体指令(图像二值化)
查看>>
matlab练习程序(Sepia Tone滤镜)
查看>>
使用EJB3 Java 持久化API来标准化Java的持久化操作
查看>>
发布一款小软件:和讯博客助手-测试版- 0.3.0
查看>>
[原创]BizTalk 开发系列
查看>>
2013年工作生活总结
查看>>
Unity3D笔记 英保通三 脚本编写 、物体间通信
查看>>
CodeSmith注册错误的解决方法
查看>>
生命与工作
查看>>
Timestamp和String的相互转换 Java
查看>>
Visual Studio 2005常用插件搜罗 [转]
查看>>
梁朝伟变刘德华之山寨实现
查看>>
[OpenStack] OpenStack Essex - Glance 安装部署与命令行详解
查看>>
《止杀令》:蹄穿大漠尘 济世有奇功
查看>>
数据结构基础温故-7.排序
查看>>
咏南中间件+开发框架中秋国庆大促
查看>>
mysql列类型
查看>>
CentOS中JAVA_HOME的环境变量设置
查看>>