消息类型及属性
JMS 消息有以下几种类型:TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage,以及 ActiveMQ 扩展的 BlobMessage(下面的示例没有演示 BlobMessage)。消息属性支持的类型包括 boolean、byte、double、float、int、long、object、short、string。下面看看在代码中是怎么发送和接收这些消息的。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| /**
 * Producer demo: sends one message of each demonstrated kind (property-only, text,
 * stream, object, map, bytes) to the {@code test.message} queue and then closes the
 * connection.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.message");
        MessageProducer producer = session.createProducer(destination);

        sendProperty(session, producer);
        sendTextMessage(session, producer);
        sendStreamMessage(session, producer);
        sendObjectMessage(session, producer);
        sendMapMessage(session, producer);
        sendBytesMessage(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and producers, so only the
        // connection is closed here. Closing producer, session and connection in one
        // try block would skip connection.close() if an earlier close() threw.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Sends a {@link BytesMessage} whose body is the UTF-8 encoding of {@code "hello"}.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, written or sent
 */
private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException {
    BytesMessage bytesMessage = session.createBytesMessage();
    // Name the charset explicitly: the no-arg getBytes() uses the JVM default charset,
    // which may differ between the producing and the consuming machine.
    bytesMessage.writeBytes("hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    producer.send(bytesMessage);
}
/**
 * Sends a {@link MapMessage} with a single string entry {@code name}.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, populated or sent
 */
private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
    MapMessage payload = session.createMapMessage();
    payload.setString("name", "张三");
    producer.send(payload);
}
/**
 * Sends an {@link ObjectMessage} that carries a {@code Person} instance.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created or sent
 */
private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException {
    Person payload = new Person("张三", 18);
    producer.send(session.createObjectMessage(payload));
}
/**
 * Sends a {@link StreamMessage} containing one string value.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, written or sent
 */
private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException {
    StreamMessage streamMessage = session.createStreamMessage();
    streamMessage.writeString("test.stream");
    producer.send(streamMessage);
}
/**
 * Sends a body-less {@link Message} that only carries the string property {@code name}.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, modified or sent
 */
public static void sendProperty(Session session, MessageProducer producer) throws JMSException {
    Message propertyOnly = session.createMessage();
    propertyOnly.setStringProperty("name", "张三");
    producer.send(propertyOnly);
}
/**
 * Sends a {@link TextMessage} with a fixed text body.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created or sent
 */
public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
    // createTextMessage(String) creates the message with its body already set.
    producer.send(session.createTextMessage("this is test.message"));
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| /**
 * Consumer demo: registers an asynchronous listener on the {@code test.message} queue
 * and prints the content of every received message according to its type.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // SECURITY NOTE(review): trusting all packages lets an ObjectMessage deserialize
        // any class on the classpath. That is only acceptable for a local demo; for real
        // use, restrict it with setTrustedPackages(...) to the package containing Person.
        connectionFactory.setTrustAllPackages(true);
        connection = connectionFactory.createConnection();
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.message");
        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // The property check is independent of the message type checks below.
                    if (message.propertyExists("name")) {
                        System.out.println("property receive:" + message.getStringProperty("name"));
                    }
                    if (message instanceof TextMessage) {
                        System.out.println("TextMessage receive:" + ((TextMessage) message).getText());
                    }
                    if (message instanceof StreamMessage) {
                        System.out.println("StreamMessage receive:" + ((StreamMessage) message).readString());
                    }
                    if (message instanceof ObjectMessage) {
                        ObjectMessage objectMessage = (ObjectMessage) message;
                        Person person = (Person) objectMessage.getObject();
                        System.out.println("ObjectMessage receive:" + person);
                    }
                    if (message instanceof MapMessage) {
                        System.out.println("MapMessage receive:" + ((MapMessage) message).getString("name"));
                    }
                    if (message instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) message;
                        // Size the buffer from the body length: readBytes returns -1 for an
                        // empty body (which made new String(b, 0, -1) throw), and a fixed
                        // 1024-byte buffer would truncate larger bodies.
                        byte[] body = new byte[(int) bytesMessage.getBodyLength()];
                        bytesMessage.readBytes(body);
                        // Decode with an explicit charset; it must match the sender's encoding.
                        System.out.println("BytesMessage receive:"
                                + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // The connection is deliberately left open on success: closing it here would
        // stop delivery to the asynchronous listener registered above.
    } catch (JMSException e) {
        e.printStackTrace();
        // Setup failed, so no listener will run; release the connection if it was opened.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeError) {
                closeError.printStackTrace();
            }
        }
    }
}
|
客户端收到消息结果如下:
1 2 3 4 5 6
| property receive:张三 TextMessage receive:this is test.message StreamMessage receive:test.stream ObjectMessage receive:Person{name='张三', age=18} MapMessage receive:张三 BytesMessage receive:hello
|
消息选择器
消息选择器根据消息头和消息属性对消息进行过滤,使消费者只接收自己感兴趣的消息。我们看看下面的例子:
生产者发送张三18岁、李四20岁、王五22岁三条信息。消费者1接收名字是张三的信息;消费者2接收年龄大于18岁的信息(即李四、王五);消费者3接收名字是李四或王五且年龄大于20岁的信息(即只有王五)。由于这里使用的是非持久订阅的 Topic,需要先启动三个消费者,再运行生产者。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| /**
 * Selector demo producer: publishes three person messages (each with {@code name} and
 * {@code age} properties) to the {@code test.selector} topic and then closes the
 * connection.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("test.selector");
        MessageProducer producer = session.createProducer(destination);

        sendPerson1(session, producer);
        sendPerson2(session, producer);
        sendPerson3(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and producers, so only the
        // connection is closed here. Closing producer, session and connection in one
        // try block would skip connection.close() if an earlier close() threw.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Sends the text message for the first person, tagged with {@code name} and {@code age}
 * properties that consumers can match with a selector.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, modified or sent
 */
public static void sendPerson1(Session session, MessageProducer producer) throws JMSException {
    TextMessage person = session.createTextMessage("张三的个人信息");
    person.setStringProperty("name", "张三");
    person.setIntProperty("age", 18);
    producer.send(person);
}
/**
 * Sends the text message for the second person, tagged with {@code name} and {@code age}
 * properties that consumers can match with a selector.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, modified or sent
 */
public static void sendPerson2(Session session, MessageProducer producer) throws JMSException {
    TextMessage person = session.createTextMessage("李四的个人信息");
    person.setStringProperty("name", "李四");
    person.setIntProperty("age", 20);
    producer.send(person);
}
/**
 * Sends the text message for the third person, tagged with {@code name} and {@code age}
 * properties that consumers can match with a selector.
 *
 * @param session  session used to create the message
 * @param producer producer the message is sent with
 * @throws JMSException if the message cannot be created, modified or sent
 */
public static void sendPerson3(Session session, MessageProducer producer) throws JMSException {
    TextMessage person = session.createTextMessage("王五的个人信息");
    person.setStringProperty("name", "王五");
    person.setIntProperty("age", 22);
    producer.send(person);
}
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| /**
 * Consumer 1: subscribes to the {@code test.selector} topic with the selector
 * {@code name='张三'} and prints every message it receives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the connection factory and open a connection.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        // Start delivery of incoming messages on this connection.
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // The destination this consumer subscribes to: the test.selector topic.
        Destination destination = session.createTopic("test.selector");
        // Only messages whose properties satisfy the selector are delivered.
        String selector = "name='张三'";
        MessageConsumer consumer = session.createConsumer(destination, selector);

        // receive() blocks until a message arrives; the loop ends only when it returns null.
        Message message;
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            } else {
                // A selector constrains properties, not the message type, so do not
                // assume every matching message is a TextMessage.
                System.out.println("consumer1 receive:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so only the
        // connection is closed here. Closing consumer, session and connection in one
        // try block would skip connection.close() if an earlier close() threw.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| /**
 * Consumer 2: subscribes to the {@code test.selector} topic with the selector
 * {@code age>18} and prints every message it receives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the connection factory and open a connection.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        // Start delivery of incoming messages on this connection.
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // The destination this consumer subscribes to: the test.selector topic.
        Destination destination = session.createTopic("test.selector");
        // Only messages whose properties satisfy the selector are delivered.
        String selector = "age>18";
        MessageConsumer consumer = session.createConsumer(destination, selector);

        // receive() blocks until a message arrives; the loop ends only when it returns null.
        Message message;
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer2 receive:" + ((TextMessage) message).getText());
            } else {
                // A selector constrains properties, not the message type, so do not
                // assume every matching message is a TextMessage.
                System.out.println("consumer2 receive:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so only the
        // connection is closed here. Closing consumer, session and connection in one
        // try block would skip connection.close() if an earlier close() threw.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
|
消费者3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| /**
 * Consumer 3: subscribes to the {@code test.selector} topic with the selector
 * {@code name in ('李四','王五') and age>20} and prints every message it receives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        // Create the connection factory and open a connection.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        // Start delivery of incoming messages on this connection.
        connection.start();

        boolean useTransaction = false;
        Session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // The destination this consumer subscribes to: the test.selector topic.
        Destination destination = session.createTopic("test.selector");
        // Only messages whose properties satisfy the selector are delivered.
        String selector = "name in ('李四','王五') and age>20";
        MessageConsumer consumer = session.createConsumer(destination, selector);

        // receive() blocks until a message arrives; the loop ends only when it returns null.
        Message message;
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer3 receive:" + ((TextMessage) message).getText());
            } else {
                // A selector constrains properties, not the message type, so do not
                // assume every matching message is a TextMessage.
                System.out.println("consumer3 receive:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so only the
        // connection is closed here. Closing consumer, session and connection in one
        // try block would skip connection.close() if an earlier close() threw.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
|
选择器的语法与 SQL 中 WHERE 子句的条件表达式类似(基于 SQL-92 条件表达式语法的一个子集)。