RabbitMQ消息有效期与死信的处理过程是什么(rabbitmq,开发技术)

时间:2024-05-03 13:55:20 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

    一.前言

    RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

    设置TTL有两种方式:

    • 队列有效期:是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期

    • 消息有效期:发送消息时给消息设置属性,可以为每条消息都设置不同的TTL

    如果两种方式都设置了,则以设置的较小的为准。

    • 区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。

    二.设置消息有效期

    1.设置队列的有效期TTL

    定义队列的方法如下:

    Queue.DeclareOkqueueDeclare(Stringqueue,booleandurable,booleanexclusive,booleanautoDelete,Map<String,Object>arguments)throwsIOException;

    该方法的arguments参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。后台添加的话如下:

    RabbitMQ消息有效期与死信的处理过程是什么

    代码中设置如下:

    Map<String,Object>arguments=newHashMap<String,Object>();arguments.put("x-message-ttl",10000);//10秒钟单位为毫秒channel.queueDeclare(queueName,durable,exclusive,autoDelete,arguments);

    命令行模式来设置:

    rabbitmqctlset_policyTTL".*"'{"message-ttl":100000}'--apply-toqueues

    通过HTTP接口调用:

    $curl-i-uguest:guest-H"content-type:application/json"-XPUT-d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl":100000}}'http://ip:15672/api/queues/{vhost}/{queuename}

    2.设置队列的有效期Expire

    有效期Expire可以让队列在指定时间内 “未被使用” 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

    服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 10000 ,则表示该 queue 如果在 10s之内未被使用则会被删除。

    代码如下:

    Map<String,Object>args=newHashMap<String,Object>();args.put("x-expires",10000);channel.queueDeclare("queue",false,false,false,args);

    3.通过发送消息时设置有效期

    发送消息的方法如下:

    voidbasicPublish(Stringexchange,StringroutingKey,BasicPropertiesprops,byte[]body)throwsIOException;

    在该方法的props参数可以设置其有效期:

    Map<String,Object>headers=newHashMap<String,Object>();AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().deliveryMode(2)//消息持久.contentEncoding("UTF-8")//编码方式.contentType("text/plain").expiration("100000").headers(headers).build();channel.basicPublish("",queueName,properties,message.getBytes());

    通过HTTPAPI 接口设置:

    $curl-i-uguest:guest-H"content-type:application/json"-XPOST-d'{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"}'http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

    三.死信交换机DLX

    介绍

    • 死信队列:DLX,dead-letter-exchange

    • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

    消息变成死信几种情况

    • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

    • 消息过期

    • 队列达到最大长度

    死信处理过程

    • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

    • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

    • 可以监听这个队列中的消息做相应的处理。

    用途

    通过监控消费死信队列中消息,来观察和分析数据。
    结合TTL实现延迟队列(比如下单超过多长时间自动关闭)

    使用

    代码如下:

    channel.exchangeDeclare("dlx_exchange","direct");//创建DLX:dlx_exchangeMap<String,Object>args=newHashMap<String,Object>();args.put("x-dead-letter-exchange","dlx_exchange");//设置死信交换机args.put("x-dead-letter-routing-key","dlx-routing-key");//设置DLX的路由键(可以不设置)channel.queueDeclare("myqueue",false,false,false,args);

    实例

    publicstaticvoidmain(String[]args)throwsException{ Connectionconnection=ConnectionUtil.getConnection(); Channelchannel=connection.createChannel(); //声明一个交换机,做死信交换机用 channel.exchangeDeclare("dlx_exchange","topic",true,false,null); //声明一个队列,做死信队列用 channel.queueDeclare("dlx_queue",true,false,false,null); //队列绑定到交换机上 channel.queueBind("dlx_queue","dlx_exchange","dlx.*"); channel.exchangeDeclare("normal_exchange","fanout",true,false,null); Map<String,Object>arguments=newHashMap<String,Object>(); arguments.put("x-message-ttl",1000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX arguments.put("x-dead-letter-exchange","dlx_exchange");//设置DLX arguments.put("x-dead-letter-routing-key","dlx.test");//设置DLX的路由键 //为队列normal_queue添加DLX channel.queueDeclare("normal_queue",true,false,false,arguments); channel.queueBind("normal_queue","normal_exchange",""); channel.basicPublish("normal_exchange","",MessageProperties.PERSISTENT_TEXT_PLAIN,("测试死信消息").getBytes()); System.out.println("发送消息时间:"+ConnectionUtil.formatDate(newDate())); channel.close(); connection.close(); }

    说明:

    申明死信队列dlx_queue的绑定如下,与死信交换机dlx_exchange(topic类型)进行绑定,routing key为"dlx.*"
    申明队列normal_queue,与交换机normal_exchange(fanout类型)进行绑定

    执行流程:

    • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上

    • 因为队列normal_queue没有消费者,消息过期后成为死信消息

    • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage

    • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

     </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
    本文:RabbitMQ消息有效期与死信的处理过程是什么的详细内容,希望对您有所帮助,信息来源于网络。
    上一篇:如何实现mysql远程跨库联合查询下一篇:

    14 人围观 / 0 条评论 ↓快速评论↓

    (必须)

    (必须,保密)

    阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18