Single-node setup with Docker

```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
```
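Notes:
- For a Java client to connect to Kafka, the host machine's hosts file must map the advertised host name kafka to an IP address:

```
sudo vim /etc/hosts

# add this line, using the IP address of the machine running Docker
172.20.10.6 kafka
```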
- How do I send a test message with the kafka-console-producer that ships with Kafka?

```shell
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
```
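If a Java client hangs or times out, a common cause is that the advertised name does not resolve on the host or that the published port is not reachable. The following is a minimal sketch of a pre-flight check using only the JDK. The class name BrokerReachability and the method canConnect are hypothetical and not part of Kafka; the host kafka and port 9092 are taken from the compose file above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-flight check; not part of Kafka or its client library. */
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // InetSocketAddress tries to resolve the name; connect() throws
            // UnknownHostException (an IOException) if resolution failed.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: "kafka" is mapped in /etc/hosts and port 9092 is published.
        System.out.println("kafka:9092 reachable: " + canConnect("kafka", 9092, 2000));
    }
}
```

A successful TCP connection only shows that the name resolves and the port is open; it does not prove that the broker is healthy.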
Cluster + kafka manager
Kafka cluster docker-compose

```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_PORT: 9092
  kafka2:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_PORT: 9093
  kafka3:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_PORT: 9094
```
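Notes:
- For a Java client to connect to Kafka, the host machine's hosts file must map each advertised broker name to an IP address:

```
sudo vim /etc/hosts

# add these lines, using the IP address of the machine running Docker
172.20.10.6 kafka1
172.20.10.6 kafka2
172.20.10.6 kafka3
```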
- How do I send a test message with the kafka-console-producer that ships with Kafka? The commands below assume you are inside the kafka3 container.

```shell
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 3 --topic test2
kafka-console-producer.sh --bootstrap-server kafka3:9094 --topic test2
```
kafka-manager docker-compose

```yaml
version: "2"
services:
  kafka-manager:
    image: kafkamanager/kafka-manager
    container_name: kafka-manager
    ports:
      - "9000:9000"
    external_links:
      - kafka_kafka1_1
      - kafka_kafka2_1
      - kafka_kafka3_1
    environment:
      ZK_HOSTS: kafka_zookeeper_1:2181
networks:
  default:
    external:
      name: kafka_default
```
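Note
kafka-manager is not defined in the same compose file as the Kafka cluster, so it declares the cluster's network (kafka_default) under networks as an external network in order to reach the brokers and ZooKeeper.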
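The names kafka_kafka1_1, kafka_zookeeper_1 and kafka_default are generated by Docker Compose rather than chosen by hand. As a rough sketch, assuming the cluster compose file lives in a directory named kafka (so the Compose project name is kafka) and that Compose V1 naming with underscores applies, they can be derived as follows. The class and method names are hypothetical and only illustrate the convention.

```java
/** Hypothetical helper illustrating Docker Compose V1 naming; not a Docker API. */
final class ComposeNames {

    /** Container name: <project>_<service>_<index>, with the index starting at 1. */
    static String containerName(String project, String service, int index) {
        return project + "_" + service + "_" + index;
    }

    /** Default network name: <project>_default. */
    static String defaultNetwork(String project) {
        return project + "_default";
    }

    public static void main(String[] args) {
        // Assumption: the cluster compose file sits in a directory named "kafka".
        String project = "kafka";
        for (String service : new String[] {"zookeeper", "kafka1", "kafka2", "kafka3"}) {
            System.out.println(containerName(project, service, 1));
        }
        System.out.println(defaultNetwork(project));
    }
}
```

If the cluster lives in a differently named directory, or Compose V2 is used (which by default joins container name parts with hyphens), then external_links, ZK_HOSTS and the network name have to be adjusted. docker ps and docker network ls show the actual names.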
Basic operations
All of the commands below are run inside a Kafka container.
- Create a topic
This creates a topic with a replication factor of 1 and 1 partition.

```shell
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test2
```
- List topics

```shell
kafka-topics.sh --zookeeper zookeeper:2181 --list
```
- Describe a topic

```shell
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2
```
- Send messages

```shell
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
```
- Consume messages

```shell
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning
```
Connecting Spring Boot to Kafka
maven

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.10.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
yaml

```yaml
server:
  port: 9009
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # consumers read bytes, so they take deserializers rather than serializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: group1
```
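Spring Boot turns the spring.kafka.* settings into plain Kafka client properties. As a rough illustration of what ends up on each side, the sketch below builds the corresponding java.util.Properties by hand, using only the JDK. The class KafkaClientProps and its methods are hypothetical; the property keys (bootstrap.servers, key.serializer, key.deserializer, group.id and so on) are the standard Kafka client configuration names, and the group id group1 is just an example value.

```java
import java.util.Properties;

/** Hypothetical holder for the client properties that mirror the YAML settings. */
final class KafkaClientProps {

    // The three brokers published on the host by the cluster compose file.
    static final String BOOTSTRAP = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";

    /** Producers turn keys and values into bytes, so they need serializers. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Consumers turn bytes back into objects, so they need deserializers and a group id. */
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println("producer " + k + "=" + v));
        consumerProps("group1").forEach((k, v) -> System.out.println("consumer " + k + "=" + v));
    }
}
```

With the kafka-clients library on the classpath, properties like these are what a KafkaProducer or KafkaConsumer constructor accepts; in a Spring Boot application the YAML alone is sufficient.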
Producer

```java
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// Inside any method of the same bean:
kafkaTemplate.send("test2", "qweqwe");
```
Consumer

```java
// Assumes an SLF4J logger field named log in the enclosing Spring bean.
@KafkaListener(topics = "test2")
public void onMsg(String msg) {
    log.info("kafka {}", msg);
    System.out.println(msg);
}
```