RabbitMQ入門-Routing直連模式
Hello World模式,告訴我們如何一對一發送和接收消息;
Work模式,告訴我們如何多管齊下高效的消費消息;
Publish/Subscribe模式,告訴我們如何廣播消息
那麼有沒有靈活強一點的既可以高效消費,又可以同時送達多個消費者的模式?
有,這就是Routing模式,我又稱之為Direct直連模式。
Routing模式一個生產者P,一個交換機X,多個消息隊列Q以及多個消費者C
在Exchange和Queue中,我們看到了不同的規則,也就是Routing Key
顯然從圖中的說明,我們就知道這是一個log日誌根據級別派發消息的例子。熟悉Log日誌系統的應該都知道,一般的log系統分為error、info、warn和debug等。從圖中我們可以看出,將日誌級別為error的定向的派發到第一個消息隊列,將error、warn和info級別的日誌派發到第一個消息隊列。
該模型首先實現了定向派發,而不再是訂閱模式那種廣播式的派發。同一條消息既可以派發給一個Queue,也可以同時派發給兩個或者多個Queue,這就是該模式的靈活之處。下面來看看實例
發送端/**
* Created by jackie on 17/8/7.
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory;
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection;
Channel channel = connection.createChannel;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent "" + severity + "":"" + message + """);
channel.close;
connection.close;
}
private static String getSeverity(String[] strings){
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString;
}
}
String severity = getSeverity(argv);通過程序參數賦值給Routing Key,作為發送消息的規則
String message = getMessage(argv);通過程序參數賦值作為消息實體發送到Queue
在run configurations中配置argv
- 運行後,可以在RabbitMQ管理應用中看到exchange,但是此時沒有綁定queue,所以即使發送消息也沒有queue會存儲或者消費。
/**
* Created by jackie on 17/8/7.
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory;
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection;
Channel channel = connection.createChannel;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare.getQueue;
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received "" + envelope.getRoutingKey + "":"" + message + """);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange類型為Direct類型
綁定的queue的名稱也是通過program arguments指定的
啟動接收端代碼,我們可以看到生成了Queue名稱為amq.gen-ugjKo6t4y0PXPwoh3CeubA的隊列,同時有routingKey=info和routingKey=error的綁定到了Exchange上。
這時候起送發送端給routingkey為info發送消息「hello world」,我們可以看到在接收端確實能夠收到消息「hello world」,同理,這時候發送routingkey為error的消息,該隊列同樣能夠接收到,因為隊列同時綁定了兩個routing key
這個就是Routing直連模式。
如果您覺得閱讀本文對您有幫助,請點一下「推薦」按鈕,您的「推薦」將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。
※設計模式之單例模式
※NET中解決KafKa多線程發送多主題的問題
※設計模式解密(18)- 原型模式
※Spring初始化ApplicationContext線程託管實際運用架構構思
TAG:科技優家 |
※rabbitMQ系列高級整合應用rabbitTemplate
※Spring Cloud Stream 使用延遲消息實現定時任務(RabbitMQ)
※Kafka、ActiveMQ、RabbitMQ、RocketMQ 區別以及高可用原理
※springboot與rabbitMQ實現延遲載入
※SpringBoot | 第十二章:RabbitMQ 的集成和使用
※rabbitMQ系列高級整合應用Spring AMQP
※RabbitMQ學習系列教程六:Fanout交換機的使用
※RabbitMQ實戰:運行和管理RabbitMQ
※RabbitMQ之消息確認機制(事務+Confirm)
※RabbitMQ學習系列教程五四:Topic的使用
※RabbitMQ系列教程 高級篇五 return消息機制
※RabbitMQ基本概念
※RabbitMQ在分散式系統中的應用
※ASP.NET Core Web API下事件驅動型架構的實現(三):基於RabbitMQ的事件匯流排
※RabbitMq運行原理淺析
※RabbitMQ 高級篇八 消費端ACK與重回隊列
※RabbitMQ實戰:插件介紹與系列總結
※RabbitMQ實戰:消息通信模式和最佳實踐
※深入理解消息中間件技術之RabbitMQ服務
※史上最透徹的 RabbitMQ 可靠消息傳輸實戰