教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢/投訴熱線:400-618-4000

RabbitMQ:偽延時(shí)隊(duì)列

更新時(shí)間:2018年10月24日15時(shí)50分 來(lái)源:傳智播客 瀏覽次數(shù):

  一、什么是延時(shí)隊(duì)列

  所謂延時(shí)隊(duì)列是指消息push到隊(duì)列后,監(jiān)聽(tīng)的消費(fèi)者不能第一時(shí)間獲取消息,需要等到指定時(shí)間才能消費(fèi)。

  一般在業(yè)務(wù)里面需要對(duì)某些消息做定時(shí)發(fā)送,不想走定時(shí)任務(wù)或者是用戶下單之后多長(zhǎng)時(shí)間自動(dòng)失效類似的場(chǎng)景可以考慮通過(guò)延時(shí)隊(duì)列實(shí)現(xiàn)。

  二、RabbitMQ實(shí)現(xiàn)

  MQ本身并不支持直接的延時(shí)隊(duì)列實(shí)現(xiàn),但是我們可以通過(guò)RabbitMQ的消息TTL和Dead Letter規(guī)則來(lái)實(shí)現(xiàn)

  Time TO Live (TTL): RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來(lái)控制消息的生存時(shí)間

  Dead Letter 死信 RabbitMQ官網(wǎng)這樣定義死信消息:

  . 消息被拒絕(basic.reject或basic.nack)并且requeue=false.

  . 消息TTL過(guò)期

  隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無(wú)法再添加數(shù)據(jù)到mq中)

  Dead Letter Exchanges(DLX)死信交換機(jī) MQ默認(rèn)的死信消息是丟棄的,但是我們可以通過(guò)設(shè)置以下兩個(gè)屬性讓死信消息轉(zhuǎn)發(fā)到我們指定的隊(duì)列。

  x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange

  x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送

  延時(shí)隊(duì)列實(shí)現(xiàn): 了解了MQ隊(duì)列的TTL和Dead Letter之后,我們就可以通過(guò)這兩個(gè)特性來(lái)實(shí)現(xiàn),首先我們通過(guò)設(shè)置消息或者隊(duì)列的TTL來(lái)設(shè)置消息在指定時(shí)間后成為死信,再設(shè)置死信消息的路由轉(zhuǎn)發(fā)規(guī)則到特定隊(duì)列,消費(fèi)者通過(guò)監(jiān)聽(tīng)這個(gè)特定隊(duì)列就能實(shí)現(xiàn)延時(shí)隊(duì)列的效果。

  代碼實(shí)現(xiàn)

  生產(chǎn)者發(fā)送消息:ttlQueue存放過(guò)期時(shí)間的隊(duì)列,deadLetterQueue死信轉(zhuǎn)發(fā)隊(duì)列,seconds是過(guò)期時(shí)間

  public static void sendTTLMsg(String ttlQueue, String deadLetterQueue, Object msg, Integer seconds) {

  MqSender.getInstance().setHost(RABBIT_MQ_HOST);

  // 獲取到連接以及MQ通道

  Connection connection;

  try {

  connection = MqSender.getInstance().newConnection();

  // 從連接中創(chuàng)建通道

  Channel channel = connection.createChannel();

  // 配置

  Map args = new HashMap();

  args.put("x-dead-letter-exchange", "");

  args.put("x-dead-letter-routing-key", deadLetterQueue);

  channel.queueDeclare(deadLetterQueue, true, false, false, null);

  channel.queueDeclare(ttlQueue, true, false, false, args);

  // 發(fā)送消息

  channel.basicPublish("", ttlQueue, new AMQP.BasicProperties.Builder().expiration(String.valueOf(seconds)).build(), MAPPER.writeValueAsBytes(msg));

  channel.close();

  connection.close();

  } catch (IOException e) {

  e.printStackTrace();

  }

  }

  消費(fèi)者通過(guò)監(jiān)聽(tīng)deadLetterQueue來(lái)實(shí)現(xiàn)延時(shí)消息監(jiān)聽(tīng)

  三、 延時(shí)隊(duì)列的問(wèn)題

  通過(guò)我們測(cè)試發(fā)現(xiàn),這種方式實(shí)現(xiàn)的延時(shí)隊(duì)列,在隊(duì)列設(shè)置TTL的情況下是可以正常的,但是如果根據(jù)消息設(shè)置了不同的TTL,就會(huì)有問(wèn)題,因?yàn)镸Q本質(zhì)上還是消息隊(duì)列中間件,隊(duì)列是遵循先進(jìn)先出的,如果有兩個(gè)消息先后入隊(duì),但是后入隊(duì)的消息TTL小于前面的消息,它必須等待之前的消息被消費(fèi)完后才能挪到隊(duì)列頭部,這樣不同延時(shí)消息就會(huì)出現(xiàn)問(wèn)題。

  通過(guò)RabbitMQ官網(wǎng)的文檔也介紹了這個(gè)問(wèn)題:

  Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered)

  所以我才稱之為MQ的偽延時(shí)隊(duì)列,這種延時(shí)隊(duì)列在消息TTL不同的情況下并不能實(shí)現(xiàn)真正的延時(shí)消費(fèi)。

  四、解決RabbitMQ的偽延時(shí)方案

  既然RabbitMQ無(wú)法支持不同TTL消息的延時(shí)消費(fèi),那么如果我們要實(shí)現(xiàn)這種功能,有什么方案呢,在實(shí)際業(yè)務(wù)開(kāi)發(fā)中,我們有這樣的解決方案:

  首先我們會(huì)創(chuàng)建多級(jí)延時(shí)消費(fèi)隊(duì)列(比如兩分鐘,三十分鐘,一天三種,具體可以根據(jù)業(yè)務(wù)量和訪問(wèn)量還有時(shí)間精確度來(lái)劃分,這里的兩分鐘、三十分鐘是指隊(duì)列統(tǒng)一的TTL),push消費(fèi)隊(duì)列的時(shí)候,會(huì)根據(jù)需要延時(shí)的時(shí)間,丟到不同的消費(fèi)隊(duì)列,比如小于三十分鐘的我們push到兩分鐘隊(duì)列,三十分鐘到一天的放入三十分鐘隊(duì)列,超過(guò)一天的放入一天隊(duì)列,在死信隊(duì)列的監(jiān)聽(tīng)器做同樣的判斷,如果是小于等于當(dāng)前時(shí)間消息的,立馬消費(fèi),否則按照上述規(guī)則繼續(xù)循環(huán)到不同的延時(shí)隊(duì)列

  這種方案解決了多級(jí)延時(shí)消費(fèi)的問(wèn)題,并且能夠較大程度地避免了消息的重復(fù)循環(huán),降低MQ的壓力,但是缺點(diǎn)也比較明顯,因?yàn)樽畹褪莾煞昼姷难訒r(shí),理論上來(lái)說(shuō)最多會(huì)有兩分鐘的誤差,如果對(duì)時(shí)間要求性比較高的,可以適當(dāng)調(diào)低最低一級(jí)別的延時(shí)TTL,比如一分鐘或者三十秒

  類似代碼如下:cts是需要消費(fèi)掉的時(shí)間戳

  long now = System.currentTimeMillis();

  long cts = Long.valueOf(feedComment.getCts());

  if (cts - now <= 30 * 60 * 1000) {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_2MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 2 * 60);

  } else if (cts - now <= 24 * 60 * 60 * 1000) {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_30MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 30 * 60);

  } else {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_24HOUR, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 24 * 60 * 60);

  }



作者:傳智播客JavaEE培訓(xùn)學(xué)院
首發(fā):http://java.itcast.cn/

0 分享到:
和我們?cè)诰€交談!