在上一章中,我们完善了我们的日志系统,用direct交换器替换了fanout交换器,使得我们可以有选择性地接收消息。尽管如此,仍然还有限制:不能基于多个标准进行路由。在我们的日志系统中,我们可能不仅希望根据日志等级订阅日志,还希望根据日志来源订阅日志。这个概念来自于unix工具syslog,它不仅可以根据日志等级(info/warn/crit...)来路由日志,同时还可以根据设备(auth/cron/kern...)来路由日志。这将更加灵活,我们可能希望只监听来自'cron'的error级别日志,同时又要接收来自'kern'的所有级别的日志。我们的日志系统如果要实现这个功能,就需要使用到另外一种交换器:主题交换器(Topic Exchange)。
1、主题交换器(Topic Exchange)
发送到主题交换器的消息不能有任意的routing key,必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。比如以下是几个有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的单词可以有很多,最大限制是255 bytes。
binding key必须与routing key模式一样。Topic交换器的逻辑与direct交换器有点相似:使用特定路由键发送的消息将被发送到所有使用匹配绑定键绑定的队列,然而,绑定键有两个特殊的情况,如下:
- "*" 表示匹配任意一个单词
- "#" 表示匹配任意一个或多个单词
下图很好地表示这这两个通配符的用法:
在这个例子中,我们将发送所有跟动物有关的消息,这些消息将会发送到由三个单词,两个点号组成的routing key,第一个单词了表示的是速度,第二个单词表示颜色,第三个单词表示种类:
"<speed>.<colour>.<species>"。
我们创建三个绑定关系:队列Q1绑定到绑定键*.orange.* ,队列Q2绑定到*.*.rabbit和lazy.#。
总结下来就是:
- 队列Q1对橘黄色(orange)颜色的所有动物感兴趣;
- 队列Q2对所有的兔子(rabbit)和所有慢吞吞(lazy)的动物感兴趣。
一个路由为 "quick.orange.rabbit"的消息,将会被转发到这两个队列,路由为"lazy.orange.elephant"的消息也被转发给这两个队列,路由为 "quick.orange.fox"的消息将只被转发到Q1队列,路由为 "lazy.brown.fox"的消息将只被转发到Q2队列。"lazy.pink.rabbit" 只被转发到Q2队列一次(虽然它匹配绑定键*.*.rabbit和lazy.#),路由为 "quick.brown.fox"的消息与任何一个绑定键都不匹配,因此将会被丢弃。
如果我们发送的消息的的路由是由一个单词“orangle"或4个单词”quick.orangle.male.rabbit“将会怎样?会因为与任何一个绑定键不匹配而被丢弃。
另一方面,路由为 "lazy.orange.male.rabbit"的消息,因为匹配"lazy.#"绑定键,因而会被转发到Q2队列。
Topic交换器非常强大,可以像其他类型的交换器一样工作:
当一个队列的绑定键是"#"是,它将会接收所有的消息,而不再考虑所接收消息的路由键,就像是fanout交换器一样;
当一个队列的绑定键没有用到”#“和”*“时,它又像direct交换一样工作。
2、完整的代码
下面是在我们日志系统中采用Topic交换器的完整代码,我们要发送的日志消息的路由由两个单词组成:"<facility>.<severity>"。
EmitLogTopic.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private final static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost);
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "A critical kernel error";
String routingKey = "kern.critical";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
ReceiveLogsTopic.java
import com.rabbitmq.client.*;
public class ReceiveLogsTopic {
private final static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : args) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
启动4个接收者,分别传入绑定键:#、kern.、.critical、kern.* *.critical。
启动生产者:发送一条路由为“kern.critical”的消息,消息内容为:“A critical kernel error”,分别查看接收情况:
可以看到,所有绑定键的队列都正常接收到了消息。
3、SpringBoot实现
工程如下图:
一、生产者
application.properties
#RabbitMq
spring.rabbitmq.host=localhost
rabbitmq.exchange.topic=topic_logs2
rabbitmq.exchange.topic.routing.key=kern.critical
EmitLogTopic.java
1 import org.springframework.amqp.core.AmqpTemplate;
2 import org.springframework.beans.factory.annotation.Autowired;
3 import org.springframework.beans.factory.annotation.Value;
4 import org.springframework.stereotype.Component;
5
6 @Component
7 public class EmitLogTopic {
8
9 @Value("${rabbitmq.exchange.topic}")
10 private String exchangeName;
11
12 @Value("${rabbitmq.exchange.topic.routing.key}")
13 private String routingKey;
14
15 @Autowired
16 private AmqpTemplate template;
17
18 public void sendMessage(Object message) {
19 System.out.println("发送消息:" + message);
20 template.convertAndSend(exchangeName,routingKey,message);
21 }
22 }
EmitLogTopicRunner.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class EmitLogTopicRunner implements ApplicationRunner {
@Autowired
private EmitLogTopic emitLogTopic;
@Override
public void run(ApplicationArguments args) throws Exception {
emitLogTopic.sendMessage("A critical kernel error");
}
}
二、消费者
application.properties
#RabbitMq
spring.rabbitmq.host=localhost
rabbitmq.exchange.topic=topic_logs2
rabbitmq.topic.queue=topic_queue
rabbitmq.exchange.topic.binding.key=kern.critical
server.port=8081
ReceiveLogsTopic.java
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${rabbitmq.topic.queue}",autoDelete = "true"),
exchange = @Exchange(value = "${rabbitmq.exchange.topic}",type = ExchangeTypes.TOPIC),
key = {"#","kern.*","*.critical"}
)
)
public class ReceiveLogsTopic {
@RabbitHandler
public void reeive(Object message) {
System.out.println("接收到消息:" + message);
}
}
启动查看控制台输出:
生者产输出:
消费者输出: