Loading...
墨滴

zhanyl

2021/12/09  阅读:34  主题:蓝莹

KAFKA容错及高可用原理一

Kakfa中topic的基本组成

在kafka中以分区作为复制单元。每个topic由一个或多个分区组成,每个分区都包含一个leader副本及0个或多个follower副本。当你在创建topic时,需要指定分区数及复制因子。通常情况下一个复制因子是3的topic表明它有一个leader副本及两个follower副本。不论是一个leader副本还是一个follower副本都会被算作一个数据副本。

1个topic的4个分区分布在3个broker上
1个topic的4个分区分布在3个broker上

一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。一个broker可以包含一个topic的部分或者全部分区数据。

所有的读写操作都发生在leader副本所存在的broker上。Follower会周期性地向leader发出fetch请求以获取最新的信息。消费者不会从follower上消费数据,follower只为数据冗余及故障切换而存在。

Learder与follower
Learder与follower

分区的故障切换

当一个broker失效时,它上面很有可能包含了很多分区的leader副本。如果丢失了leader副本,将导致剩余节点上的follower副本被升级为leader副本。但事实上这种情况取决于follower是否与leader保持同步状态,如果不同步,就要看是否允许切换到不同步的分区上,这是一种unclean的切换。我们先来看一个简单的情况。

当broker 3失效时,对于当前topic的2号分区原leader也随之失效,同时将在broker 2上提升原follower副本而成为新的leader副本

Broker 3失效,broker 2上原分区2的follower被升级为leader
Broker 3失效,broker 2上原分区2的follower被升级为leader

之后broker 1也失效,1号分区也将丧失其leader副本,转而将leader切换到broker 2

Broker 1也失效,所有leader副本都切换到broker 2上,topic将不再有冗余数据副本
Broker 1也失效,所有leader副本都切换到broker 2上,topic将不再有冗余数据副本

然后broker 1恢复,重新上线,对于每个数据分区它会生成一个follower副本以增强其数据冗余。但是所有数据分区的leader副本仍然集中在broker 2上

Broker 1恢复后,所有leader副本仍然集中在broker 2上
Broker 1恢复后,所有leader副本仍然集中在broker 2上

紧接着broker 3也恢复,重新上线,但是所有的leader副本都仍然集中在broker 2上

在broker 1、3恢复后,leader副本出现了不均衡的情况
在broker 1、3恢复后,leader副本出现了不均衡的情况

对于这种情况kafka提供了一个比较好的功能来解决这种问题。kafka有个首选复制leader的概念。当kafka创建topic的分区时,它会尽量将每个分区的第一个leader副本均匀地分布在每个broker节点上,同时记录这些leader副本为首选复制leader。随着时间的推移,当发生服务器重启、服务器失效、网络分区等情况时,leader可能会在不同于首选复制leader的节点上启停,就像我们前面所举的例子一样。 为了解决这个问题,kafka提供了两个选项:

  • 在broker的配置中设置auto.leader.rebalance.enable=true。这样一个后台进程就会在leader.imbalance.check.interval.seconds所指定的时间间隔进行检查,如果leader的分布数量超过leader.imbalance.per.broker.percentage的值就会触发重新分布leader,控制节点会根据首选复制leader来重新分布leader,最终形成均匀地分布状态
  • 管理员也可以通过执行kafka-preferred-replica-election.sh脚本来手工执行,以达到我们期望的理想状态
leader副本再均衡
leader副本再均衡

这是简单情况下的leader副本故障切换。以此为开端,我们即将面临更复杂的场景,通过这种方式我们逐步引入更多的概念。接下来我们讲讲ISR(In-Sync Replicas)。

In-Sync Replicas(ISR)

ISR是一组保持在同步状态下的分区副本的集合。在ISR里面始终会有一个leader副本(在有可用数据副本的情况下)以及0个或多个follower副本。如果一个follower在设定的replica.lag.time.max.ms时间周期内时刻保持与leader的数据更新,则认为该follower是同步的,它将被leader保留在ISR列表中,反之将被从ISR中剔除。在acks=all写入数据时,只有ISR中所有的副本都返回ACK,这时leader才能commit该数据并认为该数据写入成功,然后返回生产者写入成功标记。处于同步状态时的follower副本意味着是leader的一个准确数据副本。

在下列场景中follower将会被移除ISR列表:

  • follower在replica.fetch.wait.max.ms指定的时间范围内不能向leader发出fetch操作请求(假定follower已死)
  • 最新数据的更新时间已经比replica.lag.time.max.ms指定的时间还要落后(被认为是一个慢follower)

follower应该在replica.fetch.wait.max.ms指定的时间周期内定时向leader发出fetch操作请求,其缺省值为500ms。如果follower没有发出任何fetch操作请求或者与leader的最新数据已经落后replica.lag.time.max.ms所指定的时间,那么follower会被leader移除ISR。replica.fetch.wait.max.ms的设置需要小于replica.lag.time.max.ms。replica.lag.time.max.ms在最新版本的缺省值为30000ms (30 seconds)。

为了更好地说明ISR的作用,我们从生产者的确认信息来看一些故障切换场景。在broker发出写入成功确认信息前,生产者端可以进行如下的设置:

  • acks=0,broker不需要返回任何确认信息(生产者只要发送完数据即认为已经写入成功)
  • acks=1,当leader副本在本地日志中已经写入数据,broker即可向生产者返回写入成功的确认信息(早期版本的缺省值)
  • acks=all,当所有在ISR列表中的副本都在本地日志中完成数据写入,broker才向生产者返回写入成功的信息确认(最新版本的缺省值)

在kafka的术语中,一旦ISR持久化消息,消息就会被提交。虽然Acks=all会导致数据处理延迟,但对于生产者来说却是最安全的一个选项。我们将通过两个故障切换的案例来举例说明在生产端对acks的设置是如何与ISR相互影响的。

Acks=1与ISR

在本案例中,我们可以看到在持久化数据的时候当leader不再等待follower的情况下,leader发生故障切换极有可能导致数据丢失。是否允许在故障切换中将新leader切换到数据不同步的follower可以通过配置参数unclean.leader.election.enable来控制,其缺省值是false,缺省不允许将leader切换到不同步的follower。

在本次测试中我们在生产端设置acks=1,这也是大多数生产端的缺省设置。我们的数据分区分布在所有的3个broker上。Broker 3已经出现数据延迟,它最新的数据是leader 8秒以前的,具体落后7456条数据。Borker 1相对来说要快的多,最新数据与leader只相差1秒,数据落后123条。由于我们设置acks=1,因此生产端在发送数据后,数据在leader副本所在的broker 2上写入成功生产端即可接收到ack,完全无需等待任何follower的操作结果,这是非常迅速的。

ISR中有三个数据副本
ISR中有三个数据副本

这个时候broker 2失效,生产端将出现连接错误。与此同时在丢失123条数据的情况下新的leader切换到broker 1。在发生故障切换时,虽然broker 1在ISR列表中,但由于数据不是完全同步,仍然会出现数据丢失。

故障切换中发生数据丢失
故障切换中发生数据丢失

由于在生产端的bootstrap.servers中配置了多个broker地址信息,因此生产端能够很容易地知道新的leader在哪个broker上。它将创建新的连接,并继续向新的leader发送数据。

短暂的中断后数据得以继续发送
短暂的中断后数据得以继续发送

Broker 3的延迟进一步加大。时间延迟达到14秒,数据延迟达到15286。它虽然仍在向leader发出fetch操作的请求,却始终不能跟上leader的脚步。这可能是由于两个broker之间的网络或者存储出现故障所导致。最终broker 3从ISR列表中被leader移除。现在在ISR中就只有一个数据副本存在,就是leader本身!生产端仍然继续发送数据同时接收确认。

Broker 3从ISR中被移除
Broker 3从ISR中被移除

这时Broker 1出现故障,在丢失15286条数据后新的leader被切换到broker 3!产生这样的结果是因为我们设置了unclean.leader.election.enable=true,会导致新leader被允许切换到并不在ISR列表中的broker节点,这意味着数据严重不同步的broker节点有可能成为新的leader,导致大量的数据丢失。如果unclean.leader.election.enable=false,那么这样的故障切换就不会发生,同时客户端所发出的所有读写请求都会被拒绝。在这种情况下我们只能把broker 1恢复起来才能继续工作。

Broker 1失效,发生故障切换将导致大量数据丢失
Broker 1失效,发生故障切换将导致大量数据丢失

生产端与最后一个broker建立连接,同时这个broker成为分区0当前的leader。生产端将数据发送到broker 3.

短暂的中断后数据可以继续发送到分区0
短暂的中断后数据可以继续发送到分区0

通过这个案例,我们可以看到,除了短暂的切换外,生产端可以发现新的leader并建立新的连接,数据仍然可以被正常发送。这样配置在获得数据可用性的同时,付出的代价是数据的一致性,数据的安全。kafka能够继续执行写入操作,但是丢失了大量已经确认过的数据。

Acks=all与ISR

让我们再重演一遍前面的场景,这一次我们将设置acks=all。Broker 3将保持与leader平均4 秒的滞后。生产者在设置acks=all后向kafka发送数据,但这次收到确认信息将会慢很多。leader为了要持久化数据必须等待ISR中所有的数据副本都返回确认信息。由于broker 3平均滞后4秒,因此也导致leader向生产端返回ack比无需等待follower返回多滞后4秒。

ISR中有三个数据副本,其中一个数据副本是慢节点,这将对每次写入操作都产生处理延迟
ISR中有三个数据副本,其中一个数据副本是慢节点,这将对每次写入操作都产生处理延迟

经过4秒的延迟以后,broker 2将确认信息返回给了生产者。所有的数据副本在这一时刻是一致的。

所有的数据都已经持久化,同时确认信息已返回
所有的数据都已经持久化,同时确认信息已返回

Broker 3的持续落后将最终导致其从ISR中被移除。由于慢节点的消失使处理延迟也得到缩短。Broker 2现在处理数据只需要等待broker 1 的响应即可,而broker 1平均只滞后250ms。因此也导致leader向生产端返回ack比无需等待follower返回多滞后250ms。

Broker 3从ISR中被移除
Broker 3从ISR中被移除

这时broker 2出现故障,导致leader切换到broker 1,这一过程没有出现数据丢失

Broker 2出现故障
Broker 2出现故障

生产端发现新的leader,同时将数据发送给它。数据操作延迟仍然较低,因为这时ISR中只有一个数据副本,即leader本身。因此即使我们设置acks=all,但当ISR中只有一个数据副本的时候我们仍然不能保证数据的冗余。

切换到broker 1,没有发生数据丢失
切换到broker 1,没有发生数据丢失

这时broker 1出现故障,在丢失14238条数据的情况下,新的leader切换到broker 3

Broker 1失效,在丢失大量数据的情况下发生切换
Broker 1失效,在丢失大量数据的情况下发生切换

当然我们可以选择不将unclean.leader.election.enable设置为true来避免这种切换。其缺省值也是false。由此可见我们在设置acks=all,unclean.leader.election.enable=true确实可以在保证数据可用性的同时提升一定的数据安全性。但是仍然不能避免数据的丢失。

但是如果我们需要继续提升数据的安全性呢?我们首先需要确保设置unclean.leader.election.enable = false,但这并不能完全避免丢失数据。如果leader节点出现了不可恢复的错误,则节点上的数据仍然将丢失,同时在恢复的过程中节点还不可用。

一个比较好的办法是在确保所有数据都有一定的同步冗余副本且在不满足这个条件的时候拒绝再写入。当数据存储的冗余性不能得到保证时,将拒绝新的数据写入,这样至少从broker层面来说,要导致数据丢失的情况出现至少需要两个及以上的故障点同时发生。

Acks=all, min.insync.replicas与ISR

通过在topic的层面设置min.insync.replicas,我们可以将数据的安全性提升一个档次。我们将前面的场景再过一遍,但是这次我们将min.insync.replicas设置为2。ISR中仅有broker 1和broker 2。

ISR中有2个副本
ISR中有2个副本

Broker 2出现故障,触发leader切换,新的leader切换到broker 1,期间没有发生数据丢失。这时ISR中将只包含一个数据副本broker 1。这已经低于确保数据冗余性的最小同步数据副本数(min.insync.replicas=2),因此在新数据写入时生产端将产生NotEnoughReplicas或NotEnoughReplicasAfterAppend的异常。

ISR中的副本数低于min.insync.replicas
ISR中的副本数低于min.insync.replicas

选择这种配置是为了达到数据一致性高于数据可用性。我们确保在收到写入成功返回前,至少有2个以上的数据副本是同时完成写入了的。这提高了生产端写入数据的可靠性。在本例中,如果要发生数据丢失就需要2个及以上数据副本同时出现故障,但是这种可能性是极低的。如果你还想继续提高数据可靠性,你可以将数据复制因子设置到5,同时将min.insync.replicas设置为3。这样只有当3个broker节点同时出现故障时才有可能出现数据丢失。当然这样做的代价是通过增加每次写入延迟来得到的。

当数据可用性需求高于数据安全性

但有的时候数据的可用性需求会高于数据的安全性,我们需要尽量避免服务不可用,而不是十分在意是否有数据的不一致或丢失。您可能会思考:

  • 我的数据发送程序是否可以简单地接收到kafka的处理错误,根据这个错误我可以进行重试操作?
  • 我发送的数据是否可以在本地进行持久化或者存储在数据库中,同时用于后续的重试操作?

如果答案是否定的,那么进行可用性的优化可能比提高数据安全性更好。因为无论如何最终都会丢失数据,但是与拒绝写入数据相比,通过优化可用性可以降低丢失的数据量。因此针对您的应用场景需要找到一个平衡点。

zhanyl

2021/12/09  阅读:34  主题:蓝莹

作者介绍

zhanyl