在消费端,当确认或者拒绝了消息后,rabbitmq才会把消息从消息里删除掉,在发送端,会有以下问题:
发送给不存在的交换器
发送给路由不到的队列
网络故障导致中途丢失
事务 确保消息不丢失的唯一方法是使用事务,将每个消息或一组消息发布、提交的信道设置为事务性的。 在rabbitmq中,加事务也比较简单,就是调用txSelect()开启事务,调用txCommit()提交事务,调用txRollback()回滚事务。下面的例子中,如果有一条信息异常,则整个都不能发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final static String QUEUE_NAME = "queue.transaction" ;public final static String EXCHANGE_NAME = "exchange.transaction" ;public final static String ROUTE_NAME = "route.transaction" ;public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false , false , null ); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); channel.txSelect(); for (int i = 0 ; i < 3 ; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, null , "transaction" .getBytes()); } channel.txCommit(); } }
虽然可以确保消息不丢失,但是吞吐量降低了250倍。为了解决这个问题,我们还有其他方式。
mandatory Channel的basicPublish方法中,有个参数mandatory,当为true的时候,可以监听到不可路由的消息。
1 2 void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException ;
例子如下,消息发送给交换器为exchange.mandatory,路由为route.mandatory,但是没有队列绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final static String QUEUE_NAME = "queue.mandatory" ;public final static String EXCHANGE_NAME = "exchange.mandatory" ;public final static String ROUTE_NAME = "route.mandatory" ;public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory (); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false , false , null ); channel.addReturnListener(new ReturnListener () { @Override public void handleReturn (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("body:" + new String (body)); } }); channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true , null , "mandatory" .getBytes()); TimeUnit.SECONDS.sleep(10 ); } }
控制台消息如下: 如果路由不到队列的话,会发送到备用交换器,如果同时设置备用交换器和mandatory呢?这个要看备用交换器是否有可以路由的队列,如果没有,mandatory为true的时候,才会监听到消息无法路由。
发送方确认 mandatory只能监听到消息是否路由失败,如果有对应的队列,是否成功发送给队列是监听不到的,所以我们还需要发送方确认机制。 当信道设置为confirm模式的时候,每个消息都会一个唯一的ID,用于消息确认。在rabbitmq中,发送方调用confirmSelect()方法,rabbitmq收到后返回Confirm.SelectOk告知信道已经准备就绪接收发送方确认消息。事务和发送方模式不能共存,事务信道不能进入确认模式,一旦信道进入确认模式,就不能进行事务处理。 下面代码中同时用mandatory和发送方确认模式,如果没有对应的队列,则被监听到,如果有队列,但是没有收到确认,则发送不成功。注意,我们这边并没有消费者,发送是否成功,是以发送到队列为准。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static void main(String [] args) throws IOException , TimeoutException , InterruptedException { ConnectionFactory factory = new ConnectionFactory () ; try (Connection connection = factory.newConnection() ; Channel channel = connection.createChannel() ) { channel.confirmSelect() ; channel.exchangeDeclare(EXCHANGE_NAME , BuiltinExchangeType .DIRECT , false , false , null); channel.queueDeclare(QUEUE_NAME , false , false , false , null); channel.queueBind(QUEUE_NAME , EXCHANGE_NAME , ROUTE_NAME ); channel.addReturnListener(new ReturnListener () { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP .BasicProperties properties, byte[] body) throws IOException { System .out.println("replyCode:" + replyCode); System .out.println("replyText:" + replyText); System .out.println("exchange:" + exchange); System .out.println("routingKey:" + routingKey); System .out.println("body:" + new String (body)); } }); channel.basicPublish(EXCHANGE_NAME , ROUTE_NAME , true , null, "mandatory" .getBytes() ); if (channel.waitForConfirms() ) { System .out.println("发送成功" ); } else { System .out.println("发送失败" ); } TimeUnit .SECONDS .sleep(10 ); } }
批量确认 如果我们每次发送都要确认,就会影响到吞吐量,所以我们可以用批量确认,发送消息后调用waitForConfirmsOrDie()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public final static String QUEUE_NAME = "queue.confirm" ; public final static String EXCHANGE_NAME = "exchange.confirm" ; public final static String ROUTE_NAME = "route.confirm" ; public static void main(String [] args) throws IOException , TimeoutException , InterruptedException { ConnectionFactory factory = new ConnectionFactory () ; try (Connection connection = factory.newConnection() ; Channel channel = connection.createChannel() ) { channel.confirmSelect() ; channel.exchangeDeclare(EXCHANGE_NAME , BuiltinExchangeType .DIRECT , false , false , null); channel.queueDeclare(QUEUE_NAME , false , false , false , null); channel.queueBind(QUEUE_NAME , EXCHANGE_NAME , ROUTE_NAME ); channel.addReturnListener(new ReturnListener () { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP .BasicProperties properties, byte[] body) throws IOException { System .out.println("replyCode:" + replyCode); System .out.println("replyText:" + replyText); System .out.println("exchange:" + exchange); System .out.println("routingKey:" + routingKey); System .out.println("body:" + new String (body)); } }); for (int i=0 ;i<3 ;i++){ channel.basicPublish(EXCHANGE_NAME , ROUTE_NAME , true , null, "mandatory" .getBytes() ); } channel.waitForConfirmsOrDie() ; TimeUnit .SECONDS .sleep(10 ); } }
异步确认 不管是单个确认还是批量确认,都是同步的,虽然批量会相对提高吞吐量,但是还是有一定的影响,我们可以用异步的模式来确认。 异步就是采取监听的模式,调用Channel的addConfirmListener方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public final static String QUEUE_NAME = "queue.confirm" ;public final static String EXCHANGE_NAME = "exchange.confirm" ;public final static String ROUTE_NAME = "route.confirm" ;public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory (); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.confirmSelect(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false , false , null ); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME); channel.addConfirmListener(new ConfirmListener () { @Override public void handleAck (long deliveryTag, boolean multiple) throws IOException { System.out.println("handleAck:" +deliveryTag); } @Override public void handleNack (long deliveryTag, boolean multiple) throws IOException { System.out.println("handleNack:" +deliveryTag); } }); channel.addReturnListener(new ReturnListener () { @Override public void handleReturn (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("body:" + new String (body)); } }); for (int i=0 ;i<3 ;i++){ channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true , null , "mandatory" .getBytes()); } TimeUnit.SECONDS.sleep(10 ); } }