Kafka Setup and Practice

Single-Node Setup with Docker

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
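
The KAFKA_CREATE_TOPICS value follows the wurstmeister/kafka image's name:partitions:replicas convention, so "test:1:1" requests a topic named test with 1 partition and 1 replica, and several topics can be listed separated by commas. As a minimal sketch of how to read such a string (TopicSpec and its parse method are hypothetical helpers for illustration, not part of Kafka or the image):

```java
import java.util.ArrayList;
import java.util.List;

public class TopicSpec {
    public final String name;
    public final int partitions;
    public final int replicas;

    public TopicSpec(String name, int partitions, int replicas) {
        this.name = name;
        this.partitions = partitions;
        this.replicas = replicas;
    }

    // Parses a comma-separated list such as "test:1:1,other:3:2".
    // Any fields after the third (e.g. a cleanup policy) are ignored here.
    public static List<TopicSpec> parse(String value) {
        List<TopicSpec> specs = new ArrayList<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 3) {
                throw new IllegalArgumentException("expected name:partitions:replicas, got: " + entry);
            }
            specs.add(new TopicSpec(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])));
        }
        return specs;
    }

    public static void main(String[] args) {
        for (TopicSpec s : parse("test:1:1")) {
            System.out.println(s.name + ": partitions=" + s.partitions + ", replicas=" + s.replicas);
        }
    }
}
```

Running it against the value from the compose file prints test: partitions=1, replicas=1.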

Notes:

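  • The broker advertises itself as kafka, so a Java client running on the host can only connect if the host resolves that name. Add an entry to the host's /etc/hosts that maps kafka to the IP address of the machine running Docker (172.20.10.6 in this example):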
sudo vim /etc/hosts

172.20.10.6 kafka
  • To send test messages with the kafka-console-producer script that ships with Kafka:
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
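
Before involving a Kafka client, it can help to confirm from the host that the advertised name resolves and that the port accepts TCP connections. The following is a minimal sketch using only the JDK; the class name BrokerReachability is hypothetical, and kafka:9092 is the address from the compose file above. A successful connect only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved host names, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("kafka:9092 reachable: " + canConnect("kafka", 9092, 2000));
    }
}
```

If this prints false on the host, the /etc/hosts entry or the published port is the first thing to check.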

Cluster Setup + Kafka Manager

docker-compose for the Kafka cluster

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_PORT: 9092
  kafka2:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_PORT: 9093
  kafka3:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_PORT: 9094

Notes:

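  • Each broker advertises its own name (kafka1, kafka2, kafka3), so a Java client running on the host can only connect if the host resolves all three. Add them to the host's /etc/hosts, mapped to the IP address of the machine running Docker (172.20.10.6 in this example):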
sudo vim /etc/hosts

172.20.10.6 kafka1
172.20.10.6 kafka2
172.20.10.6 kafka3
  • To send test messages with the kafka-console-producer script that ships with Kafka, assuming you have opened a shell inside the kafka3 container:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 3 --topic test2
kafka-console-producer.sh --bootstrap-server kafka3:9094 --topic test2

kafka-manager docker-compose

version: "2"
services:
  kafka-manager:
    image: kafkamanager/kafka-manager
    container_name: kafka-manager
    ports:
      - "9000:9000"
    external_links: # link to containers defined outside this compose file
      - kafka_kafka1_1
      - kafka_kafka2_1
      - kafka_kafka3_1
    environment:
      ZK_HOSTS: kafka_zookeeper_1:2181
networks:
  default:
    external:
      name: kafka_default

Notes:
kafka-manager is not defined in the same compose file as the Kafka cluster, so the networks section is needed to attach it to the cluster's network (kafka_default here).
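
The names kafka_default, kafka_kafka1_1 and kafka_zookeeper_1 are derived from the Compose project name. Assuming the cluster compose file lives in a directory named kafka and was started with the classic docker-compose (v1) naming scheme, the derivation can be sketched as follows (ComposeNames is a hypothetical helper for illustration, not a Docker API):

```java
public class ComposeNames {
    // Default network created for a compose project: <project>_default
    public static String defaultNetwork(String project) {
        return project + "_default";
    }

    // Container name under the docker-compose v1 scheme: <project>_<service>_<index>
    public static String container(String project, String service, int index) {
        return project + "_" + service + "_" + index;
    }

    public static void main(String[] args) {
        String project = "kafka"; // assumption: the directory holding the cluster compose file
        System.out.println(defaultNetwork(project));
        for (String service : new String[] {"zookeeper", "kafka1", "kafka2", "kafka3"}) {
            System.out.println(container(project, service, 1));
        }
    }
}
```

If the cluster is started from a directory with a different name, the network and container names change accordingly and the kafka-manager compose file has to be adjusted to match.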

Basic Operations

All of the following commands are run inside the Kafka container.

cd /opt/kafka/bin
  1. Create a topic

The command below creates a topic with 1 replica and 1 partition:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test2
  2. List topics
kafka-topics.sh --zookeeper zookeeper:2181 --list
  3. Describe a topic
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2
  4. Send messages
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
  5. Consume messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning

Connecting Spring Boot to Kafka

maven

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.10.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

yaml

server:
  port: 9009
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # consumers read bytes back into objects, so they take deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: goup1 # consumer group
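
For reference, Spring Boot's spring.kafka settings map onto plain Kafka client configuration keys. The sketch below builds the equivalent java.util.Properties using only the JDK. The class and method names are hypothetical, while the keys (bootstrap.servers, key.serializer, key.deserializer, group.id and so on) are the standard Kafka client settings. Note that a producer is configured with serializers and a consumer with deserializers.

```java
import java.util.Properties;

public class KafkaClientProps {
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Producer side: objects are turned into bytes, so serializers are configured.
    public static Properties producerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("key.serializer", STRING_SERIALIZER);
        p.setProperty("value.serializer", STRING_SERIALIZER);
        return p;
    }

    // Consumer side: bytes are turned back into objects, so deserializers are configured.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("key.deserializer", STRING_DESERIALIZER);
        p.setProperty("value.deserializer", STRING_DESERIALIZER);
        p.setProperty("group.id", groupId);
        return p;
    }

    public static void main(String[] args) {
        String servers = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
        System.out.println(producerProps(servers));
        System.out.println(consumerProps(servers, "goup1"));
    }
}
```

With the kafka-clients library on the classpath, objects like these could be passed to the KafkaProducer and KafkaConsumer constructors; Spring Boot does the equivalent wiring automatically from the YAML.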

Producer

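@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// Example method (the name is illustrative): publishes "qweqwe" to topic test2
public void sendTestMessage() {
    kafkaTemplate.send("test2", "qweqwe");
}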

Consumer

// log is assumed to be an SLF4J logger declared on the enclosing class
@KafkaListener(topics = "test2")
public void onMsg(String msg) {
    log.info("kafka {}", msg);
    System.out.println(msg);
}