kafka主要应用场景以及相关问题
kafka主要的应用场景
kafka是一个分布式流式处理平台。提供消息持久化机制。与其他消息队列相比,kafka的性能更好(批处理和异步,每秒可以处理千万级别的消息),生态系统比较完善(大数据领域和流式计算)。
应用场景
- 消息队列:建立实时数据管道,可在系统或者应用程序之间传递数据。特点(异步,解耦,流控)
- 数据处理:构建实时的数据处理程序处理数据流
kafka的消息模型
发布订阅模型,producer发布一条消息,通过topic传递给所有的订阅者。
kafka组件的基本概念
- producer:消息发生产发送者
- consumer:消费者,负责从kafka接收消息并消费
- broker:代理服务器,是kafka集群中的一个节点,每个broker可以存储多个topic和paration的数据,并且和其他broker协调工作。
- topic:主题,是一类消息的集合,每个topic可以被分为多个分区,每个分区可以存储一定数量的消息。
- paration:分区,每个paration可以有多个副本,以提高可用性和容错性。
- consumer group:消费者组,通过group id区分,每个组相互独立。一个消费者组可以消费多个paration,实现高吞吐。
- replication:副本,同一条消息被拷贝到多台机器提供数据冗余,副本分为leader replication 和follower replication。每个paration可配置多个副本实现高可用。
- offset:表示分区中每条消息的位置信息,是一个单调递增的值。叫做偏移量或者位移,一旦被写入到分区日志,将不会被修改。
多副本机制有什么好处
kafka中一个topic可以对应多个paration,各个paration可以分布在不同的broker上,这样可以提供更好的并发能力。因为paration可以指定多个副本,这就提高了消息存储的安全性和容灾能力,不过也增加了所需要的存储空间。
leader副本和follower副本的区别
只有leader副本才能对外提供读写服务,相应client端的请求。follower副本只是采用pull的方式被动同步leader副本中的数据,并且在leader副本宕机时,随时代替leader副本。
在kafka2.4版本开始,社区通过引入新的broker端参数,允许follower副本有限度的提供服务。
通常情况下,很多因素可能会造成leader和follower之间的不同步(程序异常,broker,网络问题等),如果长时间不通过就需要深入排查了。
注意:之前确保一致性的主要手段是高水位机制(HW),但高水位值无法保证Leader连续变更场景下的数据一致性,因此,社区引入了Leader Epoch机制,来修复高水位值的弊端。
zookeeper在kafka中的作用是什么
- 提供元数据管理能力。
- 成员管理是指broker节点的注册,注销,属性变更等。
- controller选举是指选举集群controller,而其他管理类任务包括但不限于topic的删除,参数配置等。
kafka2.8.x以后剔除掉了对zookeeper的依赖,使用@metadata topic用来管理元数据信息。kraft里面存储新的配置文件。
去掉zookeeper之后的优点
- 部署更简单
- 性能更好
- 监控更便捷
kafka如何保证消息的消费顺序是有序的
对于kafka,producer可以在写数据的时候指定一个key,比如userId, orderId,遇到相同id的数据会被分发到同一个paration,消费者从paration中取出来的数据也是有序的。
对于多线程消费时,如何保证有序?
预先设置N个队列,拥有相同key的数据放到同一个内存队列中,然后开启N个线程分别去消费对应的队列。
kafka如何保证发送的消息不丢失
broker丢消息
设置 replication.factor >= 3
设置request.require.acks=-1(默认是1,等ISR列表里面的所有follower paration 都写成功才返回给producer写成功)
设置min.insync.replicas >= 2 (默认值是1,表示ISR列表里面至少有1个副本)
设置unclean.leader.election.enable=false,这是Broker端的参数,如果一个Broker落后消息相比leader太多,就禁止让他成为新leader。
producer端丢消息
- **使用producer.send(msg,callback)**,回调函数判断是否发送成功,如果因为网络或者其他原因发送失败,一般会进行重试,重试次数一般是3,并设置重试时间间隔,间隔太小重试效果不明显
consumer端丢失消息
- 设置enable.auto.commit=false,默认情况下consumer拉取到分区的某个消息之后会自动提交offset,如果在消费前consumer挂掉会导致丢消息,所以可以设置成手动提交。(手动提交要注意保证消费端的幂等性)
kafka如何保证消息不重复消费
生产者:
- pid:producerID,每个新的producer在初始化的时候会被分配一个唯一的pid
- 序列号:对于每个PID该producer发送的每个paration都对应一个从0开始单调递增的sequence number。
broker在缓存中保存了序列号,对于接收的每个消息,如果序号大于broker缓存中的序号则接收否则就丢弃,这样可以保证单个producer对于同一个paration的幂等。
消费者:
- 在数据库种根据主键或者唯一约束进行去重过滤。
- 或者通过redis去重
- 通过producer发送消息带上全局业务唯一key
kafka默认模式是at least once,但是这种模式可能产生重复的消费问题,所以业务逻辑必须做幂等处理。
kafka为什么那么快
- 使用cache pageCache缓存
- 顺序写入:磁盘的顺序写入比随机写入内存更快
- 零拷贝:减少拷贝次数
- 批处理消息:合并小的请求,然后以流的方式进行交互,直到网络上限。
- pull模式:使用拉去模式进行消息的获取消费。
producer发送数据,ack设置的含义
request.required.acks 有三个值分别是0 1 -1
- 0 :producer不会等待broker的ack,延迟最低但是会丢数据
- 1:发一条消息,当leader paration写入成功以后才算写入成功,不过这种方式也有丢数据的可能。
- -1:当ISR列表里面的所有副本写成功之后才返回ack
kafka中的ISR,AR表示什么
ISR(in-sync-replicas),副本同步集合,在这个集合中的follower会和leader副本进行同步,不在的不会进行同步。这是一个动态调整的集合。
AR:表示所有副本,包含OSR和ISR。
ISR是由leader维护,follower从leader同步数据有一定延迟((延迟时间)replica.lag.time.max.ms和(延迟条数,0.10已取消)replica.lag.max.message),超过任意一个阈值就会将follower从ISR中剔除存储到OSR列表(新加入的follower也会放在这里)
分区策略有哪些
分区的作用是提供负载均衡的能力,实现系统的高伸缩性。分区策略是决定将producer发送的消息分配到哪个分区的算法。
- 轮询策略:按照分区顺序分配
- 随机策略:将消息随机放到任意一个分区上
- key-ordering策略:消息中的业务key可以保证同一个key的所有消息都进入同一个分区,因为消息处理在同一个分区是有序的,因此也被叫做按消息key顺序策略。
哪些场景使用了零拷贝
mmap的索引:索引是基于MappedByteBuffer的,也是让用户态和内核态共享的数据缓冲区,此时,数据不需要复制到用户态空间。
日志文件读写用到的TransportLayer:TransportLayer是kafka传输层的接口,它的实现类使用了fileChannel的transferTo方法,该方法底层使用sendfile实现了零拷贝。如果I/O通道使用普通的PLAINTEXT,那么,Kafka就可以利用Zero Copy特性,直接将页缓存中的数据发送到网卡的Buffer中,避免中间的多次拷贝。相反,如果I/O通道启用了SSL,那么,Kafka便无法利用Zero Copy特性了。
sendFile和mmap的区别
sendfile:在Linux2.1版本提供sendfile()函数
1 |
|
Linux内核2.4版本,对于支持网卡SG-DMA技术下,sendfile系统调用的过程发生了变化。可以直接将内核缓冲区的描述符和长度发送到socket缓冲区,不需要拷贝,这样就减少了一次数据拷贝。
sendfile只需要两次上下文切换,2次数据拷贝,拷贝都是用DMA(直接内存访问的方法)来完成不需要CPU参与。
mmap:需要4次上下文切换,3次拷贝。
1 | buf = mmap(file, len); |
mmap调用后,DMA会将数据从磁盘拷贝到内核缓冲区,然后应用进程和操作系统共享这个缓冲区。
进程调用write,操作系统将内核缓冲区的数据拷贝到socket缓冲区,上下文切换到内核态,由CPU来完成搬运数据。
最后再由DMA将socket缓冲区的数据拷贝到网卡缓冲区。
sendFile相当于是原汁原味地读写,直接将硬盘上的文件送给网卡,但这种方式并不一定对所有场景都适用,比如如果你需要从硬盘上读取文件,然后经过一定修改之后再送给网卡的情况下,就不适合用sendFile,,需要使用mmap。
leader选举策略有哪些
- OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
- ReassignPartition Leader选举:当你手动运行kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
- PreferredReplicaPartition Leader选举:当你手动运行kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
- ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。
这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。
kafka为什么不支持读写分离?
主要是如果主写从读有两个明显的缺点
1、数据一致性问题:数据从主节点转移到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。例如在某时刻,主节点和从节点中的A=X,之后主节点更新了A=Y,在同变更通知到从节点之前,从节点还是旧数据,这时候如果读从节点就会导致数据不一致问题。
2、延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存
这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘
这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
kafka如何保证高可用?
kafka多个broker组成一个集群,每个broker是一个节点,当创建一个topic时,这个topic会被划分成多个paration,每个paration可以存在不同的broker上,每个paration只存放一部分数据。
这就是天然的分布式消息队列,一个topic的数据分散在多台机器上,每个机器只存放一部分数据。
在0.8版本之前是没有高可用机制的,任何一台broker宕机就会提供读写服务。
在0.8以后提供了HA机制,每个paration上的数据会同步到其他机器形成自己的多个replication副本,所有replication会选举一个leader,其他副本是follower,读写都是在leader上完成,数据写完之后leader会同步到follower。这样提供容错性。
拥有了replica副本机制,如果某个broker宕机了,这个broker的paration还在其他机器上存有副本,如果leader宕机就会重新从follower中选取新的leader继续提供读写服务。
写数据的时候,生产者只将数据写入leader节点,leader节点会将数据写入到本地磁盘,然后其他follower会主动从leader拉取最新的数据,follow同步完成之后发送ack到leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。
消费数据的时候,consumer只从leader上取数据,但是只有当一个消息已经被所有follower同步完成后,这个消息才会被消费。
Consumer Rebalance机制
发生的条件
- 消费者组成员数发生变化
- 订阅的topic的分区数发生变化
- topic数量发生变化
rebalance事件通过消费端的心跳线程通知到其他consumer实例。
消费者组的状态流转
状态的含义
重平衡的流程
- 选择组协调器。每个消费者组都会选一个broker作为自己组的组协调器,负责监控消费者组里所有消费者的心跳判断是否宕机,然后开启消费组的Rebalance。
- 消费者向GroupCoordinator发送加入消费者组 JoinGroup请求,然后第一个加入到消费者组的consumer就是leader,leader负责指定分区方案。
- 等待leader consumer分配方案。leader通过给组协调者发送SyncGroup请求,然后组协调者将分区分配方案发送给各个consumer。
Leader Consumer的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
消费者组Rebalance分区分配策略
主要有三种策略,range,round-robin,sticky。客户端通过设置参数paration.assignment.strategy
来设置消费者和订阅topic之间的分区分配策略。
默认情况下是range策略。
- range:按照分区序号排序,n=分区数$/$消费者数,m=分区数%消费者数,前m个consumer每个分配n+1个paration,剩下的分配n个paration。
- round-robin:轮寻分配
- sticky:分配分区尽可能均匀,如果一个consumer挂了,他所负责消费的分区会均分给剩下活跃的consumer
kafka和RocketMQ的对比
单机吞吐量
- Kafka大概17.3w/s,rocketMQ吞吐量在11.6w/s。(Kafka吞吐量高的原因是Producer端将多个小消息合并再序列化后批量发送到Broker,RocketMQ没有这么做的原因是在发送端缓存过多消息GC是个问题)
数据可靠性
RocketMQ支持同步,异步刷盘,异步复制
Kafka使用异步刷盘,同步复制/异步复制
消息投递实时性
- kafka使用短路安迅方式拉去消息,实时性取决于间隔时间,0.8以后版本支持长轮询。
- RocketMQ使用长轮询,消息投递延时通常是在毫秒级别。
消息失败重试
- kafka消费失败不支持重试
- RocketMQ消费失败支持定时重试,每次重试时间间隔顺延
严格的顺序
- Kafka和RocketMQ都支持顺序消息
定时消息
- Kafka不支持定时消息,RocketMQ支持两类定时消息,开源版本支持秒级定时,阿里云版本支持毫秒级别延时
分布式事务消息
- Kafka不支持事务消息,RocketMQ支持分布式事务消息
消息查询
- Kafka不支持消息查询
- RocketMQ支持根据消息标识查询消息,也支持根据消息内容查询消息
消息并行度
- Kafka小的并行度依赖Topic配置的分区数
- RocketMQ并行度在顺序消费时和Kafka一致,伦旭取决于Consumer线程数,如Topic配置10个队列,10台Consumer,每台机器100个线程那么并行度就是1000