
kafka 组件:

在 kafka 中消息以 topic 进行组织,在逻辑上 topic 是一个 queue,通过 topic 实现发布-订阅机制。
实际上每个 topic 由多个 partition 组成,消息发送时按照 key 确定发送至不同的 partition。consumer 消费 topic 的消息是 partition 有序,topic 无序:发送到同一个 partition 的消息会被有序消费,不同 partition 中的消息消费顺序不可预知。
每个 partition 对应磁盘上的一个文件,同时为了保证容错,每个 partition 会有多个副本,副本位于不同的 broker 上。副本分为 leader 和 follower,leader 负责处理读写,follower 则同步 leader 数据:
producer 发送消息给 leader,follower 从 leader 中同步消息,在同一时刻,所有副本中的消息不完全相同,也就是说同步期间,follower 相对于 leader 而言会有一定程度上的滞后,这个滞后程度是可以通过参数来配置的。
那么,我们就可以厘清了它们三者的关系:AR = ISR + OSR。
leader 负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 出现滞后太多或者失效时,leader 将会把它从 ISR 集合中剔除。
当然,如果 OSR 集合中有 follower 同步范围追上了 leader,那么 leader 也会把它从 OSR 集合中转移至 ISR 集合。
一般情况下,当 leader 发送故障或失效时,只有 ISR 集合中的 follower 才有资格被选举为新的 leader,而 OSR 集合中的 follower 则没有这个机会(不过可以修改参数配置来改变)。
kafka 动态维护了一个同步状态的副本的集合(a set of In-SyncReplicas),简称 ISR,在这个集合中的结点都是和Leader保持高度一致的,任何一条消息只有被这个集合中的每个结点读取并追加到日志中,才会向外部通知“这个消息已经被提交”。
kafka 通过配置 producer.type 来确定是 producer 向 broker 发送消息是异步还是同步,默认是同步:
BlackingQueue 队列中然后就返回了。producer 再开启一个线程 ProducerSendTread 不断从队列中取出消息,通过同步发送消息的接口将消息发送给 broker。producer 的这种在内存缓存消息,当累计达到阀值时批量发送请求,小数据I/O太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。但是如果在达到阀值前,producer不可用了,缓存的数据将会丢失。
producer 向 broker 发送消息时可以通过配置 acks 属性来确认消息是否成功投递到了 broker:
0:表示不进行消息接收是否成功的确认。延迟最低,但持久性可靠性差。不和 kafka 进行消息接收确认,可能会因为网络异常,缓冲区满的问题,导致消息丢失1:默认设置,表示当 leader 接收成功时的确认。只有 leader 同步成功而 follower 尚未完成同步,如果 leader 挂了,就会造成数据丢失。此机制提供了较好的延迟和持久性的均衡-1:表示 leader 和 follower 都接收成功的确认。此机制持久性可靠性最好,但延时性最差。在消息写入 leader 后,follower 同步 leader 的消息,以及 consumer 消费 leader 中的消息,producer 向 leader 继续写入消息,这一系列的机制时通过 hw 和 leo 实现的:
HW(High Watermark)。高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位 offset 之前的消息
LEO(Log End Offset)。标识当前日志文件中下一条待写入的消息的 offset。在 ISR 集合中的每个副本都会维护自身的 LEO,且HW==LEO。

基本流程:
拦截器、序列化器、分区器,然后将处理好的消息发送到消息累加器中消息累加器每个分区会对应一个队列,在收到消息后,将消息放到队列中ProducerBatch 批量的进行消息发送到 Sender 线程处理(这里为了提高发送效率,减少带宽),ProducerBatch 中就是我们需要发送的消息,其中消息累加器中可以使用 Buffer.memory 配置,默认为 32MBSelector,Selector 发送消息到 kafka 集群consumer 采用 pull 模式从 broker 批量拉取消息。
pull 模式可以让 consumer 根据自身消息消费能力决定拉取速率,防止消息拉取速率超出 consumer 处理能力。同时 consumer 也可以自主决定是否采用批量 pull。
pull 模式的缺点是 consumer 不知道 topic 是否有新消息到达时需要不断地轮询 broker,直到新的消息到达。为了避免这点,kafka 有个参数可以让 consumer 阻塞直到新消息到达(当然也可以阻塞直到新消息数量达到阈值这样就可以批量 pull)。
parititon 内消息有序,只需要确保消息发送至同一个 partition 即可:
ProducerRecord丢失原因:
解决办法:
retries = Integer.MAX_VALUE。重试次数,需大于 0max.in.flight.requests.per.connection = 1。为保证消息发送重试时依然有序,需设置此参数retry.backoff.ms。重试间隔,默认为 100ms#send() 方法的 future 对象设置回调,当发生异常时进行重试unclean.leader.election.enable。表示哪些 follower 可以选举为 leader。设置为 false,表示落后太多的 follower 不可选举为 leaderreplication.factor。分区副本的个数,建议设置为 >=3 个min.insync.replicas。该参数表示消息至少要被写入成功到 ISR 多少个副本才算已提交。推荐设置成:replication.factor =min.insync.replicas +1, 最大限度保证系统可用性enable.auto.commit = false重复原因:
解决办法:
消息的投递语义主要分为三种:
kafka 主要实现流计算场景下的 exactly once 能力,数据必须是从 kafka 读取,计算结果在写入 kafka 中。如果流计算中的状态存储依赖外部系统,则无法在系统出现故障崩溃时保证 exactly once。比如消费者消费一批数据后,在崩溃前没有提交消费位点,重启后可能会消费到重复的消息。flink 的 exactly once 语义下是将 kafka 消费位点保存到 checkpoint 或 savepoint 中,当flink 重启后读取 checkpoint 或 savepoint 中的 kafka 消费位点重新消费,则不会出现重复消费,所以可能重复消费的原因是任务没有把消费位点提交到 kafka 中,也没有自己额外存下来,做不到从崩溃前的位点消费。
kafka 通过 幂等性(Idempotence)和事务(Transaction)实现 exactly once。
幂等性是指可以安全地进行重试,而不会对系统造成破坏。kafka 中 producer 默认不是幂等性的,可以通过参数开启:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)enable.idempotence 被设置成 true 后,producer 自动升级成幂等性 producer,其他所有的代码逻辑都不需要改变,kafka 自动帮你做消息的重复去重,kafka 通过空间去换时间的思路在 broker 多保存一些字段记录消息信息进行去重,当 producer 发送相同字段值的消息后,broker 可以识别消息重复发送,会丢弃掉这些重复消息。
kafka 在底层设计架构中引入了 producerID 和 SequenceNumber。
producer 需要做的只有两件事:
broker 收到消息后会以 producerID 为单位存储 SequenceNumber,也就是说即时 Producer 重复发送了, Broker 端也会将其过滤掉。
实现比较简单,同样的限制也比较大:
首先,它只能保证单分区上的幂等性
。即一个幂等性 producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
其次,它只能实现单会话上的幂等性
。不能实现跨会话的幂等性。当你重启 producer 进程之后,这种幂等性保证就丧失了。
kafka 自 0.11 版本开始也提供了对事务的支持,支持 read committed 隔离级别。它能保证 producer 将多条消息原子性地写入到目标分区(可写入多个分区),同时也能保证 consumer 只能看到事务成功提交的消息。事务型 producer 重启后 kafka 依然可以保证发送消息的精确一次处理。
设置事务型 producer:
enable.idempotence = true。transactional. id。最好为其设置一个有意义的名字。同时 producer 发送事务消息代码也与普通消息不同,需要加入事务处理:
consumer 读取事务消息时也需要做一些配置,设置 isolation.level 参数:
kafka 中 topic 中的 partition 分配给 consumer group 中的 consumer,需确定 group 中的 consumer 消费哪几个 partition。当 consumer group 发生变动时需重新进行分配,这个过程就叫做 rebalance。
rebalance 影响:rebalance 期间 consumer 不消费消息,会造成应用消费 kafka 消息 tps 抖动,数据延迟以及 kafka topic 消息积压。
rebalance 触发原因:
rebalance 目的:
rebalance 策略:
rebalance 过程:
GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。然后 GroupCoordinator 从一个consumer group 中选择一个加入 group 的 consumer 作为 leader (消费组协调器),把 consumer group 情况发送给这个 leader,接着这个 leader 会负责制定分区方案参考链接:
RocketMQ 在 4.x 版本只支持18个延迟级别,在 5.x 版本支持任意延迟等级的延迟队列,能够满足业务高吞吐、低延迟的要求。RocketMQ 4.x 的延迟队列主要在 client 完成,5.x 版本在 server 完成,基于 Kafka 实现延迟队列,主要参考 RocketMQ 4.x 的实现。
在 RocketMQ 中定义了 18 个延迟级别,分别是 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间,如 setDelayTimeLevel(level = 3) 也就是延迟 10s,因为 level = 3 对应 10s 延迟。
当用户定义一个延迟 topic 如 topic_order_delay,当向 topic_order_delay 发送消息时,broker 并不会将消息直接写入 topic_order_delay 而是写入一个内部队列,如 1s 写入 1s 对应的队列,5s 写入 5s 对应的队列等等。也即 RockmeMQ 整个集群的所有延迟 topic 的延迟消息都会放入对应级别的一个内部队列,放入同一个内部队列的好处是:
1s 或 5s。每个内部队列都有一个定时任务按照固定频率扫描到期的消息,当检测到期后重新发送入对应的 topic。

基于 kafka 的实现也类似,首先在 kafka 中定义一系列对应的队列,每个队列对应一个延迟级别,在为每个队列设置一个消费者,通过 pull 方式拉取到期的消息,然后发送到真实的 topic。应用消费真实的 topic 即可。
在延迟调度服务中,需要将延迟级别对应的 topic 中的到期消息取出并投递到对应的业务 topic,延迟调度服务需要准确和低延迟地完成内部延迟队列和业务 topic 的投递。
延迟调度服务设计为 kafka topic 的消费者,消息处理逻辑将收到的消息检测是否到期,如果到期则发送入业务 topic,如果没有到期则 sleep 一段时间后重新消费,继续进行检测:
通过 sleep 机制可以确保延迟队列消息消费的及时性。
因为 kafka topic 下有多个 partition,如果 consumer group 中的某个 consumer 负责其中几个 partition 的消费,如果 consumer sleep,则几个 partition 的消费都会停止。不同 partition 中的消息有快有慢,直接按照收到的消息的到期时间设置 sleep 时长会导致比较快的 partition 的消息无法及时投递,造成投递延迟。解决思路是将延迟队列 topic 的 partition 数目设置 1,这样就确保延迟队列 topic 的消息是全局有序的,第一条消息就是最早到期的。
但是延迟队列 topic 的 partition 只有 1 个的话,消息吞吐量可能会有上限,不支持水平扩展。处理方式是将对应级别的延迟队列 topic 设置为多个,比如 5s 延迟级别可以设置 10 个 topic(这 10 个延迟队列 topic 的 partition 数量都是 1),每个 topic 都有各自的 consumer 来进行上述到期发送逻辑。sdk 发送延迟消息时只需要从 10 个 topic 中选择一个投递即可。这样就可以增加吞吐量。
同时延迟级别也可以按需设置,可以设置很多的延迟级别出来。