0%

rocketmq

MQ(message Queue):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入…

RocketMQ

概述

什么是MQ?

MQ(message Queue):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并给予数据通信来进行分布式系统的集成。

MQ有哪些?

老牌的ActiveMQ、RabbitMQ、当今热门的Kafka,阿里巴巴自主开发的RocketMQ等。

不同的MQ的特点

ActiveMQ是由Apache出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。ActiveMQ实现了JMS 1.1 并提供了很多附加的特性,比如JMX管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等。主要特性如下:

  • 支持Java、C、C++、C#、Ruby、Perl、Python、PHP等多种语言的客户端和协议,如OpenWire、STOMP、AMQP、MQTT协议。
  • 提供了像消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化之类的高级特性。
  • 完全支持JMS 1.1 和 J2EE 1.4 规范 (包括持久化、分布式事务消息、事务)
  • 支持Spring框架,ActiveMQ 可以通过Spring 的配置文件方式很容易嵌入Spring应用中。
  • 通过了常见的J2EE服务器测试,比如TomEE、Geronimo、JBoss、GlassFish、WebLogic。
  • 连接方式多样化,ActiveMQ 提供了多种连接方式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。
  • 支持通过使用JDBC 和 Journal 实现消息的快速持久化。
  • 为高性能集群、客户端-服务器、点对点通信等场景而设计。
  • 提供了技术和语言中立的REST API 接口。
  • 支持以AJAX 方式调用 ActiveMQ。
  • ActiveMQ 可以轻松地与CXF、Axis 等 WebService 技术整合,以提供可靠的消息传递。
  • 可以作为内存中的JMS 提供者,非常适合 JMS 单元测试。

作用

限流削峰

MQ可以将系统中超量的请求暂时存在其中,以便于后期可以慢慢处理,从而避免了请求的丢失或者系统被大流量打垮。

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量和并发度,且系统耦合度太高。异步调用则会解决这个问题。

数据收集

通过MQ可以收集业务日志,用户行为等数据。

基本概念

消息模型

RocketMQ由Producer、Consumer、Broker三部分组成,Produce负责生产消息,Consumer负责消费消息,Broker负责存储消息,Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic消息可以分片存储于不同的Broker。Message Queue用于存储消息的物理地址,每个Topic中的消息地址存储于多个Message Queue中。Consumer Group由多个Consumer实例构成。

消息

泛指所传输信息的物理载体,生产和消费的最小数据单位,每条消息必须属于一个主题。RocketMQ中每个消息都拥有唯一的MessageID,并且可以携带具有业务标识的key,系统提供了通过Message ID和Key查询消息的功能。

Topic主题

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

Tag

消息的标签,用于区分同一主题下的不同类型的消息,来自同一业务单元的消息,可以根据不同业务目的在同一个主题下设置不同的标签。标签能够有效保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现不同子主题的不同消费逻辑,实现更好的扩展性。

Producer

一个消息生产者会把业务应用系统里产生的消息发送到broker服务器,RocketMQ提供多种发送方式。同步发送,异步发送,顺序发送,单向发送。同步和异步方式均需要broker返回确认消息,单向发送不需要。

消息生产者支持分布式集群方式部署。通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

Consumer

一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息,并将消息提供给应用程序。从用户的角度是提供了两种方式,拉取式消费和推送式消费。支持分布式集群方式部署,支持push推和pull拉两种方式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求.

Broker Server

代理服务器,负责存储转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储,同时为消费者的拉取请求做准备。代理服务器页存储消息相关的元数据信息(消息进度偏移量,主题,队列消息等)。

Name Server

名称服务充当路由信息的提供者。生产者或消费者能够通过名字服务查找各个主题相应的Broker IP列表。多个NameServer实例组成的集群,但是相互独立,没有信息交换。类似于Dubbo中的zookeeper,支持Broker的动态注册与发现,主要包括两个功能,一个是Broker管理,另一个是路由信息管理.NameServer接收Broker集群的注册信息并且保存下来作为路由信息的基本数据.然后提供心跳检测机制,检查Broker是否还能存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息然后Producer和Consumer提供NameServer就能知道整个Broker的路由信息,从而进行投递和消费.NameServer通常也是集群部署,各个实例之间不进行通信.Broker是向每台NameServer注册自己的路由信息,所以每个NameServer实例上面都保存一份完整的路由信息.当某个NameServer因为某种原因下线了,Broker仍然可以向其他NameServer同步路由信息,Producer和Consumer仍然可以动态感知Broker的路由信息。

拉取式消费

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉取消息,主动权由应用控制,一旦获取了批量信息,应用就会启动消费过程。

推动式消费

Consumer的另一种消费方式,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

生产者组

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致,如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会练习同一生产者实例以提交或者回溯消息。

消费者组

同类Consumer的集合。要求这类消费者消费逻辑一致,消费的是同一类消息。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易,要注意的是,消费者组的消费者实例必须订阅完全相同的Topic,RocketMQ支持两种消息模式:集群消费和广播消费。

集群消费

相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费

相同Consumer Group的每个Consumer实例都全量接收消息。

普通顺序消息

在该模式下,消费者通过同一个消息队列收到的消息是有顺序的,不同的消息队列收到的消息则可能是无顺序的。

严格顺序消息

消费者收到的消息都是严格有序的。

特性

消息的订阅与发布

消息的发布是指某个生产者向某个topic发送消息,消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。

消息的有序

消息的有序是指一类消息消费时,能按照发送的顺序来消费,例如:一个订单残生了三个消息分别是创建订单,订单付款,订单完成,消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格保证消息的有序。

顺序消息分为全局顺序消息和分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一个组消息被顺序消费就可以。

全局顺序的适用场景:对于指定的topic,所有消息按照严格的FIFO发布和消费,适用于性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景。

分区顺序的适用场景:针对的是同一个Topic中根据sharding key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding key是用来区分不同分区的关键字段,和不同的key是不同的概念,适用于性能要求很高,在同一个区块中严格按照FIFO原则进行消息发布和消费的场景。

消息的过滤

RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤,消息过滤目前是Broker端实现的,有点事减少对于Consumer无用的网络传输,缺点是增加了Broker的负担,而且实现相对复杂。

对消息可靠性的影响因素

  1. Broker非正常关闭
  2. Broker异常Crash
  3. 操作系统Crash
  4. 机器掉电但是能立即恢复供电情况
  5. 机器无法开机(可能是CPU,主板,内存等关键设备损坏)
  6. 磁盘设备损坏

1、2、3、4四种情况属于硬件资源可以例子恢复的情况,在这几种情况下可以保证消息不丢失或者丢失少量消息。5,6情况属于单点故障且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可以保证99%的消息不丢但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场景,例如对金额的实时计算。

至少一次(At least once)

至少一次是指每个消息至少投递一次,Consumer先拉取到本地,消费完之后才会返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持该特性。

回溯消费

回溯消费是指Consumer已经消费成功,但由于业务需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留,并且重新消费一般是按照时间维度,例如Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度回退消费进度,回退的时间维度精确到毫秒。

事务消息和定时消息

事务消息是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败,RocketMQ的事务消息提供X/Open XA的分布式功能,通过事务消息能达到分布式事务的最终一致性。

定时消息(延迟队列)是指消息发送到Broker后,不会立即被消费,等待特定时间投递给真正的topic,broker由配置项messageDelayLevel,默认是18个等级,“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,这个延后时间等级可以自定义,需要注意的是messageDelayLevel是Broker的属性,不属于某个topic,发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。

  • 当level==0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1秒
  • 当level>maxLevel,则level==maxLevel

定时消费会暂存在名为SCHEDULE_TOPIC_XXX的topic中,并根据delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,即一个queue只存在相同的延迟消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息还在第一次写入和调度写入时都会计数,因此发送数量,tps都会变高。

消息重试

Consumer消费失败之后,要提供重试机制,让消息再消费一次。消息消费失败的原因

  • 由于消息本身的原因,例如序列化失败,消息数据本身因为业务无法处理等,这种错误通常要跳过这个消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,过10s后再重试。
  • 由于依赖的下有服务不可用也会造成消费失败,这种情况即使跳过这条消息,其他消息也会报错,应该建议应用sleep30s,再消费下一条消息,这样可以减轻Broker重试消费的压力。

消息重投

消息重投保证消息尽可能发送成功,不丢失但可能会造成重复消费,消息重复在RocketMQ是无法避免的问题,消息重复在一般情况下不会发生当出现消息量大,网络抖动,消息重复就会是大概率事件,另外生产者主动重发,consumer负载变化也会导致重复消息。

消息重试策略

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed+ 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

流量控制

生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流量控制:

  1. commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认是1000ms,返回流控。
  2. 如果开启transientStorePollEnable==true,且broker为异步刷盘的主机,transientStorePool中资源不足,拒绝当前send请求,返回流控。
  3. Broker每个10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsLnSendQueue,默认200ms,拒绝当前send请求,返回流控。
  4. broker通过拒绝send请求方式实现流量控制

生产者流控不会尝试消息重投。

消费者流量控制:

  • 消费者本地缓存消息数超过pillThresholdForQueue时,默认1000。
  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB
  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认是2000。

消费者流量控制的结果是拉取频率。

什么是死信队列

死信队列是处理无法被正常消费的消息。当一条消息初次消费失败后,消息队列会对进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确的消费该消息,此时,消息队列不会立刻丢弃这条消息,而是将他存储在死信队列中。RocketMQ可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

顺序消息

消息有序是指可以按照消息的发送顺序来消费。RocketMQ分给分区有序和全局有序。

顺序消费的原理:在默认情况下消息发送会采用轮询的方式将消息发送到不同的queue(分区队列);而消费消息的时候从多个queue中拉取消息,这种情况发送和消费是不能够保证有序,但是如果发送的消息只发送到同一个queue,消费的时候只从这个queue中取消息,就能保证顺序,当发送和消费参与的queue只有一个,则是全局有序;如果有多个queue参与,则为分区有序,也就是相对每个queue,消息都是有序的。

延时消息

使用场景:比如提交一个订单之后就可以发送一个延时消息,1小时候后检查这个订单状态,如果还是未支付就释放库存。

RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,

1
2
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

消息发送的时间和设置的等级与重试次数有关。

想要批量发送消息的条件是这些消息应该有相同的topic,相同的waitStoreMsgOK,并且不能是延时消息,此外这批消息的总大小不应该超过4MB。

根据标签过滤消息

1
2
3
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
// 消费者将接收TAGA,TAGB,TAGC的消息,但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用,在这种情况下,可以使用SQL表达式筛选消息。

只有使用push模式的消费者才能用使用SQL92标准的sql语句

public void subscribe(final String topic, final MessageSelector messageSelector)

生产者可以通过putUserProperty()方法来设置消息的属性。

消息的存储

整体架构采用的是混合型存储结构,单个Broker实例下的所有队列共用一个commitlog。也就是多个topic的内容都存储到一个CommitLog中,只要Broker端采用同步或者异步的方式将消息保存到CommitLog中就不会丢失消息。服务端支持长轮询模式,如果一个消息拉去请求没有拉到消息,Broker允许等待30s,只要在这段时间内有新消息到达,将直接返回给消费端。

  1. CommitLog:消息主题以及元数据的存储主题,存储生产者写入的消息主题内容,消息内容不是定长的,单个文件大小默认是1G,文件名长度20位,左边补充0,剩余是起始偏移量。消息是顺序写入到日志文件的,当文件写满之后再写下一个文件。顺序写入磁盘是随机写效率的6000倍。
  2. ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,主要用来存储消息在commitlog的索引。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件根据topic检索是非常低效的,ConsumeQueue作为消费消息的索引,保存指定topic下的队列消息在commitlog中的其实物理偏移量offset,消息大小size,tag的hashcode值,consumequeue文件可以看成是基于topic的commitlog索引文件,所以具体存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},consumequeue文件采取定长设计,每个条目一共20个字节,单个文件由30w个条目组成,可以像数组一样随机访问每个条目,每个ConsumeQueue文件大小大约5.72M。
  3. IndexFile:索引文件提供一种可以通过key或者时间区间来查询消息的方法。Index文件的存储位置是$HOME \store\index${fileName},文件名是以创建时间戳命名的,固定单个IndexFile文件大小约400M,一个IndexFile可以保存2000W个索引,索引文件底层实现是hash索引。

页缓存:是操作系统对文件的缓存,用于加速对文件的读写,一般来讲,顺序读写的速度几乎接近于内存的读写速度,主要原因就是使用了PageCache机制对读写访问操作进行了优化,将一部分内存用作页缓存。写的时候先写入到PageCache,随后通过异步的方式同步到物理磁盘上,对于读取操作,如果未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对相邻的数据预读取。

零拷贝:RocketMQ利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种mmap的方式减少了一次从操作系统内核缓冲区到用户应用地址空间的缓冲区的拷贝),从而极大底稿了文件的读写效率。

消息刷盘

  • 同步刷盘:只有消息真正持久化到磁盘后,Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来讲是一种不错的保障,但是性能上会有较大影响,但是这种相比于异步刷盘更安全,因此常用于金融业务。
  • 异步刷盘:只要消息写入到PageCache,即可将成功的ACK返回给Producer端,消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提供性能和吞吐量。

消息的高可用性

RocketMQ分布式集群是通过MASTER和Slave的配合达到高可用性的。

Master和Slave的区别:在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master,大于0表名这个是Slave。brokerRole参数也会说明这个Broker是Master还是Slave。

Producer只能和Master角色的Broker连接写入消息;Consumer两者都可以连接读取消息。

消费端高可用

当Master繁忙或者不可用的时候,Consumer可以自动切换到Slave读,有了自动切换Consumer,可以保证消费端的高可用。

发送端高可用

producer发送消息时,将Topic的多个Message Queue创建在多个Broker组上(Broker名字相同,Id不同),当一个Broker组的Master不可用之后,Producer仍然可以发送消息。RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要手动停止Slave,更改配置文件,用新的配置文件启动Broker。

消息的主从复制

同步复制:Master和Slave均要写成功之后才会反馈给客户端。

如果Master出现故障,Slave上全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统的吞吐量。

异步复制:只要Master写成功就会反馈给客户端写成功状态。异步复制具有较低的延迟和高吞吐量,但是当Master出现故障,这时Slave上的数据有可能是不完整的,因此会出现消息丢失,不可靠。

配置文件:brokerRole=ASYNC_MASTER/SYNC_MASTER/SLAVE;异步,同步都是在主Broker配置的,如果是从Broker配置成Slave。

通常使用时,会将刷盘方式配置成异步刷盘,主从复制配置成同步复制。

通信机制

rocketMQ的基本通讯过程如下

  1. Broker启动之后需要完成一次将自己注册到NameServer的操作;随后每隔30s时间向NameServer上报Topic信息。
  2. 消息生产者作为客户端发送消息的时候,需要根据消息的topic从本地缓存的TopicPublishInfoTable中获取路由信息,如果TopicpushinfoTable中没有信息就会从NameServer上重新拉起一次消息并更新TopicPublishInfoTable。
  3. producer从路由信息中选择一个队列进行消息发送;Broker作为消息的接受者接收消息并落盘存储。
  4. 消息消费者获取路由信息并完成客户端的负载均衡后,选择其中的一个或者几个消息队列来拉取消息并进行消费。

通信类的结构

img

协议与编码

为了高效的传输信息和读取收到的消息,就需要对消息进行编解码。RemotingCommand这个类在消息传输过程中对所有数据内容进行了封装,不但包含了所有的数据结构还有编解码操作。

消息头:

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC的标志 区分是普通RPC还是onewayRPC的标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

消息体

img

消息总长度:自个字节,占用一个int类型

序列化类型和消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度。

消息头数据:经过序列化后的消息头数据;

消息主体数据:消息主体的二进制字节数据内容。

通信方式:同步,异步,单向三种,其中单向一般用在发送心跳包不关注response。

负载均衡

RocketMQ的负载均衡都在Client端完成。分为Producer发送消息的负载均衡和Consumer端订阅消息的负载均衡。

Producer的负载均衡

生产者发送消息的时候,会根据topic找到指定的TopicPublishInfo,在获取TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue方法会从TopicPublishInfo中选一个队列发消息。具体的容错策略都是在MQFaultStrategy这个类中定义。MQFaultStrategy中有一个sendLatencyFaultEnable 的私有属性,默认是false,当这个开关打开后会在随机递增基础上过滤掉不可靠的broker代理。是对之前失败的,按一定时间做回退。latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

Consumer的负载均衡

在RocketMQ中,Consumer端的两种消费模式其实都是基于pull拉模式获取消息的,Push模式只是对pull模式的一种封装,其本质上是为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又马不停蹄的继续向服务器再次尝试拉取消息。如果未拉取消息,则延迟一下又会继续拉取。在两种基于拉取模式的消费模式中,都需要Consumer端在知道从Broker端的哪一个消息队列中获取消息,即是Broker端中多个MessageQueue分配到同一个消费组中对应给哪些Consumer消费。

1、consumer端的心跳包发送

在consumer启动之后,他就会通过定时任务不断的向RocketMQ集群中的所有broker实例发送心跳包。broker收到心跳消息之后会在本地维护一个consumerTable,同时将封装好的客户端网络通道信息保存到本地channelInfoTable中为以后做consumer端的负载均衡提供可以依据的数据信息。

2、consumer端实现负载均衡的核心类是RebalanceImpl。

consumer实例启动流程中会启动MQClientInstance实例,会完成负载均衡服务线程RebalanceService的启动,RebalanceService线程的run方法最终会调用RebalanceImpl的rebalanceByTopic方法,这个方法是负载均衡的核心实现。会根据消费者通信类型(广播和集群模式)的不同做不同的处理。

集群模式下的主要处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private void rebalanceByTopic(String topic, boolean isOrder) {
Set mqSet;
switch(this.messageModel) {
//广播模式
case BROADCASTING:
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}
break;
case CLUSTERING://集群模式
//从本地缓存变量topicSubscribeInfoTable获取该topic主题下的消费队列集合
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
//根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通信请求
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
if (null == mqSet && !topic.startsWith("%RETRY%")) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList();
mqAll.addAll(mqSet);
//先对消息消费队列,消费者ID进行排序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List allocateResult = null;

try {
//计算待拉取的消息队列
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable var10) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), var10);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
}

}

img

  • 上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;
  • 上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列

事务消息

rocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

img

事务消息的发送与提交流程。

  1. 发送half消息。
  2. 服务端响应消息写入结果。
  3. 根据发送结果执行本地事务,如果写入失败,该half消息对业务不可见
  4. 根据本地事务状态执行commit或者rollback。commit会生成消息索引对业务可见。

补偿机制主要是解决当commit或者rollback执行失败之后的挽救措施。

  1. 对没有commit或者rollback的消息,服务端会进行回查。
  2. 生产者检查本地事务消息的状态。
  3. 根据状态重新commit或者rollback。

RocketMQ事务消息不可见的实现策略:写入的事务消息,会对topic和queue等属性进行替换,正因为消息主题被替换了,所以消息不会转发到原先主题的消费队列中,所以消费者无法感知这个消息的存在。

Op消息的引入

如果第二阶段是进行Rollback操作,就需要撤销第一阶段的消息,但是不是真正意义上的撤销,因为是顺序写入文件的,但是区别是这条消息没有确定状态,需要一个操作表示这个状态,因此引入了Op消息的概念。Op消息是用来表示事务消息已经确定的状态(要么是Commit要么是Rollback)。如果一条消息没有对应的Op消息,就代表该事务消息还无法确定。

Op消息的存储和对应关系

rocketMQ将Op消息写入到一个内部的Topic中,这个topic不会被用户消费。Op消息的内容是对应的Half消息的存储的Offset,这样通过Op消息能定位到Half消息,进行后续的操作。

img

Half消息的索引构建

事务消息的第二次提交其实是利用第一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入的流程。

2PC失败时消息该如何处理

例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

消息的查询

RocketMQ支持两种查询方式,一种是按照messageID查询,另一种是按照message Key查询。

  1. MessageID的长度一共有16个字节,其中包含了消息存储的主机地址,消息CommitLog offset。按照MessageId查询消息“在RocketMQ中具体的做法是:Client端从MessageId解析出Broker的底子好和CommitLog的偏移量之后封装成一个RPC请求通过通信层发送。”
  2. 按照Message Key查询消息。这种是基于IndexFile索引文件来实现的。索引文件的结构类似于HashMap。40 Byte 的Header用于保存一些总的统计信息,4500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。202000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

img

常见面试题

MQ的作用?

降低系统耦合,异步处理提升响应时间,请求到达峰值后,可以保持固定消费速率用于流量削峰。

RocketMQ的角色组成以及各个角色的作用?

  1. Nameserver:无状态,动态列表;这也是和zookeeper的重要区别之一。zookeeper是有状态的。
  2. Producer:消息生产者,负责发消息到Broker。
  3. Broker:就是MQ本身,负责转发消息、持久化消息等。
  4. Consumer:消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。

消息被消费后会立即删除吗

不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度消息,当有消息消费后CommitLog的offerset就更新了。

消息堆积后什么时候会清理过期消息,如何处理

4.6版本默认48小时候会删除不再使用的commitLog文件。检查这个文件最后的访问时间,并判断是否大于过期时间,指定时间删除,默认凌晨4点。

消息堆积会不会进入到死信队列

不会,消息在消费失败之后会进入重试队列,默认重试16次才会进入死信队列。

死信队列的作用以及队列里消息的保存时间

死信队列用于存储消费失败的消息,消息消费失败重试次数达到阈值默认是16次会放入死信队列,死信队列中的消息默认保存时间是72小时,你可以在消息队列控制台查询消息以及重发死信队列中的消息。

  1. 代码正常执行返回消息状态为CONSUME_SUCCESS,执行异常返回RECONSUME_LATER
  2. 状态为RECONSUME_LATER的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName;
  3. 重试16次消息任然没有处理成功,消息就会进入到死信队列%DLQ% + ConsumerGroupName;

消息消费的模式

  • 集群消费:一条消息只会被一个Group中的一个Consumer消费
  • 广播消费:多个Group同时消费一个Topic时,每个Group中的Consumer都会消费这条数据。

Broker如何处理pull请求的

Consumer首次请求Broker

  • Broker中是否有符合条件的消息

  • 有 ->

    • 响应Consumer
    • 等待下次Consumer的请求
  • 没有

    • DefaultMessageStore#ReputMessageService#run方法
    • PullRequestHoldService 来Hold连接,每个5s执行一次检查pullRequestTable有没有消息,有的话立即推送
    • 每隔1ms检查commitLog中是否有新消息,有的话写入到pullRequestTable
    • 当有新消息的时候返回请求
    • 挂起consumer的请求,即不断开连接,也不返回数据

如何做负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说,主要可以分为Producer端发送消息时的负载均衡和Consumer端订阅消息的负载均衡。消费消息的方式push&pull实际上均由pull模式实现,所以具体负载均衡是在客户端完成的。核心类—RebalanceImpl 在Consumer启动的过程中会完成RebalanceService启动,每20s执行一次,最终调用RebalanceImpl来实现。

如何避免重复消费

  • 数据库表中定义为以约束,如果有重复数据插入报唯一键冲突
  • Redis设置全局唯一业务ID,处理前查一下,如果有的话就不处理,否则在处理后面逻辑。
  • 分布式锁

如何保证有序消费

首先多个队列只能保证单个队列中的消息有序,同一个topic同一个队列发消息的时候一个线程发送,消费的时候单线程消费。

发送消息通过id对队列个数取余保证同一个消息发送到同一个queue。

如何保证消息不丢失

  • Producer:采取send()同步发送消息,发送结果是同步感知的,发送失败可以重试,默认重试3次。
  • broker:修改刷盘策略为同步刷盘,默认是异步刷盘。集群部署,主从模式,高可用
  • consumer:完全消费后手动ACK确认。

RocketMQ分布式事务的原理

事务消息处理流程

img

事务消息的生命周期

img

事务消息的使用限制

  • 事务消息仅支持在MessageTypeTransaction的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
  • RocketMQ事务消息保证最终一致性,但是上下游系统事务之间的中间状态不一定一致。
  • 如果Half Message发送到Broker之后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

如何优化性能(Producer和Consumer)

  • 同一个消费者组,多台机器部署,并行消费
  • 单个Consumer提高消费线程个数
  • 批量消费(消息批量拉取,业务逻辑批量处理)
  • Page Cache,JVM调优,多线程

NameServer的作用

Broker的信息(IP,topic对应)会注册到所有的NameServer。

Broker宕机了怎么办

Broker主从架构以及多副本策略,Master收到消息后同步给Slave,这样消息不止一份,可以保证MQ的可靠性和高可用。

参考

kafka和RocketMQ对比

github:RocketMQ

https://www.cnblogs.com/javazhiyin/p/13327925.html

https://help.aliyun.com/document_detail/440244.html