admin 管理员组文章数量: 1087652
Kafka 消费组位移
Kafka 消费组位移
- 消费者 API
- 命令行
Kafka : 基于日志结构(log-based)的消息引擎
- 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据
- 位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据
重设位移的两个维度 :
- 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值
- 时间维度 : 将位移调整成 > 该时间的最小位移
7 种重设策略 :
维度 | 策略 | 含义 |
---|---|---|
位移维度 | Earliest | 位移调整当前最早位移处 |
Latest | 位移调整到当前最新位移处 | |
Current | 位移调整到当前最新提交位移处 | |
Specified-Offset | 位移调整成指定位移 | |
Shift-By-N | 位移调整到当前位移 + N 处 | |
时间维度 | DataTime | 位移调整到大于给定时间的最小位移处 |
Duration | 位移调整到离当前时间指定间隔的位移处 |
Earliest 策略 : 将位移调整到主题当前最早位移处
- 不一定是 0,很久的消息会被 Kafka 自动删除,所以当前最早位移可能是 > 0 的值
- 当要重新消费主题的所有消息,就用 Earliest 策略
Latest 策略 : 把位移重设成最新末端位移
- 当向某个主题发送 15 条消息,最新末端位移是 15
- 当要跳过所有历史消息,从最新的消息处开始消费,就用 Latest 策略
Current 策略 : 将位移调整成消费者当前提交的最新位移
- 修改消费者代码,并重启消费者,结果发现代码有问题,回滚前的代码变更,还把位移重设为消费者重启时的位置
Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处
- 消费者处理某条错误消息时 , 能手动跳过此消息的处理
- 生产中 , 出现 corrupted 消息无法被消费时,消费者会抛出异常,无法继续工作
Specified-Offset 策略 : 指定位移的绝对数值
Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离
- 把位移重设成当前位移的前 100 条位移处,指定 N 为 -100
DateTime : 指定一个时间,将位移重置到该时间之后的最早位移处
- 使用场景 : 重新消费昨天的数据,并使用该策略重设位移到昨天 0 点
Duration 策略 : 相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式是 PnDTnHnMnS
- Java 8 引入 Duration : 以字母 P 开头,后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)
- 将位移调回到 15 分钟前,就指定
PT0H15M0S
消费者 API
Earliest 策略 :
Properties consumerProperties = new Properties();
// 禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 重设的消费者组的组 ID
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic = "test"; // 要重设位移的 Kafka 主题
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo ->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
}
Latest 策略 :
consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
Current 策略 : 获取当前提交的最新位移 :
consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).forEach(tp -> {long committedOffset = consumermitted(tp).offset();consumer.seek(tp, committedOffset);
});
Specified-Offset 策略 :
long targetOffset = 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);
}
实现 Shift-By-N 策略
for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset = consumermitted(tp).offset() + 123L; consumer.seek(tp, targetOffset);
}
DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点
long ts = LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> ts));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}
Duration 策略 : 将位移调回 30 分钟前
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}
命令行
Earliest 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-earliest –execute
Latest 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-latest --execute
Current 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-current --execute
Specified-Offset 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-offset <offset> --execute
Shift-By-N 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--shift-by <offset_N> --execute
DateTime 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--to-datetime 2019-06-20T20:00:00.000 --execute
Duration 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--by-duration PT0H30M0S --execute
本文标签: Kafka 消费组位移
版权声明:本文标题:Kafka 消费组位移 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1700323918a396984.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论