Kafka 教程 | 基础架构
基本架构
了解Kafka之前,您必须了解Topic,Brocker,Producer和 Consumer 等主要术语。
一个 Topic 对应一个消息队列。
Producer 主要负责将数据发送到 kafka 对应的 topic 中去。Consumer 主要负责从 Topic 中去接收消息。
Partition 概念
我们知道 Kafka 的目标是大数据,如果将消息存在一个“中心”队列中,势必缺少可伸缩性。无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽机器的性能或存储。
因此,Kafka 在概念上将一个 Topic 分成了多个 Partition,写入 Topic 的消息会被(平均)分配到其中一个 Partition。Partition 中会为消息保存一个Partition内唯一的 ID ,一般称为偏移量(offset) 。这样当性能/存储不足时 Kafka 就可以通过增加 Partition 实现横向扩展。
为了防止数据损坏, 每个 Partition 可以设置多个 Replication,每个Replication将分散在不同的Broker上维护。
每一个Partition有且只有一个Replica可以作为Leader,除了 Leader 以外的所有 Replica 均为follower
Leader 主要负责处理所有Producer、Consumer的请求, follower 不处理任何来自客户端的请求;只通过Fetch Request拉取leader replica的数据进行同步。
多个Producer和Consumer可以同时发布和检索消息。
Broker
Kafka 集群通常由多个Broker组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka Broker实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka leader 选举可以由ZooKeeper完成。
Zookeeper
Apache Kafka 的一个关键依赖是 Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper 是Kafka Brocker 和 Customer 之间的协调接口。 Kafka 服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于 Topic,Brocker,Custermer 偏移(队列读取器)等的信息。 由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。
Producer
多个Producer可以同时将数据Push给Broker。 Producer 会将每一条消息发送到指定Topic 的指定 partition 中去。
关于 partition 的获取分为三种情况 :
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
生产者支持同步异步的消息写入方式,提供了快速写入的能力。
Consumers
Consumer 采用 Pull 的方式从集群中拉取消息
因为 Kafka Broker 是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。
对于消费者来说,当消费者消费 Topic 消息时, Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
在Kafka 中引入了一个叫 Consumer Group(消费组) 的概念, 每一个消费组都是按照 发布-订阅消息模型 来消费消息。 而在同一个消费组的内部,每一个消费者是按照消息队列模型来消费消息的。
- 队列模式(也叫点对点模式)。多个消费者共同消费一个队列,每条消息只发送给一个消费者。
- 发布/订阅模式。多个消费者订阅主题,每个消息会发布给所有的消费者。
Kafka 读写原理与存储结构
那么,对于写入一条消息,它的基本流程是怎样的呢?
下面,我们讲沿着:Producer发布消息 -> 消息存储格式 -> Consumer消费消息这三个步骤展开。
在消息发布之前首先要创建一个 topic, 创建时用户可以指定 topic 名称
, 分区数
, 副本数
等。
Controller 控制器
Kafka集群中的其中一个Broker会被选举为 Controller ,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。
而创建Partition,就需要Controller的协调。 创建topic的过程,客户端的操作很简单,它甚至不需要有Kafka集群的存在,因为kafka集群只负责存数据,至于它存的是哪个topic的数据,估计它自己都不知道,这些对应信息都存储在Zookeeper中。
因此客户端创建一个topic时,只需要和zookeeper通信,在对应的znode节点新建一个子节点即可。 但是有了topiic,还需要分配topic的partition,还要选出partition的leader,以及ISR等,而这些工作,都是由controller来完成。
1
2
3
4
5
6
7
1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. controller 从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
从上术的流程可以看出,它利用了zookeeper的watcher机制,动态监听topic节点的改变,从而进行分配partition,选择leader,设置ISR并反写到zookeeper中。
Producer发布消息
我们来回顾一下,要发送一条消息,首先一定要指定一个topic,表明他是那一种类的消息.
然后一条消息要发送到一个broker上的某一个partition中,由于需要支持HA,所以对这条消息进行持久化的时候,肯定要同时写入多个partition中,成功之后在回ack给client。
每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
写入消息分为下面几个步骤:
- 确定partition
这是第一步,由于一个topic对应这多个partitions,首先要确定和哪个partition进行通信并存储,发送到broker时,分区算法选择将其存储到哪一个 partition. 他遵循以下规则:
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
- patition 和 key 都未指定,使用轮询选出一个 patition。
- 找到partition的通信地址
由于系统实现了HA,因此,一个partition有多个replication,Producer 先从 Zookeeper 的 /brokers/…/state
节点找到该 partition 的 leader, 之后就只与该leader通信。
注意,这里的leader是指partition的leader,而事实上,kafka集群不像zookeeper一样,有leader管理,发起提议等,它是没有leader的。
- 数据传输
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
这里我们一定想到了各种意外的情况,比如说网络中断,某个partition宕机了等,因此,保证消息的传输有几种方式:
- At most once 消息可能会丢,但绝不会重复传输
- At least one 消息绝不会丢,但可能会重复传输
- Exactly once 每条消息肯定会被传输一次且仅传输一次
消息存储格式
一个 topic 有一个 partition ?和 broker 数一样?
这配置文件里会进行设置: server.properties 中的 num.partitions=3
一条消息的格式如下:
1
2
3
4
消息长度: 4 bytes (value: 1+4+n)
版本号: 1 byte
CRC校验码: 4 bytes
具体的消息: n bytes
一个topic的一个partition,物理上对应着一个文件夹,里面存储这所有该partition的消息。
每条消息都可以对应一个64位的offest,offset标注了这条消息在发送到这个分区的消息流中的起始位置,每个日志文件的名称都是这个文件第一条日志的offset.
所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。
文件夹下有一个索引文件,和若干个.kafka文件。
Consumer订阅消息
消费者要比生产者复杂得多。由于kafka不会为消费者维护offset,因此,要么自己(Consumer)维护offset,低级API,要么交给Zookeeper维护, 高级API。
此外还有 Consumer Group 的概念,一个 Consumer Group 有多个Consumer,一条消息只会被其中的一个Consumer消费。
一般的,Consumer的数量不要大于Partition的数量,这是因为一个Partition只能被一个Consumer消费,Consumer多了就会有的轮空。
- Low Level API:
这种API维持了单一个broker的连接,它是没有状态的,因此每次去消费数据,都有带上offset,因此它可以通过重新设置offset多次消费同一份数据。 这种操作方式更灵活,但是也更繁琐。一般的,即使使用Low Level API,也会把offset手动同步到zookeeper上,当我们处理失败了,就不去更新offset。
此外,选择Partition的Leader,处理Leader的故障转移等也需要客户端去实现。
- High Level API:
更自动化的一种操作。consumer的offset会自动同步到zookeeper上,也会自动的去选取leader,leader失效时重试等。
但是它也有缺点,就是它保证了每次都取的都是下一条消息,不能够回搠去读,因此,无论处理这条消息是否成功,都不能重复读了,这显然有些不合理。
Consumer与Partition的关系
如果consumer比partition多,会阻塞,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化