0%

kafka

kafka主要应用场景以及相关问题

kafka主要的应用场景

kafka是一个分布式流式处理平台。提供消息持久化机制。与其他消息队列相比,kafka的性能更好(批处理和异步,每秒可以处理千万级别的消息),生态系统比较完善(大数据领域和流式计算)。

应用场景

  1. 消息队列:建立实时数据管道,可在系统或者应用程序之间传递数据。特点(异步,解耦,流控)
  2. 数据处理:构建实时的数据处理程序处理数据流

kafka的消息模型

发布订阅模型,producer发布一条消息,通过topic传递给所有的订阅者。

kafka组件的基本概念

  • producer:消息发生产发送者
  • consumer:消费者,负责从kafka接收消息并消费
  • broker:代理服务器,是kafka集群中的一个节点,每个broker可以存储多个topic和paration的数据,并且和其他broker协调工作。
  • topic:主题,是一类消息的集合,每个topic可以被分为多个分区,每个分区可以存储一定数量的消息。
  • paration:分区,每个paration可以有多个副本,以提高可用性和容错性。
  • consumer group:消费者组,通过group id区分,每个组相互独立。一个消费者组可以消费多个paration,实现高吞吐。
  • replication:副本,同一条消息被拷贝到多台机器提供数据冗余,副本分为leader replication 和follower replication。每个paration可配置多个副本实现高可用。
  • offset:表示分区中每条消息的位置信息,是一个单调递增的值。叫做偏移量或者位移,一旦被写入到分区日志,将不会被修改。

多副本机制有什么好处

kafka中一个topic可以对应多个paration,各个paration可以分布在不同的broker上,这样可以提供更好的并发能力。因为paration可以指定多个副本,这就提高了消息存储的安全性和容灾能力,不过也增加了所需要的存储空间。

leader副本和follower副本的区别

只有leader副本才能对外提供读写服务,相应client端的请求。follower副本只是采用pull的方式被动同步leader副本中的数据,并且在leader副本宕机时,随时代替leader副本。

在kafka2.4版本开始,社区通过引入新的broker端参数,允许follower副本有限度的提供服务。

通常情况下,很多因素可能会造成leader和follower之间的不同步(程序异常,broker,网络问题等),如果长时间不通过就需要深入排查了。

注意:之前确保一致性的主要手段是高水位机制(HW),但高水位值无法保证Leader连续变更场景下的数据一致性,因此,社区引入了Leader Epoch机制,来修复高水位值的弊端。

zookeeper在kafka中的作用是什么

  • 提供元数据管理能力。
  • 成员管理是指broker节点的注册,注销,属性变更等。
  • controller选举是指选举集群controller,而其他管理类任务包括但不限于topic的删除,参数配置等。

kafka2.8.x以后剔除掉了对zookeeper的依赖,使用@metadata topic用来管理元数据信息。kraft里面存储新的配置文件。

去掉zookeeper之后的优点

  • 部署更简单
  • 性能更好
  • 监控更便捷

kafka如何保证消息的消费顺序是有序的

对于kafka,producer可以在写数据的时候指定一个key,比如userId, orderId,遇到相同id的数据会被分发到同一个paration,消费者从paration中取出来的数据也是有序的。

对于多线程消费时,如何保证有序?

预先设置N个队列,拥有相同key的数据放到同一个内存队列中,然后开启N个线程分别去消费对应的队列。

kafka如何保证发送的消息不丢失

broker丢消息

  • 设置 replication.factor >= 3

  • 设置request.require.acks=-1(默认是1,等ISR列表里面的所有follower paration 都写成功才返回给producer写成功)

  • 设置min.insync.replicas >= 2 (默认值是1,表示ISR列表里面至少有1个副本)

  • 设置unclean.leader.election.enable=false,这是Broker端的参数,如果一个Broker落后消息相比leader太多,就禁止让他成为新leader。

producer端丢消息

  • **使用producer.send(msg,callback)**,回调函数判断是否发送成功,如果因为网络或者其他原因发送失败,一般会进行重试,重试次数一般是3,并设置重试时间间隔,间隔太小重试效果不明显

consumer端丢失消息

  • 设置enable.auto.commit=false,默认情况下consumer拉取到分区的某个消息之后会自动提交offset,如果在消费前consumer挂掉会导致丢消息,所以可以设置成手动提交。(手动提交要注意保证消费端的幂等性)

kafka如何保证消息不重复消费

生产者:

  • pid:producerID,每个新的producer在初始化的时候会被分配一个唯一的pid
  • 序列号:对于每个PID该producer发送的每个paration都对应一个从0开始单调递增的sequence number。

broker在缓存中保存了序列号,对于接收的每个消息,如果序号大于broker缓存中的序号则接收否则就丢弃,这样可以保证单个producer对于同一个paration的幂等。

消费者:

  • 在数据库种根据主键或者唯一约束进行去重过滤。
  • 或者通过redis去重
  • 通过producer发送消息带上全局业务唯一key

kafka默认模式是at least once,但是这种模式可能产生重复的消费问题,所以业务逻辑必须做幂等处理。

kafka为什么那么快

  • 使用cache pageCache缓存
  • 顺序写入:磁盘的顺序写入比随机写入内存更快
  • 零拷贝:减少拷贝次数
  • 批处理消息:合并小的请求,然后以流的方式进行交互,直到网络上限。
  • pull模式:使用拉去模式进行消息的获取消费。

producer发送数据,ack设置的含义

request.required.acks 有三个值分别是0 1 -1

  • 0 :producer不会等待broker的ack,延迟最低但是会丢数据
  • 1:发一条消息,当leader paration写入成功以后才算写入成功,不过这种方式也有丢数据的可能。
  • -1:当ISR列表里面的所有副本写成功之后才返回ack

kafka中的ISR,AR表示什么

ISR(in-sync-replicas),副本同步集合,在这个集合中的follower会和leader副本进行同步,不在的不会进行同步。这是一个动态调整的集合。

AR:表示所有副本,包含OSR和ISR。

ISR是由leader维护,follower从leader同步数据有一定延迟((延迟时间)replica.lag.time.max.ms和(延迟条数,0.10已取消)replica.lag.max.message),超过任意一个阈值就会将follower从ISR中剔除存储到OSR列表(新加入的follower也会放在这里)

分区策略有哪些

分区的作用是提供负载均衡的能力,实现系统的高伸缩性。分区策略是决定将producer发送的消息分配到哪个分区的算法。

  • 轮询策略:按照分区顺序分配
  • 随机策略:将消息随机放到任意一个分区上
  • key-ordering策略:消息中的业务key可以保证同一个key的所有消息都进入同一个分区,因为消息处理在同一个分区是有序的,因此也被叫做按消息key顺序策略。

哪些场景使用了零拷贝

mmap的索引:索引是基于MappedByteBuffer的,也是让用户态和内核态共享的数据缓冲区,此时,数据不需要复制到用户态空间。

日志文件读写用到的TransportLayer:TransportLayer是kafka传输层的接口,它的实现类使用了fileChannel的transferTo方法,该方法底层使用sendfile实现了零拷贝。如果I/O通道使用普通的PLAINTEXT,那么,Kafka就可以利用Zero Copy特性,直接将页缓存中的数据发送到网卡的Buffer中,避免中间的多次拷贝。相反,如果I/O通道启用了SSL,那么,Kafka便无法利用Zero Copy特性了。

sendFile和mmap的区别

sendfile:在Linux2.1版本提供sendfile()函数

1
2
3
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
// 参数:目标文件描述符,原文件描述符,源文件偏移量,拷贝数据的长度

Linux内核2.4版本,对于支持网卡SG-DMA技术下,sendfile系统调用的过程发生了变化。可以直接将内核缓冲区的描述符和长度发送到socket缓冲区,不需要拷贝,这样就减少了一次数据拷贝。

sendfile只需要两次上下文切换,2次数据拷贝,拷贝都是用DMA(直接内存访问的方法)来完成不需要CPU参与。

mmap:需要4次上下文切换,3次拷贝。

1
2
buf = mmap(file, len);
write(sockfd, buf, len);

mmap调用后,DMA会将数据从磁盘拷贝到内核缓冲区,然后应用进程和操作系统共享这个缓冲区。

进程调用write,操作系统将内核缓冲区的数据拷贝到socket缓冲区,上下文切换到内核态,由CPU来完成搬运数据。

最后再由DMA将socket缓冲区的数据拷贝到网卡缓冲区。

sendFile相当于是原汁原味地读写,直接将硬盘上的文件送给网卡,但这种方式并不一定对所有场景都适用,比如如果你需要从硬盘上读取文件,然后经过一定修改之后再送给网卡的情况下,就不适合用sendFile,,需要使用mmap。

leader选举策略有哪些

  • OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
  • ReassignPartition Leader选举:当你手动运行kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
  • PreferredReplicaPartition Leader选举:当你手动运行kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
  • ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。

这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。

kafka为什么不支持读写分离?

主要是如果主写从读有两个明显的缺点

1、数据一致性问题:数据从主节点转移到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。例如在某时刻,主节点和从节点中的A=X,之后主节点更新了A=Y,在同变更通知到从节点之前,从节点还是旧数据,这时候如果读从节点就会导致数据不一致问题。

2、延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

kafka如何保证高可用?

kafka多个broker组成一个集群,每个broker是一个节点,当创建一个topic时,这个topic会被划分成多个paration,每个paration可以存在不同的broker上,每个paration只存放一部分数据。

这就是天然的分布式消息队列,一个topic的数据分散在多台机器上,每个机器只存放一部分数据。

在0.8版本之前是没有高可用机制的,任何一台broker宕机就会提供读写服务。

在0.8以后提供了HA机制,每个paration上的数据会同步到其他机器形成自己的多个replication副本,所有replication会选举一个leader,其他副本是follower,读写都是在leader上完成,数据写完之后leader会同步到follower。这样提供容错性。

拥有了replica副本机制,如果某个broker宕机了,这个broker的paration还在其他机器上存有副本,如果leader宕机就会重新从follower中选取新的leader继续提供读写服务。

写数据的时候,生产者只将数据写入leader节点,leader节点会将数据写入到本地磁盘,然后其他follower会主动从leader拉取最新的数据,follow同步完成之后发送ack到leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。

消费数据的时候,consumer只从leader上取数据,但是只有当一个消息已经被所有follower同步完成后,这个消息才会被消费。

Consumer Rebalance机制

发生的条件

  1. 消费者组成员数发生变化
  2. 订阅的topic的分区数发生变化
  3. topic数量发生变化

rebalance事件通过消费端的心跳线程通知到其他consumer实例。

消费者组的状态流转

image.png

状态的含义

image.png

重平衡的流程

  1. 选择组协调器。每个消费者组都会选一个broker作为自己组的组协调器,负责监控消费者组里所有消费者的心跳判断是否宕机,然后开启消费组的Rebalance。
  2. 消费者向GroupCoordinator发送加入消费者组 JoinGroup请求,然后第一个加入到消费者组的consumer就是leader,leader负责指定分区方案。
  3. 等待leader consumer分配方案。leader通过给组协调者发送SyncGroup请求,然后组协调者将分区分配方案发送给各个consumer。

Leader Consumer的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

消费者组Rebalance分区分配策略

主要有三种策略,range,round-robin,sticky。客户端通过设置参数paration.assignment.strategy来设置消费者和订阅topic之间的分区分配策略。

默认情况下是range策略。

  • range:按照分区序号排序,n=分区数$/$消费者数,m=​分区数%消费者数​,前m个consumer每个分配n+1个paration,剩下的分配n个paration。
  • round-robin:轮寻分配
  • sticky:分配分区尽可能均匀,如果一个consumer挂了,他所负责消费的分区会均分给剩下活跃的consumer

kafka和RocketMQ的对比

单机吞吐量

  • Kafka大概17.3w/s,rocketMQ吞吐量在11.6w/s。(Kafka吞吐量高的原因是Producer端将多个小消息合并再序列化后批量发送到Broker,RocketMQ没有这么做的原因是在发送端缓存过多消息GC是个问题)

数据可靠性

  • RocketMQ支持同步,异步刷盘,异步复制

  • Kafka使用异步刷盘,同步复制/异步复制

消息投递实时性

  • kafka使用短路安迅方式拉去消息,实时性取决于间隔时间,0.8以后版本支持长轮询。
  • RocketMQ使用长轮询,消息投递延时通常是在毫秒级别。

消息失败重试

  • kafka消费失败不支持重试
  • RocketMQ消费失败支持定时重试,每次重试时间间隔顺延

严格的顺序

  • Kafka和RocketMQ都支持顺序消息

定时消息

  • Kafka不支持定时消息,RocketMQ支持两类定时消息,开源版本支持秒级定时,阿里云版本支持毫秒级别延时

分布式事务消息

  • Kafka不支持事务消息,RocketMQ支持分布式事务消息

消息查询

  • Kafka不支持消息查询
  • RocketMQ支持根据消息标识查询消息,也支持根据消息内容查询消息

消息并行度

  • Kafka小的并行度依赖Topic配置的分区数
  • RocketMQ并行度在顺序消费时和Kafka一致,伦旭取决于Consumer线程数,如Topic配置10个队列,10台Consumer,每台机器100个线程那么并行度就是1000