admin 管理员组

文章数量: 1086019

中间件

中间件-消息队列

英文全称:message queue

将 MQ 掰开了揉碎了来看,就是「一发一存一消费」,再直白点就是一个「转发器」。生产者先将消息投递到「队列」中,然后再从这个「队列」中取出消息,最后再转发给消费者。

可以在同一进程内调用,例如

MQ与RPC异同:

1、引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。

2、多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。

应用场景

电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。

引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。

改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本。

这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。

除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰。

我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……

异步通信解耦

限流削峰

数据收集

流处理spark,批处理haddop

应用场景
异步通信解耦
限流削峰
延迟通知
最终一致性保证
顺序消息
流式处理

消息模型

Queue 队列模式

一对一,它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。

它是一个严格意义上的Queue队列。消息按照什么顺序写进去,就按照什么顺序读出来,队列读这个操作就是出队,从队头中 “删除” 这个消息。即消费者获得数据后会删除这个消息。

publish/subscribe 发布/订阅模式

一对多,生产者发布数据到topic中,同时多个消费者订阅topic,此时订阅了的消费者都可以消费。

生产者就是发布者,队列就是主题,消费者就是订阅者

发布publish/订阅subscribe特点优点缺点
pull消费者主动拉取队列消息消费速度完全由消费者自己决定消费者需要常轮询导致性能开销
push队列主动推送消息给消费者每个消费者的消费速度能力不同

常见协议

JMS

JMS:Java Messaging Service(Java消息服务)。

是Java平台上有关MOM(Message OrientedMiddleware,面向消息的中间件 PO/OO/AO)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。ActiveMQ是该协议的典型实现。

STOMP

STOMP:Streaming Text Orientated Message Protocol(面向流文本的消息协议)。

是一种MOM设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。ActiveMQ是该协议的典型实现,RabbitMQ通过插件可以支持该协议。

AMQP

AMQP:Advanced Message Queuing Protocol(高级消息队列协议)。

一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种MOM设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。 RabbitMQ是该协议的典型实现。

MQTT

MQTT:Message Queuing Telemetry Transport(消息队列遥测传输)。

是IBM开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗IoT(物联网)设备间的通信。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。 RabbitMQ通过插件可以支持该协议。

架构

Producer生产者

MQ 的客户端之一,调用 Broker 提供的 RPC 接口发送消息。

Broker服务端

MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护等。

Consumer消费者

MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消费确认。

技术难点

难点1:RPC 通信解决的是 Broker 与 Producer 以及 Consumer 之间的通信问题。

如果不重复造轮子,直接利用成熟的 RPC 框架 Dubbo 或者 Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题了。当然,你也可以基于 Netty 来做底层通信,用 Zookeeper、Euraka 等来做注册中心,然后自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大。

难点2:高可用设计高可用主要涉及两方面:Broker 服务的高可用、存储方案的高可用。可以拆开讨论。Broker 服务的高可用,只需要保证 Broker 可水平扩展进行集群部署即可,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。存储方案的高可用有两个思路:1)参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;2)还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,它们都有自己的高可用方案。

难点3:存储设计消息的存储方案是 MQ 的核心部分,可靠性保证已经在高可用设计中谈过了,可靠性要求不高的话直接用内存或者分布式缓存也可以。这里重点说一下存储的高性能如何保证?这个问题的决定因素在于存储结构的设计。目前主流的方案是:追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二份查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。如果不追求很高的性能,也可以考虑现成的分布式文件系统、KV 存储或者数据库方案。

难点4:消费关系管理为了支持发布-订阅的广播模式,Broker 需要知道每个主题都有哪些 Consumer 订阅了,基于这个关系进行消息投递。由于 Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。

难点5:高性能设计存储的高性能前面已经谈过了,当然还可以从其他方面进一步优化性能。比如 Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取等等。

手写MQ

1、需要从功能性需求(收发消息)和非功能性需求(高性能、高可用、高扩展等)两方面入手。

2、功能性需求不是重点,能覆盖 MQ 最基础的功能即可,至于延时消息、事务消息、重试队列等高级特性只是锦上添花的东西。

3、最核心的是:能结合功能性需求,理清楚整体的数据流,然后顺着这个思路去考虑非功能性的诉求如何满足,这才是技术难点所在。

需要解决的问题

1、高并发场景下,如何保证收发消息的性能?

2、如何保证消息服务的高可用和高可靠?

3、如何保证服务是可以水平任意扩展的?

4、如何保证消息存储也是水平可扩展的?

5、各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?

数据一致性----分布式事务

6、消息重复消费、消息丢失、消息的顺序消费等等

三高:高并发,高性能,高扩展。

关键词ActiveMQRabbitMQKafkaRocketMQ
开发语言JavaErLangJavaJava
单机吞吐量万级万级十万级十万级
Topic--百级Topic时会影响系统吞吐量千级Topic时会影响系统吞吐
社区活跃度

RocketMQ

异步:生产者与消费者异步去处理数据,加快系统的响应速度和吞吐量

解耦:服务之间的耦合,提高系统的可靠性和稳定性

削峰:突发的流量冲击

RocketMQ存储文件

存储文件
commitLog消息存储目录
consumerQueue消息消费队列存储目录
config配置文件目录XXX.json
index消息索引存储目录key的hash存储目录
abort如果存在改文件则Broker非正常关闭
checkpoint文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

commitLog 消息存储目录

存消息本体文件。

所有消息顺序存储在一个文件,文件名称为20个0,默认大小为1G。

consumerQueue 队列存储目录

topic名称目录

0队列目录 message索引文件

1队列目录 message索引文件

2队列目录 message索引文件

3队列目录 message索引文件

message索引文件:8偏移量+4消息大小+8tag

例子:从xxx_topic中消费q1的第二条消息。

topic名称目录 ->

1 队列目录 ->

message索引文件 ->

20bytes * (2-1) ~ 20bytes * 2 ->

8偏移量+4消息大小+8tag ->

commitLog消息本体文件【偏移量,偏移量+消息大小】找到该条message消息,然后通过tag进行消息过滤。

分布式事务

  1. 生产者发送half半事务消息到MQ服务端,并得到成功的响应。
  2. 生产者执行本地事务
  3. 60s后MQ服务端收到 成功则Commit推送到B系统,失败则Rollback丢弃,如果是未知状态UNKNOW,则对生产者进行事务回查。
  4. 成功则Commit推送到B系统,失败则Rollback丢弃,如果是未知状态UNKNOW,则60s后对生产者进行事务回查,循环往复。
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//step0.创建自定义的事务监听器TransactionListener transactionListener = new TransactionListenerImpl();//step1.创建消息生产者producer,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("TRANS_GROUP");//生产者回查线程producer.setExecutorService(executorService);//监听器producer.setTransactionListener(transactionListener);//step2.指定Nameserver地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//step3.启动消息生产者producer.start();String[] tags = new String[]{"TagA", "TagB", "TagC"};for (int i = 0; i < 3; i++) {try {//step4.创建消息对象,指定主题Topic、Tag和消息体Message msg = new Message("RMQ_SYS_TRANS_HALF_TOPIC", tags[i],	   "KEY".getBytes(RemotingHelper.DEFAULT_CHARSET));//step5:半事务发送SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);} catch (MQClientException | UnsupportedEncodingException e) {//step6:半事务发送失败。回滚rollbacke.printStackTrace();}}//step6:半事务发送成功。业务逻辑;for (int i = 0; i < 3; i++) {}//step7.关闭生产者producerproducer.shutdown();}
}
public class TransactionListenerImpl implements TransactionListener {/*** 执行本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {if (StringUtils.equals("TagA", msg.getTags())) {// 本地事务成功return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TagB", msg.getTags())) {// 本地事务失败return LocalTransactionState.ROLLBACK_MESSAGE;} else {// 本地事务还在处理中return LocalTransactionState.UNKNOW;}}/*** 事务回查* 默认是60s检查一次*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {if (StringUtils.equals("TagA", msg.getTags())) {// 执行本地事务成功return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TagB", msg.getTags())) {// 执行本地事务失败return LocalTransactionState.ROLLBACK_MESSAGE;} else {// 执行本地事务中return LocalTransactionState.UNKNOW;}}
}

使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

最终一致性

1)生产者要保证100%的消息投递。事务消息机制。

2)消费者要保证幂等消费。全局唯一ID。

RocketMQ架构

启动流程:

RocketMQ服务端由两部分组成NameServer和Broker,NameServer是服务的注册中心,Broker会把自己的地址注册到NameServer,生产者和消费者启动的时候会先从NameServer获取Broker的地址,再去从Broker发送和接受消息。

Producer生产者

用途:生产并发送消息至Broker。

public class DefaultMQProducerImpl implements MQProducerInner {public void start(boolean startFactory) throws MQClientException {switch(this.serviceState) {// 第一次创建case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查生产组。1)生产组不能为null。2)生产组不能与系统冲突this.checkConfig();// 更改当前instanceName为进程ID。if (!this.defaultMQProducer.getProducerGroup().equals("CLIENT_INNER_PRODUCER")) {this.defaultMQProducer.changeInstanceNameToPID();}//客户端ID=ip@实例名@unitName// 根据客户端ID获取MQ客户端实例。this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, this.rpcHook);boolean registerOK = this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("/"), (Throwable)null);} else {this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {this.mQClientFactory.start();}this.log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;}default:this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();return;// 执行完毕case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("/"), (Throwable)null);}}}

Consumer消费者

用途:从Broker拉取消息进行消费,消费完进行ack。

NameServer服务注册与发现

用途:Broker的管理者。

Broker主机

用途:消息的管理。收发消息、持久化消息等。

public class BrokerStartup {public static BrokerController createBrokerController(String[] args) {// 配置信息// 根据配置信息创建controllerfinal BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// controller初始化boolean initResult = controller.initialize();}
}
public class BrokerController {public boolean initialize() throws CloneNotSupportedException {// 加载config目录下的json文件// 存储组件启动(零拷贝技术之MMAP)DefaultMessageStoreresult = result && this.messageStore.load();}}//BrokerController加载config目录下的json文件,区分是主题topic,偏移量,订阅组等,生成ConcurrentMap哈希表
private final ConcurrentMap<String, TopicConfig> topicConfigTable =new ConcurrentHashMap<String, TopicConfig>(1024);public class DefaultMessageStore implements MessageStore {// load Commit Log result = result && thismitLog.load();// load Consume Queueresult = result && this.loadConsumeQueue();
}public class MappedFileQueue {// 写时复制容器存储private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
}public class MappedFile extends ReferenceResource {private void init(final String fileName, final int fileSize) throws IOException {//文件通道fileChannelthis.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();// fileChannel.map方法得到mappedByteBuffer(零拷贝技术之内存映射MMAP)// 将一个文件从偏移量开始的size区域,映射在内存中this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);}}
消息写入流程

SendMessageProcessor#processRequest收到消息

DefaultMessageStore#asyncPutMessage判断消息是批量还是非批量

CommitLog#asyncPutMessage

MappedFile#appendMessage

CompletableFuture异步调用

用途:提升同步双写数倍性能

启动一个线程来处理接收到的数据,不会阻塞主线程

Commitlog文件写入

生产者有多个,但是Commitlog只有一个,为了保护数据安全性只能上锁。使用lock

lock默认是false自旋锁。

同步刷盘使用true可重入锁,异步刷盘使用false自旋锁。因为同步刷盘锁的竞争很激烈,使用自旋锁会导致一直在空轮询。

AtomicBoolean。

可重入锁

有可能阻塞线程,即上下文切换。

自旋锁

不会有上下文切换。

Do {

}

While()

提升文件读写性能

一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

  • read:读取本地文件内容;
  • write:将读取的内容通过网络发送出去。

零拷贝

零拷贝:Zero-copy。不是不需要拷贝,是指减少cpu的拷贝。因为CPU拷贝是最耗时的。

计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。

零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率。

零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上下文切换而带来的开销

MMAP 内存映射

MMAP是零拷贝技术的一种实现。直接将文件从硬盘拷贝到虚拟内存。比传统的网络传输,性能提升了近一倍。

传统方式:4次拷贝=2次cpu + 2次DMA

MMAP:3次拷贝=1次cpu + 2次DMA

MappedByteBuffer使用限制:

一次只能映射1.5~2G 的文件,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了。

刷盘

刷盘:消息从Broker写入到磁盘。

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

flushDiskType:SYNC_FLUSH, ASYNC_FLUSH

同步刷盘SYNC_FLUSH

在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。

优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致

缺点:性能比异步的低

异步刷盘ASYNC_FLUSH

生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。

优点:性能高

缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

高可用集群

Broker Master主节点

Master-Slave对应关系通过指定相同BrokerName。BrokerId=0代表Master。

Broker Slave从节点

Master-Slave对应关系通过指定相同BrokerName。BrokerId!=0代表Slave。

工作流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。

不建议线上环境使用,可以用于本地测试。

多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多Master多Slave模式

broker 集群由多个 master 构成,每个 master 又配置了多个 slave。

同步双写 + 异步刷盘

brokerClusterName = DefaultCluster
# 同一个broker
brokerName = broker-a
# 0代表Master,非0代表Slave
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
#同步双写SYNC_MASTER,SLAVE。异步复制ASYNC_MASTER,SLAVE
brokerRole = ASYNC_MASTER
#同步刷盘SYNC_FLUSH。异步刷盘ASYNC_FLUSH
flushDiskType = ASYNC_FLUSH
主从同步复制SYNC_MASTER

Master与Slave都写成功才响应。

  • 优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失
  • 缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高

同步双写=同步复制+同步刷盘。

消息不会丢失,1)主节点与从节点都有数据。2)磁盘上也有数据

主从异步复制ASYNC_MASTER

Master写成功就响应。

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样
  • 缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。

Spring实现

  • 导入MQ客户端依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>

生产者Producer

public class SyncProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.启动producerproducer.start();for (int i = 0; i < 10; i++) {//4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容。字节数组*/Message msg = new Message("消息主题Topic", "消息标签Tag", ("消息体").getBytes());//5.方式一:发送同步消息SendResult result = producer.send(msg);System.out.println("发送结果:" + result);//5.方式二:发送异步消息producer.send(msg, new SendCallback() {/*** 发送成功回调函数* @param sendResult*/public void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}/*** 发送失败回调函数* @param e*/public void onException(Throwable e) {System.out.println("发送异常:" + e);}});//5.方式三:发送单向消息,没有任何返回结果producer.sendOneway(msg);}//6.关闭生产者producerproducer.shutdown();}
}
1.创建消息生产者producer,指定生产组producer_group
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer常见方法
setDefaultTopicQueueNums(8);默认主题在一个broker队列数量(对新创建主题有效)
setSendMsgTimeout(100 * 3);发送消息默认超时时间,默认为3s。
setCompressMsgBodyOverHowmuch(1024 * 4);消息体超过该值则启用压缩,默认为4k
setRetryTimesWhenSendFailed(2);同步方式发送消息重试次数,默认为2,总共执行3次
setRetryTimesWhenSendAsyncFailed(2);异步方式发送消息重试次数,默认为2,总共执行3次
setRetryAnotherBrokerWhenNotStoreOK(false);消息重试时选择另一个broker时(默认没有存储成员是否发送到另一个broker)。默认false
setMaxMessageSize(1024 * 1024 * 4);允许发送的最大消息长度。默认为4m
2.指定NameServer地址
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
3.启动producer
//3.启动producer
producer.start();
4.创建消息对象,指定消息主题Topic、 消息标签Tag和消息体body
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容。字节数组
*/
Message msg = new Message("消息主题Topic", "消息标签Tag", ("消息体body").getBytes());
延时消息发送

应用场景:比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。电影票选座位未支付

//delayTimeLevel:(1~18个等级)1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 设置延时等级3,这个消息将在10s之后发送。
message.setDelayTimeLevel(3);
批量消息发送

批量发送消息能显著提高传递小消息的性能。

限制:

1)这批消息应该有相同的topic。

2)不能是延时消息。

3)这批消息的总大小不应超过4MB。

// 实现Collection接口的子类即可。例如:List,Set,Queue,Vector
List<Message> messages = new ArrayList<>();
// Set<Message> messages = new HashSet<>();
messages.add(new Message("消息主题Topic", "消息标签Tag", "key1", "消息体body1".getBytes()));
messages.add(new Message("消息主题Topic", "消息标签Tag", "key2", "消息体body2".getBytes()));
messages.add(new Message("消息主题Topic", "消息标签Tag", "key3", "消息体body3".getBytes()));//如果这批消息的总长度可能大于4MB时,这时候最好把这批消息进行分割。把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();// 5.发送消息} catch (Exception e) {e.printStackTrace();}
}
public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
过滤消息发送
String[] tags = new String[]{"TAGA","TAGB","TAGC"};
for (i=0; i<10; i++) {Message msg = new Message("消息主题Topic",tags[i],"消息体body".getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置消息的一些属性msg.putUserProperty("SQL过滤属性", String.valueOf(i));// 5.发送消息
}
5.发送消息
//5.方式一:发送同步消息
SendResult result = producer.send(msg);
System.out.println("发送结果:" + result);//5.方式二:发送异步消息
producer.send(msg, new SendCallback() {/*** 发送成功回调函数* @param sendResult*/public void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}/*** 发送失败回调函数* @param e*/public void onException(Throwable e) {System.out.println("发送异常:" + e);}
});//5.方式三:发送单向消息,没有任何返回结果
producer.sendOneway(msg);
发送同步消息

发送消息,同步响应。

使用场景:重要的消息通知,短信通知。

//5.发送消息
SendResult result = producer.send(msg);
发送异步消息

发送消息,异步回调响应。

使用场景:对响应时间敏感,即发送端不能容忍长时间地等待Broker的响应。

//5.发送异步消息
producer.send(msg, new SendCallback() {/*** 发送成功回调函数* @param sendResult*/public void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}/*** 发送失败回调函数* @param e*/public void onException(Throwable e) {System.out.println("发送异常:" + e);}
});
单向发送消息

只发送消息,无响应结果。

使用场景:不关心发送结果。例如日志收集。

//5.发送单向消息
producer.sendOneway(msg);
顺序消息发送

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

全局顺序消息发送

一个topic只有一个queue

部分顺序消息发送

通过标识,把消息发送到topic不同的queue中。

//5.发送全局顺序消息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  //根据订单id标识选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}
}, orderList.get(i).getOrderId());//订单id标识
6.关闭生产者producer
//6.关闭生产者producer
producer.shutdown();

消费者Consumer

public Consumer() throws MQClientException {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(JmsConfig.CONSUMER_GROUP);//设定消费模式:负载均衡|广播模式consumer.setMessageModel(MessageModel.CLUSTERING);  //负载均衡//        consumer.setMessageModel(MessageModel.BROADCASTING);    //广播模式//消费者从哪里开始消费:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//2.指定Nameserver地址consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);//3.订阅主题和Tag标签( * 代表所有标签)consumer.subscribe(JmsConfig.TEST_TOPIC, "*");// 4.设置回调函数,消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {//消费者获取消息String body = new String(msg.getBody(), "utf-8");}} catch (UnsupportedEncodingException e) {// 消费失败则放到重试队列中return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();
}
1.创建消费者Consumer,指定消费者组
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(JmsConfig.CONSUMER_GROUP);
consumer常见方法
setConsumeThreadMax(20);消费者最小线程数量(默认20)
setConsumeThreadMin(20);消费者最大线程数量(默认20)
setPullInterval(0);推模式下任务间隔时间
setPullBatchSize(32);推模式下任务批量拉取条数,默认32
setMaxReconsumeTimes(-1);消息重试次数,-1代表16次(超过次数成为死信消息)
setConsumeTimeout(15);消息消费超时时间(消息可能阻塞在正在使用的线程的最大分钟)
消费者从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
ConsumeFromWhere.常量用途
CONSUME_FROM_LAST_OFFSET从消费队列上次消费偏移量开始消费。
CONSUME_FROM_MAX_OFFSET从消费队列最大偏移量开始消费。
CONSUME_FROM_FIRST_OFFSET从消费队列最小偏移量开始消费。
CONSUME_FROM_TIMESTAMP启动时间戳,默认为消费者启动之前的30分钟处开始消费。可以通过DefaultMQPushConsumer#setConsumeTimestamp。
消费者如何消费消息
负载均衡模式CLUSTERING

同一个消费组中消费者共同消费消息,每个消费者处理的消息不同。默认模式。

//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式BROADCASTING

同一个消费组中消费者消费相同的消息。

//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
2.指定NameServer地址
//2.指定Nameserver地址
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
3.订阅消息主题Topic和消息标签Tag
//3.订阅主题和Tag标签( * 代表所有标签)
consumer.subscribe(JmsConfig.TEST_TOPIC, "*");
消息过滤
//3.订阅主题和Tag标签( * 代表所有标签)
consumer.subscribe("消息主题Topic", "*");
consumer.subscribe("消息主题Topic", MessageSelector.byTag("tagA|tagB");
consumer.subscribe("消息主题Topic", MessageSelector.bySql("a between 0 and 3");
4.设置回调函数,接收处理消息
// 4.设置回调函数,消费消息MessageListenerConcurrently
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {//消费者获取消息String body = new String(msg.getBody(), "utf-8");}} catch (UnsupportedEncodingException e) {// 消费失败则放到重试队列中return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
过滤消息消费

SQL过滤消息

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE
// 4.设置回调函数,过滤消息消费MessageSelector.bySql
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("消息主题Topic", MessageSelector.bySql("a between 0 and 3");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {//消费者获取消息String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String property = msg.getProperty("SQL过滤属性");}} catch (UnsupportedEncodingException e) {// 消费失败则放到重试队列中return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
顺序消息消费MessageListenerOrderly

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

全局顺序消息消费

一个topic只有一个queue。

部分顺序消息消费

通过标识消费topic中不同queue的消息。

//5.设置回调函数,部分顺序消息消费MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {try {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), "utf-8");System.out.println("线程名:" + Thread.currentThread().getName() + body);}}catch (Exception e) {// 消费失败,为了防止破坏消息消费的顺序,不放到重试队列里,而是等一会再处理这批消息return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}
});
5.启动消费者consumer
//5.启动消费者consumer
consumer.start();

Domain Model存储模型

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog:存储消息的元数据

CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store$ { commitlog} \ $ { fileName}。在CommitLog 中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog 中顺序写 ,但是随机读。commitlog 文件默认大小为lG ,可通过在 broker 置文件中设置 mappedFileSizeCommitLog属性来改变默认大小。

Commitlog文件存储的逻辑视图如下,每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

每台Rocket只会往一个commitlog文件中写,写完一个接着写下一个。

indexFile 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。

ConsumerQueue:存储消息在CommitLog的索引

ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件, 文件地址在$ {$storeRoot} \consumequeue$ {topicName} $ { queueld} $ {fileName}。

IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

过期文件删除

由于 RocketMQ 操作 CommitLog,ConsumeQueue文件是基于内存映射机制并在启动的时候会加载 commitlog,ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。

删除过程分别执行清理消息存储文件( Commitlog )与消息消费 队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件( Commitlog )共用一套过期文件机制。

RocketMQ 清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42小时(不同版本的默认值不同,这里以4.4.0为例) ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。

从下面模型可以看出,要解决消费并发,就是要利用Queue,一个Topic可以分出更多的queue,每一个queue可以存放在不同的硬件上来提高并发。

Message与 Topic是多对一的关系,一个Topic可以有多个Message.

Topic到Queue是一对多的关系,这个也是方便横向拓展,也就是消费的时候,这里可以有很多很多的Queue.

一个Queue只有一个消费位点(Offset),所以Topic和Offset也是一对多的关系

Topic和Group也是多对多的关系。

Message 消息

生产或消费的数据,对于RocketMQ来说,消息就是字节数组。

messageId是全局唯一的。MessageKey是业务系统(生产者)生成的,所以如果要结合业务,可以使用MessageKey作为业务系统的唯一索引。

另外Message中的equals方法和hashCode主要是为了完成消息只处理一次(Exactly-Once)。

Exactly-Once是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

Group 分组

业务场景中,如果有一堆发送者,一堆消费者,所以这里使用Group的概念进行管理。

**生产者:**标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息

**消费者:**标识一类Consumer的集合名称,这类Consumer通常消费一类消息(也称为Consumer Group),且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

Topic 主题

消息的类别。

Tags是在同一Topic中对消息进行分类

subTopics==Message Queue,其实在内存逻辑中,subTopics是对Topics的一个拓展,尤其是在MQTT这种协议下,在Topic底下会有很多subTopics。

Tag 标签

Topic的二级目录。如Topic是银行,Tag则可以是招商银行,工商银行,农业银行等。

同一个topic的消息虽然逻辑管理是一样的。但是消费同一个topic时,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。

Queue Topic分区

Queue是消息物理管理单位,比如在RocketMQ的控制台中,就可以看到每一个queue中的情况(比如消息的堆积情况、消息的TPS、QPS)

Offset 偏移量

对于每一个Queue来说都有Offset,这个是消费位点。

Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset

Consumer offset可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是****下次拉取的offset位置****。

常见问题

消息顺序消费

要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)

RocketMQ不解决这个矛盾的问题:

1、 乱序的应用实际大量存在

2、 队列无序并不意味着消息无序

消息重复消费

造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该是怎么处理?

RocketMQ不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。

1、 消费端处理消息的业务逻辑保持幂等性

2、 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现

消息积压

消息丢失

消息幂等性

MQ Admin管理工具

使用方式

进入RocketMQ安装位置,在bin目录下执行./mqadmin {command} {args}

命令

Topic命令
名称含义命令选项说明
updateTopic创建更新Topic配置-bBroker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-ccluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询)
-h-打印帮助
-nNameServer服务地址,格式 ip:port
-p指定新topic的读写权限( W=2|R=4|WR=6 )
-r可读队列数(默认为 8)
-w可写队列数(默认为 8)
-ttopic 名称(名称只能使用字符 1+$ )
deleteTopic删除Topic-ccluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询)
-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称(名称只能使用字符 2+$ )
topicList查看 Topic 列表信息-h打印帮助
-c不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数
-nNameServer 服务地址,格式 ip:port
topicRoute查看 Topic 路由信息-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
topicStatus查看 Topic 消息队列offset-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
topicClusterList查看 Topic 所在集群列表-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
updateTopicPerm更新 Topic 读写权限-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
-bBroker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-p指定新 topic 的读写权限( W=2|R=4|WR=6 )
-ccluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令
updateOrderConf从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic,键
-vorderConf,值
-mmethod,可选get、put、delete
allocateMQ以平均负载算法计算消费者列表负载消息队列的负载结果-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
-iipList,用逗号分隔,计算这些ip去负载Topic的消息队列
statsAll打印Topic订阅关系、TPS、积累量、24h读写总量等信息-h打印帮助
-nNameServer 服务地址,格式 ip:port
-a是否只打印活跃topic
-t指定topic
集群命令
名称含义命令选项说明
clusterList查看集群信息,集群、BrokerName、BrokerId、TPS等信息-m打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)
-h打印帮助
-nNameServer 服务地址,格式 ip:port
-i打印间隔,单位秒
clusterRT发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。-aamount,每次探测的总数,RT = 总时间 / amount
-s消息大小,单位B
-c探测哪个集群
-p是否打印格式化日志,以|分割,默认不打印
-h打印帮助
-m所属机房,打印使用
-i发送间隔,单位秒
-nNameServer 服务地址,格式 ip:port
Broker命令
名称含义命令选项说明
updateBrokerConfig更新 Broker 配置文件,会修改Broker.conf-bBroker 地址,格式为ip:port
-ccluster 名称
-kkey 值
-vvalue 值
-h打印帮助
-nNameServer 服务地址,格式 ip:port
brokerStatus查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面)-bBroker 地址,地址为ip:port
-h打印帮助
-nNameServer 服务地址,格式 ip:port
brokerConsumeStatsBroker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息-bBroker 地址,地址为ip:port
-t请求超时时间
-ldiff阈值,超过阈值才打印
-o是否为顺序topic,一般为false
-h打印帮助
-nNameServer 服务地址,格式 ip:port
getBrokerConfig获取Broker配置-bBroker 地址,地址为ip:port
-nNameServer 服务地址,格式 ip:port
wipeWritePerm从NameServer上清除 Broker写权限-bBroker 地址,地址为ip:port
-nNameServer 服务地址,格式 ip:port
-h打印帮助
cleanExpiredCQ清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址,地址为ip:port
-c集群名称
cleanUnusedTopic清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址,地址为ip:port
-c集群名称
sendMsgStatus向Broker发消息,返回发送状态和RT-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBrokerName,注意不同于Broker地址
-s消息大小,单位B
-c发送次数
消息命令
名称含义命令选项说明
queryMsgById根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。-imsgId
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByKey根据消息 Key 查询消息-kmsgKey
-tTopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByOffset根据 Offset 查询消息-bBroker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到)
-iquery 队列 id
-ooffset 值
-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByUniqueKey根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果-h打印帮助
-nNameServer 服务地址,格式 ip:port
-iuniqe msg id
-gconsumerGroup
-dclientId
-ttopic名称
checkMsgSendRT检测向topic发消息的RT,功能类似clusterRT-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic名称
-a探测次数
-s消息大小
sendMessage发送一条消息,可以根据配置发往特定Message Queue,或普通发送。-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic名称
-pbody,消息体
-kkeys
-ctags
-bBrokerName
-iqueueId
consumeMessage消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic名称
-bBrokerName
-o从offset开始消费
-iqueueId
-g消费者分组
-s开始时间戳,格式详见-h
-d结束时间戳
-c消费多少条消息
printMsg从Broker消费消息并打印,可选时间段-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic名称
-c字符集,例如UTF-8
-ssubExpress,过滤表达式
-b开始时间戳,格式参见-h
-e结束时间戳
-d是否打印消息体
printMsgByQueue类似printMsg,但指定Message Queue-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic名称
-iqueueId
-aBrokerName
-c字符集,例如UTF-8
-ssubExpress,过滤表达式
-b开始时间戳,格式参见-h
-e结束时间戳
-p是否打印消息
-d是否打印消息体
-f是否统计tag数量并打印
resetOffsetByTime按时间戳重置offset,Broker和consumer都会重置-h打印帮助
-nNameServer 服务地址,格式 ip:port
-g消费者分组
-ttopic名称
-s重置为此时间戳对应的offset
-f是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系
-c是否重置c++客户端offset
消费者、消费组命令
名称含义命令选项说明
consumerProgress查看订阅组消费状态,可以查看具体的client IP的消息积累量-g消费者所属组名
-s是否打印client IP
-h打印帮助
-nNameServer 服务地址,格式 ip:port
consumerStatus查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand-h打印帮助
-nNameServer 服务地址,格式 ip:port
-gconsumer group
-iclientId
-s是否执行jstack
getConsumerStatus获取 Consumer 消费进度-g消费者所属组名
-t查询主题
-iConsumer 客户端 ip
-nNameServer 服务地址,格式 ip:port
-h打印帮助
updateSubGroup更新或创建订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker地址
-c集群名称
-g消费者分组名称
-s分组是否允许消费
-m是否从最小offset开始消费
-d是否是广播模式
-q重试队列数量
-r最大重试次数
-i当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费
-w如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1
-a当消费者数量变化时是否通知其他消费者负载均衡
deleteSubGroup从Broker删除订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker地址
-c集群名称
-g消费者分组名称
cloneGroupOffset在目标群组中使用源群组的offset-nNameServer 服务地址,格式 ip:port
-h打印帮助
-s源消费者组
-d目标消费者组
-ttopic名称
-o暂未使用
连接命令
名称含义命令选项说明
consumerConnec tion查询 Consumer 的网络连接-g消费者所属组名
-nNameServer 服务地址,格式 ip:port
-h打印帮助
producerConnec tion查询 Producer 的网络连接-g生产者所属组名
-t主题名称
-nNameServer 服务地址,格式 ip:port
-h打印帮助
NameServer命令
名称含义命令选项说明
updateKvConfig更新NameServer的kv配置,目前还未使用-s命名空间
-kkey
-vvalue
-nNameServer 服务地址,格式 ip:port
-h打印帮助
deleteKvConfig删除NameServer的kv配置-s命名空间
-kkey
-nNameServer 服务地址,格式 ip:port
-h打印帮助
getNamesrvConfig获取NameServer配置-nNameServer 服务地址,格式 ip:port
-h打印帮助
updateNamesrvConfig修改NameServer配置-nNameServer 服务地址,格式 ip:port
-h打印帮助
-kkey
-vvalue
其他命令
名称含义命令选项说明
startMonitoring开启监控进程,监控消息误删、重试队列消息数等-nNameServer 服务地址,格式 ip:port
-h打印帮助

注意事项

  • 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
  • 几乎所有命令都可以通过-h获取帮助
  • 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令

Kafka

文件的方式来发布和订阅消息流,实时数据的管道

Kafka 是一个分布式的基于发布/订阅模式消息队列。???

Kafka官网主页

Kafka官方文档

架构

组件名称用途
Producer生产者向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
Kafka ClusterKafka集群集群由多个 Broker 组成。
Broker经纪人一台 Kafka 服务器就是一个 Broker。一个 Broker可以容纳多个 Topic。
Topic主题通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息,同一个Topic下的消费者和生产者的数量并不是一对一的。
Partition存储分区Topic物理上的分组。 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
partition物理上由多个segment组成。
Replicatio n副本为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
Leader主节点每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
Follower容灾备份从节点每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。
Consumer Group (CG)消费者组一个组内的消费者不能重复消费消息;即消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。用来提高并发能力并防止集群重复消费。
什么时候消费速度最快?组内成员=分区数
Consumer消费者消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

Kafka分片和索引

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset。 consumer组中的每个consumer, 都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。(producer -> log with offset -> consumer(s))

topic = N partition

partition = log

log = N segment

segment = .index + .log

由于生产者生产的消息会不断追加到 log 文件末尾, 为防止 log 文件过大导致数据定位效率低下, Kafka 采取了分片索引机制,将每个 partition 分为多个 segment。

每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下, 该文件夹的命名规则为: topic 名称+分区序号。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log文件的结构示意图。

“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

分区的原因
方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应适合的数据了;
可以提高并发,因为可以以 Partition 为单位读写了。(联想到ConcurrentHashMap在高并发环境下读写效率比HashTable的高效)

分区的原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。

生产者ISR

为保证 producer 发送的数据,能可靠的发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。

何时发送ack?

确保有follower与leader同步完成,leader再发送ack,这样才能保证leader挂掉之后,能在follower中选举出新的leader。

多少个follower同步完成之后发送ack?

半数以上的follower同步完成,即可发送ack继续发送重新发送
全部的follower同步完成,才可以发送ack

副本数据同步策略

方案优点缺点
半数以上完成同步, 就发送 ack延迟低选举新的 leader 时,容忍 n 台节点的故障,需要 2n+1 个副本。(如果集群有2n+1台机器,选举leader的时候至少需要半数以上即n+1台机器投票,那么能容忍的故障,最多就是n台机器发生故障)容错率:1/2
全部完成同步,才发送ack选举新的 leader 时, 容忍 n 台节点的故障,需要 n+1 个副本(如果集群有n+1台机器,选举leader的时候只要有一个副本就可以了)容错率:1延迟高

Kafka 选择了第二种方案,原因如下:

同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1 个副本,而 Kafka 的每个分区都有大量的数据, 第一种方案会造成大量数据的冗余。
虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

ISR
采用第二种方案之后,设想以下情景: leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,就会给 leader 发送 ack。如果 follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。 Leader 发生故障之后,就会从 ISR 中选举新的 leader。

生产者ACK

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。

所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

acks 参数配置:

0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 ISR 的follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前, leader 发生故障,那么会造成数据重复。

助记:返ACK前,0无落盘,1一落盘,-1全落盘,(落盘:消息存到本地)

数据一致性问题

LEO:(Log End Offset)每个副本的最后一个offset
HW:(High Watermark)高水位,指的是消费者能见到的最大的 offset, ISR 队列中最小的 LEO
follower 故障和 leader 故障
follower 故障:follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
leader 故障:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

ExactlyOnce

消费者分区分配策略

消费者offset的存储

由于 consumer 在消费过程中可能会出现断电宕机等故障, consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

Kafka 0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

修改配置文件 consumer.properties,exclude.internal.topics=false。
读取 offset
0.11.0.0 之前版本 - bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter “kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config config/consumer.properties --from-beginning
0.11.0.0 及之后版本 - bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config config/consumer.properties --from-beginning

高效读写&ZK作用

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间

零复制技术

NIC network interface controller 网络接口控制器

Zookeeper 在 Kafka 中的作用

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。Reference

Controller 的管理工作都是依赖于 Zookeeper 的。

以下为 partition 的 leader 选举过程:

事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。

为了管理 Transaction, Kafka 引入了一个新的组件 Transaction Coordinator。 Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。 Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务

上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

API生产者流程

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

相关参数:

batch.size: 只有数据积累到 batch.size 之后, sender 才会发送数据。
linger.ms: 如果数据迟迟未达到 batch.size, sender 等待 linger.time 之后就会发送数据。

Properties props = new Properties();
//zk服务器的地址  xxxx:2181
props.put("zookeeper.connect", zookeeper);
//组的名称,区别于其他group否则会接收不到数据
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "8000");
props.put("zookeeper.connection.timeout.ms", "20000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("automit.interval.ms", "5000");
props.put("rebalance.max.retries", "5");
props.put("rebalance.backoff.ms", "60000");
props.put("automit.enable", "true");
//重点参数,是否每次都从offset最前面开始读起
props.put("auto.offset.reset", "smallest");
#查看所有的topic
bin/kafka-topics.sh --zookeeper zk1.test-inf-zk.data.m:2181/octopus,zk2.test-inf-zk.data.m:2181/octopus,zk3.test-inf-zk.data.m:2181/octopus --list#查看topic的偏移量
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic xiuxiu_sync_search_big_data --time -1 --broker-list 192.168.199.11:9092 --partitions 0#查看topic的状态
bin/kafka-topics.sh --zookeeper 192.168.199.11:2181 --topic xiuxiu_sync_search_big_data --descr#系统上kafka目录
cd /home/install/kafka_2.12-2.4.1/bin/./kafka-consumer-groups.sh --bootstrap-server 10.45.50.66:9092 --list./kafka-consumer-groups.sh --bootstrap-server 10.45.50.66:9092 --group QLJ_0507_LK --describe./zkCli.sh

kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。


  1. a-zA-Z0-9_- ↩︎

  2. a-zA-Z0-9_- ↩︎

本文标签: 中间件