Kafka 生产者客户端基本使用

必要的参数

  • bootstrap.servers

该参数为 broker 地址,不需要全部都填,因为 kafka 会从当前 broker 中获取其他 broker 信息。不过为了某个 broker 挂掉,一般填多个 broker 地址

  • key.serializer
阅读更多

Kafka

生产者发送消息的整体流程

img

消息追加器 RecordAccumulator

前面几个组件,在 3.1 的文章中,已经说清楚。现在来看 RecordAccumulator 组件

阅读更多

Kafka 基本概念

kafka 应用场景

  1. 应用监控
  2. 网站用户行为追踪
  3. 流数据
  4. 持久性日志

基本概念

在说基本概念前,先看一下 kafka 的系统架构
image.png

阅读更多

Kafka 搭建与实践

单机版 Docker 搭建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '2' 
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
阅读更多

ActiveMQ 延迟队列

activemq5.4版本后,可以在xml配置文件中,增加一个broker的schedulerSupport为true的属性,就可以启用延时和定时的消息投递。
阅读更多

ActiveMQ 死信队列

参考官网
当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。
在以下情况,消息会重新发送到客户端:

  • 使用事务的时候,调用了session的rollback方法
  • 使用事务的时候,在会话关闭后再commit
  • 使用CLIENT_ACKNOWLEDGE消息确认机制的时候,调用session的recover方法
  • 客户端连接超时
阅读更多

ActiveMQ 虚拟 topics

当我们需要广播的时候,我们采用发布订阅模式。生产者往broker发送1,2,3,4,5,6,消费者1和消费者2都收到1,2,3,4,5,6。
image.png
当我们需要队列的时候,我们采用点对点模式,点对点模式中如果为了加快消息消费,我们就加多个消费者。生产者往broker发送1,2,3,4,5,6,消费者1收到1,3,5,消费者2收到2,4,6。
image.png
现在有个场景是这样的,在发布订阅模式中,由于生产者产生消息的速度超过了消费者1消费的速度,使消息一直积压,于是就想能不能既用发布订阅模式分发消息又用点对点模式加快消息消费呢?
image.png
activemq提供了一个虚拟topics的功能,首先元素的开头必须是VirtualTopic.开头的,消费端的格式是Consumer.<consumer name>.VirtualTopic.<VirtualTopicName>. ,比如生产者的是VirtualTopic.topic,那消费者A是Consumer.a.VirtualTopic.topic,a表示消费端组的名称,名字一样的话,会类似ptp一样消费。

阅读更多

ActiveMQ 通配符

消费者通过消息选择器来获取自己想要的消息,我们也可以通过另外通配符的方式,来获取自己想要的消息,当然两种还是有区别的。
通配符的方式有三种:

  • “.”:用于分隔路径名字
  • “*”:用于匹配路径的任何名字
  • “>”:用于匹配末尾的名称

示例

消费者1消费a.b.c的消息,消费者2消费a.*.c的消息,所以中间是什么他不在乎,消费者3消费a.开头的消息,所以a.后面是什么他不在乎。生产者往a.b.c和a.b.c发送数据。

阅读更多

ActiveMQ 发送多个消息到 Destination

有这样的一个业务场景,生产者发送消息后,一个以点对点的模式接收,一个已发布订阅的模式接收。当然我们可以做,生产者发送两个Destination,一个是点对点模式,一个是发布订阅模式。这样比较麻烦,activemq也提供了他的方法来处理这样的场景:

示例

生产者的主要代码如下:

1
session.createQueue("test.ptp,topic://test.topic");
阅读更多