RocketMQ 安装

安装配置 jdk8

1. 上传jdk压缩文件

将文件jdk-8u212-linux-x64.tar.gz上传到 /root 目录
jdk

2. 解压缩

执行解压命令

1
2
3
4
5
6
# 将jdk解压到 /usr/local/ 目录
tar -xf jdk-8u212-linux-x64.tar.gz -C /usr/local/

# 切换到 /usr/local/ 目录, 显示列表, 查看解压缩的jdk目录
cd /usr/local
ll
阅读更多

Kafka 日志索引

kafka 的索引文件以稀疏索引的方式构造消息的索引,每个 segmentfault 文件,对应 2 个索引文件。偏移量索引文件(xx.index)用于建立消息偏移量到物理地址之间的映射关系;时间戳索引文件(xx.timeindex)根据指定的时间戳查找对应的偏移量信息。

.index、.timeindex 均保持严格单调递增,在查找时,都使用二分查找法,如果查不到,均返回比查找值要小的最大值。

日志切分

当日志分段文件满足以下几个条件任意之一,便会切分索引文件

阅读更多

Kafka 消费者分区策略

kafka 允许通过配置 partition.assignment.strategy 来改变消费组的分区策略。
kafka 提供了以下几个分区策略

  • RangeAssignor
  • RoundRobinAssignor
  • StickyAssignor

默认使用的是 RangeAssignor

同时,kafka 也允许我们自定义分区策略,只需要继承 AbstractPartitionAssignor 抽象类即可。

阅读更多

Kafka 消费者基本介绍

1、消费者食用DEMO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
System.err.println(record.toString());
}
}
阅读更多

Kafka 生产者客户端基本使用

必要的参数

  • bootstrap.servers

该参数为 broker 地址,不需要全部都填,因为 kafka 会从当前 broker 中获取其他 broker 信息。不过为了某个 broker 挂掉,一般填多个 broker 地址

  • key.serializer
阅读更多

Kafka

生产者发送消息的整体流程

img

消息追加器 RecordAccumulator

前面几个组件,在 3.1 的文章中,已经说清楚。现在来看 RecordAccumulator 组件

阅读更多

Kafka 基本概念

kafka 应用场景

  1. 应用监控
  2. 网站用户行为追踪
  3. 流数据
  4. 持久性日志

基本概念

在说基本概念前,先看一下 kafka 的系统架构
image.png

阅读更多

Kafka 搭建与实践

单机版 Docker 搭建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '2' 
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
阅读更多