springboot與rabbitMQ實現延遲載入
為什麼要延遲載入:
制定一項任務,在某個時間之後去執行,這種場景比較適合使用延遲載入的模式。
延遲隊列存儲的對象肯定是對應的延時消息,所謂」延時消息」是指當消息被發送以後,並不想讓消費者立即拿到消息,而是等待指定時間後,消費者才拿到這個消息進行消費。
原理:
Time To Live(TTL)
RabbitMQ可以針對Queue和Message設置 x-message-tt,來控制消息的生存時間,如果超時,則消息變為dead letter
RabbitMQ針對隊列中的消息過期時間有兩種方法可以設置。
A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
B: 對消息進行單獨設置,每條消息TTL可以不同。
如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由。
x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
x-dead-letter-routing-key:指定routing-key發送
隊列出現dead letter的情況有:
消息或者隊列的TTL過期
隊列達到最大長度
消息被消費端拒絕(basic.reject or basic.nack)並且requeue=false
利用DLX,當消息在一個隊列中變成死信後,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。
利用這兩個特性,設置消息的過期時間,當生產的消息沒有消費者去接收(這樣的隊列稱為死信隊列),消息在
rabbitServer的到達設置的過期時間時,就會將死信隊列中的過期消息發送到DLX中設置的Exchange中,這樣
就實現了延遲載入。
上圖:
集成過程如下:
生產者端配置
1.引入依賴:
[java] view plain copy
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.41</version>
- </dependency>
- </dependencies>
2.配置application.properties
[java] view plain copy
- server.port=10001
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.publisher-confirms=
true
- spring.rabbitmq.virtual-host=/
3.配置AMQP
[java] view plain copy
- @Configuration
public
class
AmqpConfig {
- @Bean
- RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return
new
RabbitAdmin(connectionFactory);- }
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {- RabbitTemplate template =
new
RabbitTemplate(connectionFactory); return
template;- }
- }
4.聲明隊列、交換機
[java] view plain copy
- @Configuration
public
class
ExchangeConfig {- /******************************************死信隊列***************************************************/
- //exchange name
public
static
final
String DEFAULT_EXCHANGE = "KSHOP";- //DLX QUEUE
public
static
final
String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";- //DLX repeat QUEUE 死信轉發隊列
public
static
final
String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";- //信道配置
- @Bean
public
DirectExchange defaultExchange() {return
new
DirectExchange(DEFAULT_EXCHANGE,true
,false
);- }
- @Bean
public
Queue repeatTradeQueue() {- Queue queue =
new
Queue(DEFAULT_REPEAT_TRADE_QUEUE_NAME,true
,false
,false
); return
queue;- }
- @Bean
public
Binding drepeatTradeBinding() {
return
BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(DEFAULT_REPEAT_TRADE_QUEUE_NAME);- }
- @Bean
public
Queue deadLetterQueue() {- Map<String, Object> arguments =
new
HashMap<>(); - arguments.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
- arguments.put("x-dead-letter-routing-key", DEFAULT_REPEAT_TRADE_QUEUE_NAME);
- Queue queue =
new
Queue(DEFAULT_DEAD_LETTER_QUEUE_NAME,true
,false
,false
,arguments); - System.out.println("arguments :" + queue.getArguments());
return
queue;- }
- @Bean
public
Binding deadLetterBinding() {return
BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(DEFAULT_DEAD_LETTER_QUEUE_NAME);
- }
- }
這裡指定了聲明了死信隊列失效之後的發送的交換機和routing-key,其實這裡可以指定兩個交換機,一個是死信隊列的交換機1綁定死信隊列,一個是失效之後到達的交換機2綁定延遲隊列,死信隊列的交換機沒有消費者去監聽,而交換機2綁定的隊列就是真正的延遲隊列了,消費者去監聽這個隊列。
5.定義業務service
[java] view plain copy
- @Service
public
class
DeadLetterService {- @Autowired
private
RabbitTemplate rabbitTemplate;public
void
send(LogCarrier logCarrier) {- MessagePostProcessor processor =
new
MessagePostProcessor() { - @Override
public
Message postProcessMessage(Message message)
throws
AmqpException {- message.getMessageProperties().setExpiration(30000 + "");
return
message;- }
- };
- rabbitTemplate.convertAndSend(ExchangeConfig.DEFAULT_EXCHANGE, ExchangeConfig.DEFAULT_DEAD_LETTER_QUEUE_NAME,
- JSON.toJSONString(logCarrier), processor);
- }
- }
這裡指定了超時時間為30秒
6.創建controller去調用
[java] view plain copy
- @RestController
public
class
DeadLetterController {- <span stylex="white-space:pre;"> </span>@Autowired
- <span stylex="white-space:pre;"> </span>
private
DeadLetterService deadLetterService; - <span stylex="white-space:pre;"> </span>@GetMapping("deadLetter")
- <span stylex="white-space:pre;"> </span>
public
void
direct()throws
InterruptedException { - <span stylex="white-space:pre;"> </span>
long
i = 0; - <span stylex="white-space:pre;"> </span>
while
(i<10) { - <span stylex="white-space:pre;"> </span>LogCarrier contract =
new
LogCarrier(); - <span stylex="white-space:pre;"> </span>contract.setId(i++);
- <span stylex="white-space:pre;"> </span>contract.setType("direct");
- <span stylex="white-space:pre;"> </span>deadLetterService.send(contract);<span stylex="white-space:pre;"> </span>
- <span stylex="white-space:pre;"> </span>}
- <span stylex="white-space:pre;"> </span>System.out.println("消息發送時間:"+
new
Date()); - <span stylex="white-space:pre;"> </span>}
- }
這樣,生產者端的基本都配置完成。
消費者端配置,消費者端的配置很簡單:
1.依賴
[java] view plain copy
- <dependencies>
- <!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency> -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.41</version>
- </dependency>
- </dependencies>
2.application.properties
[java] view plain copy
- server.port=0
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.publisher-confirms=
true
- spring.rabbitmq.virtual-host=/
3.AMQP配置
[java] view plain copy
- @Configuration
- @EnableRabbit
public
class
ConsumerConfigimplements
RabbitListenerConfigurer {- @Bean
public
DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {- DefaultMessageHandlerMethodFactory factory =
new
DefaultMessageHandlerMethodFactory(); - factory.setMessageConverter(
new
MappingJackson2MessageConverter()); return
factory;- }
- @Bean
public
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {- SimpleRabbitListenerContainerFactory factory =
new
SimpleRabbitListenerContainerFactory(); - factory.setConnectionFactory(connectionFactory);
- factory.setPrefetchCount(1);//設置預讀取數,可以進行有效的負載均衡。
- factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//自動ask
return
factory;- }
- @Override
public
void
configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {- registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
- }
- }
4.監聽service
[java] view plain copy
- /**
- * 死信隊列
- *
- * @author cfyj 2017年11月24日 下午3:11:05
- *
- */
- @Service
public
class
CustomService4 {private
static
int
num = 0;- @RabbitListener(queues = "kshop.repeat.trade.queue")
- @RabbitHandler
public
void
process(String obj) {- LogCarrier logCarrier= JSON.parseObject(obj, LogCarrier.
class
); - System.out.println(num+":------消息接收時間"+
new
Date()+logCarrier); - }
- }
傳輸實體:
[java] view plain copy
package
com.cfyj.demo.domain;public
class
LogCarrier {private
Long id;private
String type;public
String getType() {return
type;- }
public
void
setType(String type) {this
.type = type;- }
public
Long getId() {return
id;- }
public
void
setId(Long id) {this
.id = id;- }
- @Override
public
String toString() {return
"LogCarrier [id=" + id + ", type=" + type + "]";- }
- }
這樣生產者和消費者都配置完成了。注意生產者和消費者端傳輸的對象實體類信息必須一致。
這樣就開始測試吧,測試之前我們帶著幾個問題去測試:
只啟動生產者,然後向死信隊列發送信息,消息失效後會怎麼樣?
如果指定交換機的類型為fanout,沒有消費者監聽是否會將信息直接丟棄呢?
1.測試,啟動生產者和消費者(先啟動生產者來聲明交換機和隊列)
發送消息的時間
死信隊列中的消息數
延遲隊列的消息,這時因為過期時間還沒到,所以死信隊列中的信息還沒有到達延遲隊列中
消費者收到延遲隊列的時間
接收-發送的時間正好為過期時間30s,這樣就實現了消息的延遲消費,在到達過期時間後,死信隊列的消息會發送到指定x-dead-letter-exchange的交換機中,由交換機發送到設置的延遲隊列。
2.當我們只啟動生產者時,發送消息,消息會怎麼樣?(topic類型和direct類型的測試結果一致)
發送請求時間:
死信隊列中的消息:
當達到過期時間後,延遲隊列的消息(注意兩個隊列收到消息的時間):
當只啟動生產者服務然後發送消息到死信隊列時,消息會先堆積到死信隊列,然後到達過期時間後重發到延遲隊列中。
3.如果指定交換機的類型為fanout,沒有消費者監聽是否會將信息直接丟棄呢?
發送信息後,消息會先進入死信隊列中,並沒有直接丟棄消息
死信隊列,注意接收消息的時間:
死信隊列中的消息到達過期時間後:
延遲隊列:
測試結果與direct類型相同。
![](https://pic.pimg.tw/zzuyanan/1488615166-1259157397.png)
![](https://pic.pimg.tw/zzuyanan/1482887990-2595557020.jpg)
※解決 nodejs callback無限嵌套(回調地獄)問題
※MeanShift濾波演算法與實現
TAG:程序員小新人學習 |