Spring Cloud Stream 使用细节

自定义消息通道

上篇文章我们提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。

首先我们定义一个接口叫做MySink,如下:

1
2
3
4
5
6
public interface MySink {
String INPUT = "mychannel";

@Input(INPUT)
SubscribableChannel input();
}

这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:

1
2
3
4
public interface MySource {
@Output(MySink.INPUT)
MessageChannel output();
}

@Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。

最后我们定义一个消息接收类,如下:

1
2
3
4
5
6
7
8
9
@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

@StreamListener(MySink.INPUT)
public void receive(Object playload) {
logger.info("Received:" + playload);
}
}

OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来一个单元测试测试一下,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {

@Autowired
private MySource mySource;

@Test
public void contextLoads() {
mySource.output().send(MessageBuilder.withPayload("hello 123").build());
}
}

运行单元测试,我们可以看到如下日志,表示消息发送成功了:

图片

如果想要发送对象也可以直接发送,不用进行对象转换,如下:

发送:

1
2
Book book = new Book(1l, "三国演义", "罗贯中");
mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

1
2
3
4
@StreamListener(MySink.INPUT)
public void receive(Book playload) {
logger.info("Received:" + playload);
}

如果我们想要在接收成功后给一个回执,也是OK的,如下:

1
2
3
4
5
6
@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定义回执发送的消息通道
public String receive(Book playload) {
logger.info("Received:" + playload);
return "receive msg :" + playload;
}

方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:

1
2
3
4
@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
System.out.println("msg:"+msg);
}

当然要记得Source类也要在@EnableBinding注解中进行绑定。此时运行结果如下:

图片

消费组

由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组和主题,如下:

1
2
spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1

这里我们设置该工程都属于g1消费组,输入通道的主题名则为dest1。这里配置完成之后,我们在消息发送方做如下配置:

1
spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主题名为dest1(如果发送和接收就在同一个应用中,则这里可以不配置)。OK,此时我们将我们的项目启动两个实例,注意两个实例的端口不一样,此时如果我们再发送消息,则只会被两个实例中的一个接收到,另外一个应用则接收不到,但是到底是两个实例中的哪一个接收,则是不确定的。

消息分区

有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):

在消费者上添加如下配置:

1
2
3
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

关于这个配置我说三点:

1.第一行表示开启消息分区
2.第二行表示当前消息者的总的实例个数
3.第三行表示当前实例的索引,从0开始,当我们启动多个实例时,需要在启动时在命令行配置索引

然后在消息生产者上添加如下配置:

1
2
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置设置了分区键的表达式规则,第二行则设置了消息分区数量。

OK,此时我们再次启动多个消费者实例,然后重复发送多条消息,这些消息都将被同一个消费者处理掉。