在Rabbitmq中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。
交换器
交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:Direct
、Fanout
、Topic
、Headers
。还还声明了一些属性,其中最重要的是:交换器的名称、交换器类型、是否持久化、是否自动删除、参数。
是否持久化,决定了rabbitmq重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。
1 2 3 4 5 6
| Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
|
默认的交换器
默认的交换器名称为""
,即空字符串。当我们没有定义的时候,看起来就像消息直接发送到队列一样。
Direct
根据消息路由键将消息传递到队列。主要的步骤如下:
- 通过路由键K把队列绑定到交换器
- 当带有路由键R的新消息到达交换器时,如果K = R,交换器将其路由到队列
生产者代码,通过channel.basicPublish
把带有路由键(“images.archive”, “images.crop”, “images.resizer”)的消息发送给交换器images。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String[] routingKeys = {"images.archive", "images.crop", "images.resizer"}; for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); } }
|
ArchiveRec1消费者,通过channel.queueBind
把交换器images、路由键images.archive、队列archive1绑定一起。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "archive1";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.archive"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ArchiveRec1 Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
ArchiveRec1消费者,通过channel.queueBind
把交换器images、路由键images.archive、队列archive2绑定一起。除了队列名称,其他与上面代码雷同,就不贴了。
CropRec消费者,通过channel.queueBind
把交换器images、路由键images.crop、队列cropper绑定一起。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "cropper";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.crop"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("cropper Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
运行ArchiveRec1、ArchiveRec2各一次,运行CropRec两次。启动消费者后,再运行生产者DirectExchange。
运行结果如下,ArchiveRec1和ArchiveRec2各消费2条数据,两个CropRec一共消费2条数据,说明同一个队列两个消费者,他们是轮询消费的。
ArchiveRec1
ArchiveRec2
CropRec
fanout
把消息发送给交换器所有的队列上,忽略的路由键的影响。也就是说,当多个队列跟这个交换器绑定的时候,交换器每收到一条消息,就会群发给这些队列。虽然Direct上面的例子中,也可以通过多个队列和路由键交换器绑定,达到部分群发的功能,但是fanout对于群发的功能还是更方便些。
照着direct的例子改一下,把交换器、队列名称改一下,再把交换类型改为fanout。
ArchiveRec1代码如下,ArchiveRec2雷同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "fanout_archive1";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.archive"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ArchiveRec1 Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
CropRec代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "fanout_cropper";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.crop"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("cropper Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
运行ArchiveRec1、ArchiveRec2各一次,运行CropRec两次。启动消费者后,再运行生产者FanoutExchange。
运行结果如下,ArchiveRec1和ArchiveRec2各消费6条数据,其他路由键的信息都收到了。两个CropRec一共消费6条数据,fanout_cropper队列也收到了6条信息,被两个消费者分别消费3条。
ArchiveRec1
ArchiveRec2
CropRec
Topic
通过通配符,来消费所要的消息。名称和activemq的发布订阅一样,但是特性和activemq的通配符差不多。
通配符有#
和*
。#
是匹配一个或多个,*
是匹配一个。
AllRec用来接收images开头所有的消息,ArchiveRec用来接收images.archive开头所有的消息,ARec用来接收images开头a结尾的消息,CropRec用来接收images.cropRec开头所有的消息。
生产者分布向这三个路由键images.archive.a,images.archive.b,images.crop.a发送消息
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC); String[] routingKeys = {"images.archive.a", "images.archive.b", "images.crop.a"}; for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish(Constant.EXCHANGE_TOPIC_NAME, routingKeys[i], null, routingKeys[i].getBytes()); } System.out.println("Sent complete"); } }
|
AllRec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "all";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.#"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("AllRec Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
ArchiveRec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "archive";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.archive.*"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ArchiveRec Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
ARec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "a";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.*.a"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ARec Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
CropRec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final static String QUEUE_NAME = "crop";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.crop.*"); System.out.println("Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("CropRec Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
|
运行结果如下:
AllRec收到了全部消息
ArchiveRec收到了2条images.archive开头的消息
ARec收到了2条images开头a结尾的消息
CropRec收到了1条images.crop开头的消息