當前位置:
首頁 > 知識 > springboot與rabbitMQ實現延遲載入

springboot與rabbitMQ實現延遲載入

為什麼要延遲載入:

制定一項任務,在某個時間之後去執行,這種場景比較適合使用延遲載入的模式。

延遲隊列存儲的對象肯定是對應的延時消息,所謂」延時消息」是指當消息被發送以後,並不想讓消費者立即拿到消息,而是等待指定時間後,消費者才拿到這個消息進行消費。

原理:

Time To Live(TTL)

RabbitMQ可以針對Queue和Message設置 x-message-tt,來控制消息的生存時間,如果超時,則消息變為dead letter

RabbitMQ針對隊列中的消息過期時間有兩種方法可以設置。

A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。

B: 對消息進行單獨設置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead letter

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由。

x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange

x-dead-letter-routing-key:指定routing-key發送

隊列出現dead letter的情況有:

消息或者隊列的TTL過期

隊列達到最大長度

消息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

利用DLX,當消息在一個隊列中變成死信後,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。

利用這兩個特性,設置消息的過期時間,當生產的消息沒有消費者去接收(這樣的隊列稱為死信隊列),消息在

rabbitServer的到達設置的過期時間時,就會將死信隊列中的過期消息發送到DLX中設置的Exchange中,這樣

就實現了延遲載入。

上圖:

springboot與rabbitMQ實現延遲載入

集成過程如下:

生產者端配置

1.引入依賴:

[java] view plain copy

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-amqp</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-devtools</artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>com.alibaba</groupId>
  21. <artifactId>fastjson</artifactId>
  22. <version>1.2.41</version>
  23. </dependency>
  24. </dependencies>

2.配置application.properties

[java] view plain copy

  1. server.port=10001
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.publisher-confirms=

    true

  7. spring.rabbitmq.virtual-host=/

3.配置AMQP

[java] view plain copy

  1. @Configuration
  2. public

    class

    AmqpConfig {

  3. @Bean
  4. RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  5. return

    new

    RabbitAdmin(connectionFactory);
  6. }
  7. @Bean
  8. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  9. public

    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  10. RabbitTemplate template =

    new

    RabbitTemplate(connectionFactory);
  11. return

    template;
  12. }
  13. }

4.聲明隊列、交換機

[java] view plain copy

  1. @Configuration
  2. public

    class

    ExchangeConfig {
  3. /******************************************死信隊列***************************************************/
  4. //exchange name
  5. public

    static

    final

    String DEFAULT_EXCHANGE = "KSHOP";
  6. //DLX QUEUE
  7. public

    static

    final

    String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";
  8. //DLX repeat QUEUE 死信轉發隊列
  9. public

    static

    final

    String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";
  10. //信道配置
  11. @Bean
  12. public

    DirectExchange defaultExchange() {
  13. return

    new

    DirectExchange(DEFAULT_EXCHANGE,

    true

    ,

    false

    );
  14. }
  15. @Bean
  16. public

    Queue repeatTradeQueue() {
  17. Queue queue =

    new

    Queue(DEFAULT_REPEAT_TRADE_QUEUE_NAME,

    true

    ,

    false

    ,

    false

    );
  18. return

    queue;
  19. }
  20. @Bean
  21. public

    Binding drepeatTradeBinding() {

  22. return

    BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  23. }
  24. @Bean
  25. public

    Queue deadLetterQueue() {
  26. Map<String, Object> arguments =

    new

    HashMap<>();
  27. arguments.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
  28. arguments.put("x-dead-letter-routing-key", DEFAULT_REPEAT_TRADE_QUEUE_NAME);
  29. Queue queue =

    new

    Queue(DEFAULT_DEAD_LETTER_QUEUE_NAME,

    true

    ,

    false

    ,

    false

    ,arguments);
  30. System.out.println("arguments :" + queue.getArguments());
  31. return

    queue;
  32. }
  33. @Bean
  34. public

    Binding deadLetterBinding() {
  35. return

    BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(DEFAULT_DEAD_LETTER_QUEUE_NAME);

  36. }
  37. }

這裡指定了聲明了死信隊列失效之後的發送的交換機和routing-key,其實這裡可以指定兩個交換機,一個是死信隊列的交換機1綁定死信隊列,一個是失效之後到達的交換機2綁定延遲隊列,死信隊列的交換機沒有消費者去監聽,而交換機2綁定的隊列就是真正的延遲隊列了,消費者去監聽這個隊列。

5.定義業務service

[java] view plain copy

  1. @Service
  2. public

    class

    DeadLetterService {
  3. @Autowired
  4. private

    RabbitTemplate rabbitTemplate;
  5. public

    void

    send(LogCarrier logCarrier) {
  6. MessagePostProcessor processor =

    new

    MessagePostProcessor() {
  7. @Override
  8. public

    Message postProcessMessage(Message message)

    throws

    AmqpException {
  9. message.getMessageProperties().setExpiration(30000 + "");
  10. return

    message;
  11. }
  12. };
  13. rabbitTemplate.convertAndSend(ExchangeConfig.DEFAULT_EXCHANGE, ExchangeConfig.DEFAULT_DEAD_LETTER_QUEUE_NAME,
  14. JSON.toJSONString(logCarrier), processor);
  15. }
  16. }

這裡指定了超時時間為30秒

6.創建controller去調用

[java] view plain copy

  1. @RestController
  2. public

    class

    DeadLetterController {
  3. <span stylex="white-space:pre;"> </span>@Autowired
  4. <span stylex="white-space:pre;"> </span>

    private

    DeadLetterService deadLetterService;
  5. <span stylex="white-space:pre;"> </span>@GetMapping("deadLetter")
  6. <span stylex="white-space:pre;"> </span>

    public

    void

    direct()

    throws

    InterruptedException {
  7. <span stylex="white-space:pre;"> </span>

    long

    i = 0;
  8. <span stylex="white-space:pre;"> </span>

    while

    (i<10) {
  9. <span stylex="white-space:pre;"> </span>LogCarrier contract =

    new

    LogCarrier();
  10. <span stylex="white-space:pre;"> </span>contract.setId(i++);
  11. <span stylex="white-space:pre;"> </span>contract.setType("direct");
  12. <span stylex="white-space:pre;"> </span>deadLetterService.send(contract);<span stylex="white-space:pre;"> </span>
  13. <span stylex="white-space:pre;"> </span>}
  14. <span stylex="white-space:pre;"> </span>System.out.println("消息發送時間:"+

    new

    Date());
  15. <span stylex="white-space:pre;"> </span>}
  16. }

這樣,生產者端的基本都配置完成。

消費者端配置,消費者端的配置很簡單:

1.依賴

[java] view plain copy

  1. <dependencies>
  2. <!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  3. </dependency> -->
  4. <dependency>
  5. <groupId>org.springframework.boot</groupId>
  6. <artifactId>spring-boot-starter-web</artifactId>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-test</artifactId>
  11. <scope>test</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-amqp</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-devtools</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. <version>1.2.41</version>
  25. </dependency>
  26. </dependencies>

2.application.properties

[java] view plain copy

  1. server.port=0
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.publisher-confirms=

    true

  7. spring.rabbitmq.virtual-host=/

3.AMQP配置

[java] view plain copy

  1. @Configuration
  2. @EnableRabbit
  3. public

    class

    ConsumerConfig

    implements

    RabbitListenerConfigurer {
  4. @Bean
  5. public

    DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
  6. DefaultMessageHandlerMethodFactory factory =

    new

    DefaultMessageHandlerMethodFactory();
  7. factory.setMessageConverter(

    new

    MappingJackson2MessageConverter());
  8. return

    factory;
  9. }
  10. @Bean
  11. public

    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  12. SimpleRabbitListenerContainerFactory factory =

    new

    SimpleRabbitListenerContainerFactory();
  13. factory.setConnectionFactory(connectionFactory);
  14. factory.setPrefetchCount(1);//設置預讀取數,可以進行有效的負載均衡。
  15. factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//自動ask
  16. return

    factory;
  17. }
  18. @Override
  19. public

    void

    configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
  20. registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
  21. }
  22. }

4.監聽service

[java] view plain copy

  1. /**
  2. * 死信隊列
  3. *
  4. * @author cfyj 2017年11月24日 下午3:11:05
  5. *
  6. */
  7. @Service
  8. public

    class

    CustomService4 {
  9. private

    static

    int

    num = 0;
  10. @RabbitListener(queues = "kshop.repeat.trade.queue")
  11. @RabbitHandler
  12. public

    void

    process(String obj) {
  13. LogCarrier logCarrier= JSON.parseObject(obj, LogCarrier.

    class

    );
  14. System.out.println(num+":------消息接收時間"+

    new

    Date()+logCarrier);
  15. }
  16. }

傳輸實體:

[java] view plain copy

  1. package

    com.cfyj.demo.domain;
  2. public

    class

    LogCarrier {
  3. private

    Long id;
  4. private

    String type;
  5. public

    String getType() {
  6. return

    type;
  7. }
  8. public

    void

    setType(String type) {
  9. this

    .type = type;
  10. }
  11. public

    Long getId() {
  12. return

    id;
  13. }
  14. public

    void

    setId(Long id) {
  15. this

    .id = id;
  16. }
  17. @Override
  18. public

    String toString() {
  19. return

    "LogCarrier [id=" + id + ", type=" + type + "]";
  20. }
  21. }

這樣生產者和消費者都配置完成了。注意生產者和消費者端傳輸的對象實體類信息必須一致。

這樣就開始測試吧,測試之前我們帶著幾個問題去測試:

只啟動生產者,然後向死信隊列發送信息,消息失效後會怎麼樣?

如果指定交換機的類型為fanout,沒有消費者監聽是否會將信息直接丟棄呢?

1.測試,啟動生產者和消費者(先啟動生產者來聲明交換機和隊列)

發送消息的時間

springboot與rabbitMQ實現延遲載入

死信隊列中的消息數

springboot與rabbitMQ實現延遲載入

延遲隊列的消息,這時因為過期時間還沒到,所以死信隊列中的信息還沒有到達延遲隊列中

springboot與rabbitMQ實現延遲載入

消費者收到延遲隊列的時間

springboot與rabbitMQ實現延遲載入

接收-發送的時間正好為過期時間30s,這樣就實現了消息的延遲消費,在到達過期時間後,死信隊列的消息會發送到指定x-dead-letter-exchange的交換機中,由交換機發送到設置的延遲隊列。

2.當我們只啟動生產者時,發送消息,消息會怎麼樣?(topic類型和direct類型的測試結果一致)

發送請求時間:

springboot與rabbitMQ實現延遲載入

死信隊列中的消息:

springboot與rabbitMQ實現延遲載入

當達到過期時間後,延遲隊列的消息(注意兩個隊列收到消息的時間):

springboot與rabbitMQ實現延遲載入

當只啟動生產者服務然後發送消息到死信隊列時,消息會先堆積到死信隊列,然後到達過期時間後重發到延遲隊列中。

3.如果指定交換機的類型為fanout,沒有消費者監聽是否會將信息直接丟棄呢?

發送信息後,消息會先進入死信隊列中,並沒有直接丟棄消息

死信隊列,注意接收消息的時間:

springboot與rabbitMQ實現延遲載入

死信隊列中的消息到達過期時間後:

springboot與rabbitMQ實現延遲載入

延遲隊列:

springboot與rabbitMQ實現延遲載入

測試結果與direct類型相同。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

解決 nodejs callback無限嵌套(回調地獄)問題
MeanShift濾波演算法與實現

TAG:程序員小新人學習 |