当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。
大概原理如下,如下图所示,消息发送给交换器,交换器发现没有可路由的队列,于是消息发给备用交换器,备用交换器再发给队列2,由队列2的消费者来处理消息。
示例
交换器的定义,需要一个参数,可以通过参数的方式,来指定备用交换器。参数的key是
alternate-exchange
,value是交换器的名称。通常备用交换器的类型是fanout
。
生产者中,定义一个交换器和备用交换器,此时并没有响应路由的队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare("back.exchange", BuiltinExchangeType.FANOUT); Map<String, Object> map = new HashMap<>(); map.put("alternate-exchange", "back.exchange"); channel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT, false, false, map); String[] routingKeys = {"exchange.no.found"}; for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish("exchange", routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); } }
|
消费者,订阅的是备用交换器信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "back.queue";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, "back.exchange", ""); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("back.queue Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
运行结果如下,收到了备用交换器发送来的消息