1 Kafka介绍1

1.1 定义

01.发展
    Kafka 是基于发布与订阅的消息系统。
    它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。
    Kafka 是一个分布式的,可分区的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

02.说明
    在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能、低延迟的不停流转。
    传统的企业消息系统并不是非常适合大规模的数据处理。为了同时搞定在线应用(消息)和离线应用(数据文件、日志),Kafka 就出现了。

03.设计
    在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。

1.2 主要特点

01.优点
    应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
    异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
    流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
    消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
    日志处理 - 解决大量日志传输。
    ---------------------------------------------------------------------------------------------------------
    1.同时为发布和订阅提供高吞吐量。据了解,Kafka 每秒可以生产约 25 万消息(50MB),每秒处理 55 万消息(110MB)。
    2.可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL ,以及实时应用程序。通过将数据持久化到硬盘,以及replication ,可以防止数据丢失。
    3.分布式系统,易于向外扩展。所有的 Producer、Broker 和Consumer 都会有多个,均为分布式的。并且,无需停机即可扩展机器。
    4.消息被处理的状态是在 Consumer 端维护,而不是由 Broker 端维护。当失败时,能自动平衡。
    5.支持 online 和 offline 的场景。

02.缺点
    由于是批量发送,数据并非真正的实时;
    对于mqtt协议不支持;
    不支持物联网传感数据直接接入;
    仅支持统一分区内消息有序,无法实现全局消息有序;
    监控不完善,需要安装插件;
    依赖zookeeper进行元数据管理;

1.3 重点概念

01.概念
    Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
    Partition:Topic 物理上的分组(分区),一个 Topic 可以分为多个 Partition 。每个 Partition 都是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)。
    Message:消息,是通信的基本单位,每个 Producer 可以向一个Topic(主题)发布一些消息。
    Producers:消息和数据生产者,向 Kafka 的一个 Topic 发布消息的过程,叫做 producers 。
    Consumers:消息和数据消费者,订阅 Topic ,并处理其发布的消息的过程,叫做 consumers 。
    Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker 。
    ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 Topic、Partition 等元信息等。

02.单纯角色来说,Kafka 和 RocketMQ 是基本一致的(RocketMQ 从 Kafka 演化而来)。
   比较明显的差异是:
    1、Kafka 使用 Zookeeper 作为命名服务;RocketMQ 自己实现了一个轻量级的 Namesrv 。
    2、Kafka Broker 的每个分区都有一个首领分区;RocketMQ 每个分区的“首领”分区,都在 Broker Master 节点上。
    3、Kafka Consumer 使用 poll 的方式拉取消息;RocketMQ Consumer 提供 poll 的方式的同时,封装了一个 push 的方式。

1.4 使用场景

00.汇总
    a.日志收集
        一个公司可以用Kafka可以收集各种服务的log
        通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等
    b.消息系统
        解耦和生产者和消费者、缓存消息等。
    c.用户活动跟踪
        Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,
        这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
    d.运营指标
        Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
    e.流式处理
        比如spark、streaming、Flink

1.5 数据均衡

01.介绍
    在线上程序运行的时候,有的时候因为上面副本的损坏,从而系统会自动选举出来一个新的leader并且分配到不同的节点上,
    有的时候这个leader的节点分布的并不是特别均匀,这个时候就需要进行均衡一下,使得每个broker的节点压力均衡

02.以下三个参数进行控制
    参数                                    解释                                               
    auto.leader.rebalance.enable            系统每隔300s会自动检查系统的leader分布是否均匀,如果不均匀会自动进行leader的切换 
    leader.imbalance.per.broker.percentage  broker上的leader比例超过10%认为不均衡                       
    leader.imbalance.check.interval.seconds 检查间隔300s默认值                                      
    ---------------------------------------------------------------------------------------------------------
    auto.leader.rebalance.enable 这个开关开启会自动选举或者切换leader节点,并且分布在不同的节点上,
    但是有的时候这个开关开启会影响系统性能,因为线上环境切换leader是比较繁琐的

03.说明
    a.一般我们会关闭这个开关并且选择手动切换均衡
        kafka-leader-election.sh --bootstrap-server nn1:9092 --topic topic_a --partition 1 --election-type preferred
    b.优先在ISR中选举出来新的leader进行负载,并且我们也可以自己进行副本的位置进行设定
        # 首先创建一个topic.json 输入如下内容
        {"topics":[{"topic":"topic_a"}],"version":1}
        # 整体代码命令如下
        kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --broker-list 0,1,2,3,4 --topics-to-move-json-file topic.json --generate
    c.使用这个均衡优化命令生成优化计划
        {"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[3,4,0],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[4,0,1],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
        修改其中副本的位置,并且设定ISR的优先顺序

1.6 选举机制?先到先得

00.总结
    a.旧版
        zookeeper可以实现controller的选举,并且记录topic和partition的元数据信息,帮助多个broker同步数据信息
    b.新版
        在新版本中的kafka模式中可以这个管理和选举可以用kafka自己完成,而不再依赖zookeeper
        ---------------------------------------------------------------------------------------------------------
        搭建kafka集群:
        首先要设定process.roles,可以人为指定每个人的角色,如果指定broker那么它永远是broker
        如果指定controller那么它一定是controller,但是这样的话其他人就没有办法作为主节点了
        所以一般选择broker,controller可以自动进行适配和选举
        一般一个集群中会设定大部分是broker,少量的几个是controller+broker方便选举
        设定哪几个机器作为协调的机器出现,帮助kafka自身选举
        还需要设定对外端口和每个节点的编号
        最后还要设定数据存储的位置
        log.dirs=/data/kafka-logs

01.controller选举
    首先第一个选举就是借助于zookeeper的controller的选举
    第一个就是controller的选举,这个选举是借助于zookeeper的独享锁实现的,先启动的broker会在zookeeper的/contoller节点
    上面增加一个broker信息,谁创建成功了谁就是主节点,其他的broker会启动watch监视器进行监听其中的数据变化,
    如果宕机了其他的节点会抢占这个节点选举为controller节点

02.partition leader选举
    每个topic都存在多个分区,每个分区又存在多个副本,其中有一个副本是主节点其他都是从节点,负责和主分区同步数据,
    并且生产者和消费者都是通过主节点进行操作kafka数据的
    ---------------------------------------------------------------------------------------------------------
    那么他们的选举是通过什么进行选择的呢?
    controller节点会按照分区的注册顺序,优先选择前面的节点进行选择
    ---------------------------------------------------------------------------------------------------------
    如果主节点宕机后是如何选择的呢?
    首先我们使用命令来查看其中一个topic的详细信息
    kafka-topics.sh --bootstrap-server nn1:9092 --topic topic_a --describe
    ---------------------------------------------------------------------------------------------------------
    其中我们会看到最后一列内容是ISR,这个叫做动态副本集,它的作用使用强大,在kafka中存储数据的时候首先存储数据到
    主分区中,然后主分区中的数据会同步到不同的副本分区中,做数据的同步备份,尤其是在producer端设定ack=-1的时候,
    要在所有副本都同步完毕消息以后才会返回ack,producer才会发送下一条数据过来,但是如果因为其中一个副本的网络卡顿
    或者是自己宕机那么会出现一直同步不成功的情况,从而producer不能继续发送数据,所以kafka动态维护了一个副本集,
    这个副本集中都是可以正常同步数据的,也就是说他们都是正常的,如果不正常的不能保持和主节点同步的副本就会从这个
    里面删除掉
    ---------------------------------------------------------------------------------------------------------
    为什么说这个ISR呢?
    因为一旦主分区宕机了,那么broker【controller】节点就会检测到,那么就会在ISR中按照顺序选择一个好的节点成为主分区。
    这就是主分区宕机后的选举实现,但是选择的节点一般都是数据比较新的,会选择落后太多的副本
    但是有时候ISR中的副本如果都为空就没有办法选择新的leader分区了,这个时候为了集群的稳定性,可以设定
    # 将这个开关打开,默认是false关闭的,它允许实现ISR以外的节点成为主节点
    unclean.leader.election.enable
    ---------------------------------------------------------------------------------------------------------
    这个会出现数据丢失问题,但是可以在一定程度上保证程序的稳定性
    我们可以做一个实验,先查看一下topic_a的分区详细信息,可以发现partition0的主分区在broker3上面
    我们可以在s2节点关闭broker,来模拟集群单节点损坏的情况
    kafka-server-stop.sh
    再次查看可以看到partition0的主分区在broker4上面

1.7 数据同步?宕机后重选

01.发现每个分区的数据都不一样,但是三个分区对外的数据却是一致的
    所有数据的操作全部都针对于leader分区,follower负责同步数据,所以主分区的数据是最新的,从节点会有一个数据同步的过程
    这个时候副本间会有一个数据差别,所有的节点都同步完的数据会对外提供使用

02.这个时候如果第二个副本宕机了
    在损坏的时候不能及时同步数据就会被提出ISR列表。这个时候,它记录了宕机的时候watermark是4,
    如果他重新启动了,并且加入到集群中了,首先会把自己多于4的部分56出除,
    然后和leader重新同步数据,直到同步完毕教据会加入到isr中

03.但是如果是leader副本岩机了会发生什么呢?
    leader节点岩机了,那么会选举出来ISR中的一个副本为主,
    首先选的就是第二个机器,因为他的数据是最新的,所以在选举成为主节点的时候数据会比较新不会丢失数据
    这个时候其他的节点会截取掉watermark之后的数据,然后和新的leader同步数据

1.8 文件存储?流式处理

01.介绍
    kafka的使用场景就是在流式处理过程中,充当一个中间缓冲介质的作用,主要功能是将数据先放入到kafka中,
    计算框架会自己拉取要消费和计算的数据过来,采用poll的方式完全适配自身消费速率。
    那么kafka的存储和hdfs的存储的区别非常清楚的就可以提现出来,hdfs更加适用于整体的存储和取出,
    kafka主要做的是流,数据都是按照条进行流转的,主打的是灵活和效率,那么效率提现在哪些方面上呢???

02.效率提现在哪些方面上呢?
    首先topic是按照分区进行划分的,因为多个分区可以将存储的数据放入到不同的机器节点上,这样起到负载均衡的作用,
    所以每个broker机器节点上面存储的数据都是多个topic的不同的分区的数据,这样分布式处理可以增加kafka的计算和处理能力
    ---------------------------------------------------------------------------------------------------------
    所以broker上面会管理很多topic的不同partition的数据,存储的结构就是以topic-partition方式进行命名的文件夹存储数据,
    但是随着数据的增加,单个分区的数据也会随之增多,这样管理和检索都在一个文件中也是非常低效率的,
    解决办法就是单个分区的数据也会切段进行存储,每个段称之为segment,每一个段称之为一个segement
    ---------------------------------------------------------------------------------------------------------
    在官网中形容的是单个日志文件的最大值,默认是1G
    这样不管找寻什么样的数据都会直接找寻相应的segment段落就行了,不管数据多大,其检索范围也不会超过1G
    但是一个G的文件检索还是比较大的,所以kafka在存储数据的时候,首先存储数据在内存中,然后将数据刷写到磁盘上,这个刷写的大小是以4K为主的
    ---------------------------------------------------------------------------------------------------------
    在这个插入过程中会追加的形式存储到log文件中,并且在index和timeindex中存在稀疏的索引数据
    这个时候查询的时候就可以直接去根据文件条数命名的对应segment中查询数据。能够轻易的跳过1G的部分
    在具体查询数据的时候可以根据index去log中查询数据,速度更快,效率更高
    检索过程为先跳过整体segment部分,然后在segment部分找到index,根据index找到相对应偏移量位置,然后找寻log日志中的数据
    以如此方式进行数据检索,这样的存储格式让检索效果更佳明显
    以上只是kafka的存储方式之一,主要是为了让数据存储更加方便管理和检索

1.9 文件追加(写)、零拷贝(读)

01.介绍1
    我们发现存储的方式比较有规划是对于后续查询非常便捷的,但是这样存储是不是会更加消耗存储性能呢?
    其实kafka的数据存储是追加形式的,也就是数据在存储到文件中的时候是以追加方式拼接到文件末尾的,这样就非常快速的跳过了文件的检索步骤

02.写入的速度非常快?文件追加
    机械磁盘的文件检索需要使用到磁头进行不断扫描数据,如果存储大量的小文件或者存储位置不同的时候需要不停的扫描检索文件
    的位置,这个过程是非常浪费时间的,但是kafka的数据完全以追加的方式存储到磁盘中的,那么这个时候就完全省去了这样的
    一个过程,使得机械磁盘的性能和固态的性能相差无几
    我们可以看到经过测试,机械磁盘的存储性能可以达到600M/s 但是随机读写就比较慢100k/s,所以追加写造就了kafka的高写入性能

03.读取的性能是如何保证的呢?零拷贝
    首先kafka的数据就是以分区作为单位进行分布式管理的,所以多个机器共同管理,效果更加明显
    kafka的存储是按照segment切分的,并且存储的数据是带有index索引的,这个速度可以几乎直接定位到相应的检索文件
    并且kafka还实现了零拷贝技术

04.零拷贝技术
    首先我们可以看到普通的存储在磁盘上的文件要想发送出去的话,需要走以上的步骤
    通过内核和用户空间的加载,反反复复经过4次加载和拷贝过程,这个过程是非常消耗性能和io的
    其实直白来说,如果数据加载的过程中不走用户缓冲区的话直接以内核加载一次的方式进行传输效率是更加高效的
    ---------------------------------------------------------------------------------------------------------
    所以使用到零拷贝技术,方式就是只将数据从磁盘加载到内存中一次,然后直接从内核空间将数据发送到网卡从而直接传输给消费者端
    零拷贝技术的本质就是怎么减少数据的复制过程,并不是没有数据的复制
    这个实现方式就是使用到sendFile的系统函数,它可以直接实现系统内存的映射

1.10 文件清除?默认存储7天

01.介绍
    kafka数据并不是为了做大量存储使用的,
    主要的功能是在流式计算中进行数据的流转,所以kafka中的数据并不做长期存储,默认存储时间为7天

02.kafka中的数据是如何进行删除的呢
    在Kafka中,存在数据过期的机制,称为data expire。
    如何处理过期数据是根据指定的policy(策略)决定的,而处理过期数据的行为,即为log cleanup。

03.kafka中有以下几种处理过期数据的策略
    log.cleanup.policy=delete(Kafka中所有用户创建的topics,默认均为此策略)
    根据数据已保存的时间,进行删除(默认为1周)
    根据log的max size,进行删除(默认为-1,也就是无限制)
    log.cleanup.policy=compact(topic __consumer_offsets 默认为此策略)
    根据messages中的key,进行删除操作
    在active segment 被commit 后,会删除掉old duplicate keys
    无限制的时间与空间的日志保留
    ---------------------------------------------------------------------------------------------------------
    自动清理Kafka中的数据可以控制磁盘上数据的大小、删除不需要的数据,同时也减少了对Kafka集群的维护成本。

04.那Log cleanup 在什么时候发生呢?
    首先值得注意的是:log cleanup 在partition segment 上发生
    更小/更多的segment,也就意味着log cleanup 发生的频率会上升
    Log cleanup 不应该频繁发生=> 因为它会消耗CPU与内存资源
    Cleaner的检查会在每15秒进行一次,由log.cleaner.backoff.ms 控制

05.log.cleanup.policy=delete
    log.cleanup.policy=delete 的策略,根据数据保留的时间、以及log的max size,对数据进行cleanup。
    控制数据保留时间以及log max size的参数分别为:
    log.retention.hours:指定数据保留的时常(默认为一周,168)
    将参数调整到更高的值,也就意味着会占据更多的磁盘空间
    更小值意味着保存的数据量会更少(假如consumer 宕机超过一周,则数据便会再未处理前即丢失)
    log.retention.bytes:每个partition中保存的最大数据量大小(默认为-1,也就是无限大)
    再控制log的大小不超过一个阈值时,会比较有用

06.在到达log cleanup 的条件后,cleaner会自动根据时间或是空间的规则进行删除,新数据仍写入active segment:
    针对于这个参数,一般有以下两种使用场景,分别为:
    log保留周期为一周,根据log保留期进行log cleanup:
    log.retention.hours=168 以及 log.retention.bytes=-1
    log保留期为无限制,根据log大小进行进行log cleanup:
    log.retention.hours=17520以及 log.retention.bytes=524288000
    其中第一个场景会更常见。

07.Log Compaction
    a.介绍
        Log compaction用于确保:在一个partition中,对任意一个key,它所对应的value都是最新的。
        这里举个例子:我们有个topic名为employee-salary,我们希望维护每个employee当前最新的工资情况。
        左边的是compaction前,segments中的数据,右边为compaction 后,segments中的数据,其中有部分key对应的value有更新:
        可以看到在log compaction后,相对于更新后的key-value message,旧的message被删除。
    b.特点
        messages的顺序仍然是保留的,log compaction 仅移除一些messages,但不会重新对它们进行排序
        一条message的offset是无法改变的(immutable),如果一条message缺失,则offset会直接被跳过
        被删除的records在一段时间内仍然可以被consumers访问到,这段时间由参数delete.retention.ms(默认为24小时)控制
    c.注意
        需要注意的是:Kafka 本身是不会组织用户发送duplicate data的。
        这些重复数据也仅会在一个segment在被commit 的时候做重复数据删除,
        所以consumer仍会读取到这部分重复数据(如果客户端有发的话)。
        -----------------------------------------------------------------------------------------------------
        Log Compaction也会有时失败,compaction thread 可能会crash,
        所以需要确保给Kafka server 足够的内存用于做这些操作。
        如果log compaction异常,则需要重启Kafka(此为一个已知的bug)。
        Log Compaction也无法通过API手动触发(至少到现在为止是这样),只能server端自动触发。
        -----------------------------------------------------------------------------------------------------
        正在写入的records仍会被写入Active Segment,已经committed segments会自动做compaction。
        此过程会遍历所有segments中的records,并移除掉所有需要被移除的messages。
        -----------------------------------------------------------------------------------------------------
        Log compaction由上文提到的log.cleanup.policy=compact进行配置,其中:
        Segment.ms(默认为7天):在关闭一个active segment前,所需等待的最长时间
        Segment.bytes(默认为1G):一个segment的最大大小
        Min.compaction .lag.ms(默认为0):在一个message可以被compact前,所需等待的时间
        Delete.retention.ms(默认为24小时):在一条message被加上删除标记后,在实际删除前等待的时间
        Min.Cleanable.dirty.ratio(默认为0.5):若是设置的更高,则会有更高效的清理,但是更少的清理操作触发。若是设置的更低,则清理的效率稍低,但是会有更多的清理操作被触发

2 Kafka介绍2

2.1 ISR、OSR、AR

00.汇总
    ISR:In-Sync Replicas 副本同步队列
    OSR:Out-of-Sync Replicas
    AR:Assigned Replicas 所有副本

2.2 LEO、HW、LSO、LW

00.汇总
    LEO:是 LogEndOffset 的简称,代表当前日志文件中下一条
    HW:水位或水印(watermark)一词,也可称为高水位(high watermark),通常被用在流式处理领域(比如Apache Flink、Apache Spark等),以表征元素或事件在基于时间层面上的进度。在Kafka中,水位的概念反而与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(offset)。取 partition 对应的 ISR中 最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置上一条信息。
    LSO:是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同
    LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值。

2.3 Kafka简化流程

01.Producer
    根据指定的 partition 方法(round-robin、hash等),将消息发布到指定 Topic 的 Partition 里面

02.Kafka集群
    接收到 Producer 发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费

03.Consumer
    从 Kafka 集群 pull 数据,并控制获取消息的 offset 。至于消费的进度,可手动或者自动提交给 Kafka 集群

2.4 Kafka采用Pull/Push

00.设计
    Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息

2.5 Kafka传输事务级别

00.三种级别
    a.最多一次
        消息不会被重复发送,最多被传输一次,但也有可能一次不传输
    b.最少一次
        消息不会被漏发送,最少被传输一次,但也有可能被重复传输
    c.精确的一次(Exactly once)
        不会漏传输也不会重复传输,每个消息都被传输

2.6 Kafka实现高吞吐率

01.定义
    Kafka是分布式消息系统,需要处理海量的消息,
    Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,
    但实际上,使用硬盘并没有带来过多的性能损失

02.kafka主要使用了以下几个方式实现了超高的吞吐率
    顺序读写
    零拷贝
    文件分段
    批量发送
    数据压缩

2.7 Kafka实现高可用?集群

01.Zookeeper部署2N+1节点
    形成 Zookeeper 集群,保证高可用

02.Kafka Broker 部署集群
    每个 Topic 的 Partition ,基于【副本机制】,在 Broker 集群中复制,形成 replica 副本,保证消息存储的可靠性
    每个 replica 副本,都会选择出一个 leader 分区(Partition),提供给客户端(Producer 和 Consumer)进行读写

03.Kafka Producer 无需考虑集群
    因为和业务服务部署在一起
    Producer 从 Zookeeper 拉取到 Topic 的元数据后,
    选择对应的 Topic 的 leader 分区,进行消息发送写入
    而 Broker 根据 Producer 的 request.required.acks 配置
    是写入自己完成就响应给 Producer 成功,还是写入所有 Broker 完成再响应
    这个,就是胖友自己对消息的可靠性的选择

04.Kafka Consumer 部署集群
    每个 Consumer 分配其对应的 Topic Partition ,根据对应的分配策略。并且,Consumer 只从 leader 分区(Partition)拉取消息。另外,当有新的 Consumer 加入或者老的 Consumer 离开,都会将 Topic Partition 再均衡,重新分配给 Consumer 。

2.8 ZooKeeper作用有哪些

00.在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有
    1.Broker 在 ZooKeeper 中的注册。
    2.Topic 在 ZooKeeper 中的注册。
    3.Consumer 在 ZooKeeper 中的注册。
    4.Producer 负载均衡。
    5.Consumer 负载均衡。
    6.记录消费进度 Offset 。
    7.记录 Partition 与 Consumer 的关系。

3 Kafka组件

3.1 broker:进程

00.概念
    每个kafka的机器节点都会运行一个进程,这个进程叫做broker,负责管理自身的topic和partition,
    以及数据的存储和处理,因为kafka是集群形式的,所以一个集群中会存在多个broker,但是kafka的整体又不是一个主从集群,
    需要选举出来一个broker节点为主节点,管理整个集群中所有的数据和操作,以及所有节点的协同工作。
    每个broker上面都存在一个controller组件,这个组件就是主节点管理组件,负责整个集群的管理,
    但是只有一个机器是active状态的,这个需要zookeeper进行协调和选举

3.2 topic:主题

00.概念
    在kafka中存在一个非常重要的逻辑结构叫做topic,可以称之为主题。当我们很多业务需要使用kafka进行消息队列的消息缓存
    和处理的时候我们会将消息进行分类处理,不能让多种类的数据放入到一起,这样使用特别混乱,
    所以topic主主题进行分类,是kafka数据处理的一大特色,可以类比现实中的主播。
    一个主播在直播的时候都会创建一个自己的房间,每个主播都不会相互干扰。各自主播自己的内容。

3.3 partition:分区

00.概念
    每个topic中在使用过程中会存储很多数据,这些数据如果默认只给一个broker进行处理,那么这个broker的压力会太大,
    集群应该负载均衡让数据的压力在不同的机器上共同分摊,所以每个topic都会分为不同的分区,
    一个分区是一个topic数据真正的物理存储方式,让数据分为不同的部分,在多个节点上存储和管理。
    分区是kafka物理存储最小的负载均衡单位,生产者生产数据的时候指向多个分区,
    消费者也可以在消费数据的时候从不同的分区读取数据
    ---------------------------------------------------------------------------------------------------------
    每个broker节点会按照topic的名称和分区的名称组合在一起形成一个文件夹进行文件内容的存储,一个broker会管理多个topic的不同分区的数据

3.4 附:备份、主从

01.备份
    在一个topic中存在多个分区,每个分区存储一部分这个topic的数据,但是因为存在多个机器上,不能够保证数据的稳定性,
    所以数据需要进行备份管理,所以分区是存在备份的,
    比如topicA的数据就需要存储多份在不同的机器上,这样数据损坏一份,其他的部分还可以使用

02.主从
    数据在存储的时候需要备份多个,那么这些数据就要保证数据的一致性,所以我们不能再存放数据的时候随意的向任何副本写入,
    因为这样集群中一个分区的多个副本没有办法保证数据的一致性,所以我们只能写入数据到一个副本,这个副本叫做主副本,
    其他的副本会从主副本同步数据,从而保证数据的一致性,那么这个主从的选举是broker的主节点进行选举的和zookeeper没有关系

3.5 附:选举、生产、消费

00.总结
    数据不管是生产者还是消费者,都是一条一条的操作,这个才是消息队列,这也是消息队列和hdfs等存储介质不同的地方,
    消息队列更加偏向于流式处理,并不是整体存取

01.zookeeper
    帮助选举broker为主,记录哪个是主broker,集群存在几个topic,
    每个topic存在几个分区,分区存在几个副本,每个分区分别在哪个机器节点上

02.producer: 生产者
    将数据远程发送到kafka集群,一般都是flume进行数据采集,并且发送到集群,
    producer一般只能发送数据到一个topic中,和一个主播只能在自己的房间直播一样

03.consumer:消费者
    消费数据并且参加计算处理,一般都是spark,flink等计算框架充当。
    但是一个消费者可以同时消费多个分区的数据,就如一个观众可以一起看多个小姐姐直播一样

4 Kafka生产者

4.1 流程

01.步骤1
     首先kafka启动以后所有的broker都会向zookeeper进行注册,在/brokers/ids中以列表的形式展示所有的节点,
     在/controller节点中使用独享锁实现broker的选举,其中一个机器为主节点。
     其他的为从节点,选举的根本原则就是谁先来的谁就是主节点

02.步骤2
    broker0现在是controller节点,他会监听所有的broker节点的动态变化,然后选举出来所有的topic的分区的主从,
    这个选举完毕以后,所有的操作都会指向主分区,不管是生产数据还是消费数据都是主分区在管理,从分区只是同步数据的

03.步骤3
    broker0选举完毕以后将数据上传到zookeeper中,记录在/broker/topics这个目录中,具体的topic信息都会被其他的broker节点进行同步过去,多个broker都会识别选举出来的主从分区信息
    其中在zookeeper中的ISR它是数据的传递优先级别顺序,如上图中数据的传输应该先到leader节点所在的机器4上面然后数据在同步到其他的从分区中,从而所有的分区数据都同步完毕保持一致

04.步骤4
    数据生产和传输都会走主节点,topic正常对外提供服务

4.2 KV结构

01.介绍
    kafka中的数据存储分为两个部分,分别是k-v两个部分,并且存储的数据都是二进制的,
    我们在存储数据的时候要转换为二进制存储,使用的时候读出来也是二进制的,
    我们需要人为转换成自己想要的数据类型才能使用,这个和hbase的存储及其相似,
    但是其中的k一般我们都不会做任何操作,只放入value的值

02.注意
    虽然数据分为k-v两个部分,但是不要把它当成map集合,相同的key的数据value不会被去重掉

4.3 整体结构

01.生产者部分的整体流程
    首先producer将发送的数据准备好
    经过interceptor的拦截器进行处理,如果有的话
    然后经过序列化器进行转换为相应的byte[]
    经过partitioner分区器分类在本地的record accumulator中缓冲
    sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

02.producer:生产者
    它由三个部分组成
    interceptor:拦截器,能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的
    serialiazer:序列化器,kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定
    partitioner: 分区器,主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

03.record accumulator
    本地缓冲累加器 默认32M
    ---------------------------------------------------------------------------------------------------------
    producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,
    远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,
    record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,
    远程发送给kafka集群,这个异步线程会根据linger.ms和batch-size进行拉取数据。如果本地累加器中的数据达到
    batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,
    还可以批次形式执行任务,增加效率
    ---------------------------------------------------------------------------------------------------------
    batch-size 默认16KB
    linger.ms 默认为0

4.4 producer确认机制

01.介绍
    主线程将数据放入到本地累加器中record accumulator中进行存储,sender线程会异步的拉取数据到kafka集群中,
    这个数据拉取并且复制到kafka集群中以后,kafka需要返回给sender线程一个确认应答ack这个确认应答用于
    在sender线程中进行判定sender线程是否复制拉取数据成功,如果我们在producer中设定了retries开关,
    那么失败以后sender线程还会多次重新复制尝试拉取数据
    ---------------------------------------------------------------------------------------------------------
    其中失败尝试和producer端没有任何关系,producer端只是将数据放入到本地累加器中而已,失败尝试是由sender线程重新尝试的
    ack的级别:
    ack = 0 ;sender线程认为拉取过去的数据kafka一定会收到
    ack = 1 ; sender线程拉取过去的数据leader节点接收到,并且存储到自己的本地,然后在返回ack
    ack = -1 ; sender线程拉取数据,leader节点收到存储到本地,所有follower节点全部都接收到并且存储到本地这个时候leader返回ack
    ---------------------------------------------------------------------------------------------------------
    综上所述ack = -1的级别是数据稳定性最高的,因为能够保证数据全部都同步完毕再返回给sender线程

02.带有确认应答的代码:
    其中回调函数中的metadata对象可以知道发送数据到哪里了,exception用于区分是不是本条数据发送成功
    但是这个回调函数不能做出任何的反馈操作,只能起到通知的作用
    ---------------------------------------------------------------------------------------------------------
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerWithCallBack {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            //设定ack,在代码中ack的级别存在三种 0 1 all
            pro.put(ProducerConfig.ACKS_CONFIG, "all");
            //设定重试次数
            pro.put(ProducerConfig.RETRIES_CONFIG,3 );
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
            pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
            for(int i=0;i<5;i++){
                producer.send(record, new Callback() {
                //发送方法中增加回调代码
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    //metadata中包含所有的发送数据的元数据信息
                    //哪个topic的那个分区的第几个数据
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    if(exception == null ){
                        System.out.println("success"+" "+topic+" "+partition+" "+offset);
                    }else{
                        System.out.println("fail"+" "+topic+" "+partition+" "+offset);
                    }
                }
            });
            }
            producer.close();
        }
    }

4.5 producer自定义拦截器

01.介绍
    interceptor是拦截器,可以拦截到发送到kafka中的数据进行二次处理,它是producer组成部分的第一个组件
    实现拦截器需要实现ProducerInterceptor这个接口,其中的泛型要和producer端发送的数据的类型一致
    ---------------------------------------------------------------------------------------------------------
    onSend方法是最主要的方法用户拦截数据并且处理完毕发送 onAcknowledgement 获取确认应答的方法,
    这个方法和producer端的差不多,只能知道结果通知 close是执行完毕拦截器最后执行的方法 configure方法是用于获取配置文件信息的方法

02.实现基于场景是获取到producer端的数据然后给数据加上时间戳
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Date;
    import java.util.Map;
    import java.util.Properties;

    public class ProducerWithInterceptor {

        public static class MyInterceptor implements ProducerInterceptor<String,String>{

            @Override
            public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
                String value = record.value();
                Long time = new Date().getTime();
                String topic = record.topic();
                //获取原始数据并且构建新的数据,增加时间戳信息
                return new ProducerRecord<String,String>(topic,time+"-->"+value);
            }

            @Override
            public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                //获取确认应答和producer端的代码逻辑相同
                String topic = metadata.topic();
                int partition = metadata.partition();
                long offset = metadata.offset();
                if(exception == null ){
                    System.out.println("success"+" "+topic+" "+partition+" "+offset);
                }else{
                    System.out.println("fail"+" "+topic+" "+partition+" "+offset);
                }
            }

            @Override
            public void close() {
                //不需要任何操作
                //no op
            }

            @Override
            public void configure(Map<String, ?> configs) {
                 //不需要任何操作
                //no op
            }
        }

        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.ACKS_CONFIG, "all");
            pro.put(ProducerConfig.RETRIES_CONFIG,3 );
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
            pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);
            pro.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName());
            //设定拦截器
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
            for(int i=0;i<5;i++){
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        if(exception == null ){
                            System.out.println("success"+" "+topic+" "+partition+" "+offset);
                        }else{
                            System.out.println("fail"+" "+topic+" "+partition+" "+offset);
                        }
                    }
                });
            }
            producer.close();
        }
    }
    ---------------------------------------------------------------------------------------------------------
    1674968729541-->this is hainiu
    1674968729969-->this is hainiu
    1674968729969-->this is hainiu
    1674968729969-->this is hainiu
    1674968729969-->this is hainiu
    ---------------------------------------------------------------------------------------------------------
    在客户端消费者中可以打印出来信息带有时间戳
    拦截器一般很少人为定义,比如一般producer在生产环境中都是有flume替代,一般flume会设定自己的时间戳拦截器,指定数据采集时间,相比producer更加方便实用

4.6 producer自定义序列化器

01.介绍
    kafka中的数据存储是二进制的byte数组形式,所以我们在存储数据的时候要使用序列化器进行数据的转换,序列化器的结构要和存储数据的kv的类型一致

02.实现系统的String类型序列化器
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.Properties;

    public class ProducerWithSerializer {
        public static void main(String[] args) {
            Properties pro = new Properties();
            //bootstrap-server key value batch-size linger.ms ack retries interceptor
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyConfig.BOOTSTRAP_SERVERS);
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MyStringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyStringSerializer.class.getName());
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG,MyConfig.BATCH_SIZE);
            pro.put(ProducerConfig.LINGER_MS_CONFIG,MyConfig.LINGER_MS);
            pro.put(ProducerConfig.ACKS_CONFIG,MyConfig.ACK);
            pro.put(ProducerConfig.RETRIES_CONFIG,MyConfig.RETRIES);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);

            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "hainiu","message_" + i);
                producer.send(record);
            }

            producer.close();
        }

        public static class MyStringSerializer implements Serializer<String>{

            @Override
            public byte[] serialize(String topic, String data) {
                return data.getBytes(StandardCharsets.UTF_8);
            }
        }
    }

03.序列化对象整体
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    /**
     * producerRecord==> key:string,value:student
     */
    public class ProducerWithStudentSerializer {

        public static class Student implements Serializable{
            private int id;
            private String name;
            private int age;

            public int getId() {
                return id;
            }

            public void setId(int id) {
                this.id = id;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            public int getAge() {
                return age;
            }

            public void setAge(int age) {
                this.age = age;
            }

            public Student() {
            }

            public Student(int id, String name, int age) {
                this.id = id;
                this.name = name;
                this.age = age;
            }
        }

        public static class MyStudentSerializer implements Serializer<Student>{
            @Override
            public byte[] serialize(String topic, Student data) {
                ByteArrayOutputStream byteOS = null;
                ObjectOutputStream objectOS = null;
                try {
                     byteOS =new ByteArrayOutputStream();
                     objectOS = new ObjectOutputStream(byteOS);
                    objectOS.writeObject(data);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }finally {
                    try {
                        byteOS.close();
                        objectOS.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return byteOS.toByteArray();
            }
        }

        public static void main(String[] args) {
            Properties pro = new Properties();
            //bootstrap-server key value batch-size linger.ms ack retries interceptor
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyConfig.BOOTSTRAP_SERVERS);
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG,MyConfig.BATCH_SIZE);
            pro.put(ProducerConfig.LINGER_MS_CONFIG,MyConfig.LINGER_MS);
            pro.put(ProducerConfig.ACKS_CONFIG,MyConfig.ACK);
            pro.put(ProducerConfig.RETRIES_CONFIG,MyConfig.RETRIES);
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,MyStudentSerializer.class.getName());

            KafkaProducer<String, Student> producer = new KafkaProducer<>(pro);

            Student s1 = new Student(1, "zhangsan", 20);
            Student s2 = new Student(2, "lisi", 30);

            ProducerRecord<String, Student> r1 = new ProducerRecord<>("topic_a", "hainiu", s1);
            ProducerRecord<String, Student> r2 = new ProducerRecord<>("topic_a", "hainiu", s2);

            producer.send(r1);
            producer.send(r2);

            producer.close();
        }
    }

04.实现serialize的方法用于将数据对象转换为二进制的数组
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.nio.charset.Charset;
    import java.util.Properties;

    public class Producer1 {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            //key因为没有放入任何值,所以序列化器使用原生的就可以
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StudentSeria.class.getName());
            //value的序列化器需要指定相应的student序列化器
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
            pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

            KafkaProducer<String, Student> producer = new KafkaProducer<String, Student>(pro);
            //producer生产的数据类型也必须是string,student类型的kv
            Student student = new Student(1, "zhangsan", 30);
            ProducerRecord<String, Student> record = new ProducerRecord<>("topic_a", student);
            producer.send(record);
            producer.close();
        }

        public static class Student{
            private int id;
            private String name;
            private int age;

            public int getId() {
                return id;
            }

            public Student() {
            }

            public void setId(int id) {
                this.id = id;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            public int getAge() {
                return age;
            }

            public void setAge(int age) {
                this.age = age;
            }

            public Student(int id, String name, int age) {
                this.id = id;
                this.name = name;
                this.age = age;
            }
        }

        public static class StudentSeria implements Serializer<Student> {

            @Override
            public byte[] serialize(String topic, Student data) {
                String line =data.getId()+" "+data.getName()+" "+data.getAge();
                //获取student对象中的所有数据信息
                return line.getBytes(Charset.forName("utf-8"));
                //转换为byte数组返回
            }
        }
    }

4.7 producer自定义分区器

01.介绍
    首先在kafka存储数据的时候topic中的数据是分为多个分区进行存储的,topic设定分区的好处是可以进行分布式存储和分布式管理,
    那么好的分区器可以让数据尽量均匀的分布到不同的机器节点,数据更加均匀,那么kafka中的分区器是如果实现的呢?
    ---------------------------------------------------------------------------------------------------------
    根据图我们可以看出数据首先通过分区器进行分类,在本地的累加器中进行存储缓存,
    然后在复制到kafka集群中,所以分区器产生作用的位置在本地的缓存之前

02.原理
    a.kafka的分区规则是如何实现的呢
        ProducerRecord<String, Student> record = new ProducerRecord<>("topic_a", student);
        producer.send(record);
    b.kafka的生产者数据发送是通过上面的方法实现的
        首先要构造一个ProducerRecord对象,然后通过producer.send来进行发送数据,其中ProducerRecord对象的构造器种类分为以下几种
        /**
             * Creates a record to be sent to a specified topic and partition
             *
             * @param topic The topic the record will be appended to
             * @param partition The partition to which the record should be sent
             * @param key The key that will be included in the record
             * @param value The record contents
             */
        public ProducerRecord(String topic, Integer partition, K key, V value) {
            this(topic, partition, null, key, value, null);
            }

        /**
             * Create a record to be sent to Kafka
             *
             * @param topic The topic the record will be appended to
             * @param key The key that will be included in the record
             * @param value The record contents
             */
        public ProducerRecord(String topic, K key, V value) {
            this(topic, null, null, key, value, null);
        }

        /**
             * Create a record with no key
             *
             * @param topic The topic this record should be sent to
             * @param value The record contents
             */
        public ProducerRecord(String topic, V value) {
            this(topic, null, null, null, value, null);
        }
    c.分区策略
        我们想要发送一个数据到kafka的时候可以构造以上的ProducerRecord但是构造的方式却不同,
        大家可以发现携带的参数也有所不同,当携带不同参数的时候数据会以什么样的策略发送出去呢,
        这个时候需要引入一个默认分区器,就是在用户没有指定任何规则的时候系统自带的分区器规则
        -----------------------------------------------------------------------------------------------------
        在producerConfig对象中我们可以看到源码指示,如果没有任何人为分区器规则指定,那么默认使用的DefaultPartitioner的规则
        而打开DefaultPartitioner以后可以看到他的分区器规则,就是在构建ProducerRecord的时候
        new ProducerRecord(topic,partition,k,v);
        //指定分区直接发送数据到相应分区中
        new ProducerRecord(topic,k,v);
        //没有指定分区就按照k的hashcode发送到不同分区
        new ProducerRecord(topic,v);
        //如果k和partition都没有指定就使用粘性分区
        -----------------------------------------------------------------------------------------------------
        这个逻辑可以在DefaultPartitioner中看到
        partition方法中,如果key为空就放入到粘性缓冲中,它的意思就是如果满足batch-size或者linger.ms就会触发应用执行,
        将数据复制到kafka中,并且再次随机到其他分区,所以简单来说粘性分区就是可一个分区放入数据,一旦满了以后才会改变分区,
        粘性分区规则使用主要是为了让每次复制数据更加快捷方便都赋值到一个分区中
        -----------------------------------------------------------------------------------------------------
        而如果key不为空那么就按照hashcode值进行取余处理,以上就是kafka的分区策略
        我们也可以认为设定自己的分区器规则来替换kafka的分区器

03.三种分区器
    a.默认分区器的规则DefaultPartitioner
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.Serializer;
        import org.apache.kafka.common.serialization.StringSerializer;

        import java.nio.charset.Charset;
        import java.util.Properties;

        public class Producer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
                pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

                KafkaProducer<String, String> producer = new KafkaProducer<>(pro);
                ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", 0, null, "aaaa");
                //创建对象指定分区
                producer.send(record);
                producer.close();
            }

        }
        -----------------------------------------------------------------------------------------------------
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning --partition 2
        指定分区0进行消费数据,可以得到数据相应位置
    b.不指定分区,携带key 111的hashcode值274292042,使用这个值和分区数4取余得出相应的分区编号为2
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.Serializer;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.apache.kafka.common.utils.Utils;

        import java.nio.charset.Charset;
        import java.util.Properties;

        public class Producer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
                pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

                KafkaProducer<String, String> producer = new KafkaProducer<>(pro);
                ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "111", "111");
                producer.send(record);
                producer.close();
            }
        }
        -----------------------------------------------------------------------------------------------------
        可以看出数据的计算是没有问题的,按照k的hashcode值进行分发
    c.没有增加partition和key的时候会按照粘性分区规则进行分配
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.serialization.Serializer;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.apache.kafka.common.utils.Utils;

        import java.nio.charset.Charset;
        import java.util.Properties;

        public class Producer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
                pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);

                KafkaProducer<String, String> producer = new KafkaProducer<>(pro);
                for (int i = 0; i < 5; i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "hahahaha");
                    producer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            String topic = metadata.topic();
                            int partition = metadata.partition();
                            System.out.println(topic+"-->"+partition);
                        }
                    });
                }
                producer.close();
            }

        }
        -----------------------------------------------------------------------------------------------------
        我们没有指定任何分区和key,默认分区器会自动随机一个分区。然后数据以粘性的方式分发到不同的分区中

04.自定义分区器
    a.背景
        我们可以人为定义分区器的规则来替换原生分区器的规则,因为在很多时候默认分区器的规则都不适用于业务场景
    b.程序背景
        使用producer采集本地数据并且发送到不同的分区中,按照每个专业的类别将数据分发到不同的分区
        -----------------------------------------------------------------------------------------------------
        # 首先创建topic名称为teacher,设定分区数量为spark和java两个分区
        kafka-topics.sh --bootstrap-server nn1:9092 --create --topic teacher --partitions 6 --replication-factor 2
        # 创建一个data/teacher.txt 放入老师的访问数据,然后分类发送
        http://spark.hainiubl.com/unclewang
        http://spark.hainiubl.com/unclewang
        http://spark.hainiubl.com/xiaohe
        http://spark.hainiubl.com/xiaohe
        http://spark.hainiubl.com/laoyang
        http://spark.hainiubl.com/laoyang
        http://java.hainiubl.com/laochen
        http://java.hainiubl.com/laochen
        http://java.hainiubl.com/laoliu
        http://java.hainiubl.com/laoliu
        -----------------------------------------------------------------------------------------------------
        实现思路就是采集数据然后将数据按照专业分类,分别发送到不同的分区中
        -----------------------------------------------------------------------------------------------------
        自定分区器逻辑
        public static class MyPartitioner implements Partitioner{

            @Override
            public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                return 0;
            }

            @Override
            public void close() {

            }

            @Override
            public void configure(Map<String, ?> configs) {

            }
        }
        -----------------------------------------------------------------------------------------------------
        自定义分区器要实现Partitioner接口
        其中Partition方法是最重要的方法,可以根据不同的数据发送到不同的分区
        close方法是关闭时候执行的方法一般不做任何操作
        configure方法是获取配置文件的方法,一般也不做任何操作
    c.在partition方法中得到value值中是否包含spark或者java,然后分类发送
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.Cluster;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.apache.kafka.common.utils.Utils;

        import java.io.FileInputStream;
        import java.net.URL;
        import java.util.*;

        public class ProducerWithUDPartitioner {

            public static class MyTeacherPartitioner implements Partitioner{
                List<String> list = Arrays.asList(
                        "unclewang",
                        "xiaohe",
                        "laoyang",
                        "laochen",
                        "laoliu",
                        "laozhang"
                );

                @Override
                public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                    String valueStr = value.toString();
                    return list.indexOf(valueStr);
                }

                @Override
                public void close() {
                    //no - op
                }

                @Override
                public void configure(Map<String, ?> configs) {
                    // no - op
                }
            }

            public static void main(String[] args) throws Exception{

                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyTeacherPartitioner.class.getName());
                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);

                FileInputStream in = new FileInputStream("data/teacher.txt");
                Scanner sc = new Scanner(in);
                while(sc.hasNext()){
                    String line = sc.nextLine();
                    URL url = new URL(line);
                    String host = url.getHost();
                    String path = url.getPath();
                    String subject = host.split("\\.")[0];
                    String teacher = path.substring(1);
                    ProducerRecord<String, String> record = new ProducerRecord<>("teacher", subject, teacher);
                    producer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if(exception == null){
                                System.out.println(metadata.topic()+"-->"+metadata.partition()+"-->"+record.key()+"-->"+record.value());
                            }else{
                                System.out.println("fail");
                            }
                        }
                    });
                }
                producer.close();
            }
        }

4.8 producer一致性和ACK

01.介绍
    kafka是存在确认应答机制的,也就是数据在发送到kafka的时候,kafka会回复一个确认信息,这个确认信息是存在等级的
    ack=0 这个等级是最低的,这个级别中数据sender线程复制完毕数据默认人为kafka已经接收到数据
    ack=1 这个级别中,sender线程复制完毕数据leader分区拿到数据放入到自己的存储并且返回确认信息
    ack=-1 这个级别比较重要,sender线程复制完毕数据,主分区接受完毕数据并且从分区都同步完毕数据然后在返回确认信息
    ---------------------------------------------------------------------------------------------------------
    ack=0 会丢失数据
    ack=1 的时候leader虽然接收到数据存储到本地,但是没有同步给follower节点,这个时候主节点宕机,从节点重新选举新的主节点,主节点是不含有这个数据的,数据会丢失
    ack=-1 这个模式不会丢失数据,但是如果leader接受完毕数据并且将数据同步给不同的follower,从节点已经接受完毕,但是还没有返回给sender线程ack的时候,这个时候leader节点宕机了,sender没有接收到这个ack,它人为没有发送成功还会重新发送数据过来,会造成数据重复
    ---------------------------------------------------------------------------------------------------------
    acks=0: 表示生产者不需要等待任何 broker 的确认,可能导致数据丢失。
    acks=1: 表示生产者只等待领导者 broker 的确认,若领导者在确认后崩溃则可能丢失数据。
    acks=all: 表示生产者会等待所有副本的确认,这样可以确保数据不会丢失,但延迟可能较大。
    一般前两种都适合在数据并不是特别重要的时候使用,而最后一种效率会比较低下,但是适用于可靠性比较高的场景使用
    所以一般使用我们都会使用ack = -1 retries = N 联合在一起使用

02.一致性语义
    在大数据场景中存在三种时间语义,分别为
    At Least Once 至少一次,数据至少一次,可能会重复
    At Most Once 至多一次,数据至多一次,可能会丢失
    Exactly Once 精准一次,有且只有一次,准确的消息传输
    ---------------------------------------------------------------------------------------------------------
    那么针对于以上我们学习了ack已经幂等性以及事物
    所以我们做以下分析:
    如果设定ack = 0 或者是 1 出现的语义就是At Most Once 会丢失数据
    如果设定ack = - 1 会出现At Least Once 数据的重复
    在ack = -1的基础上开启幂等性会解决掉数据重复问题,但是不能保证一个批次的数据整体一致,所以还要开启事物才可以

03.幂等性
    在kafka的0.10以后的版本中增加了新的特性,幂等性,主要就是为了解决kafka的ack = -1的时候,数据的重复问题,设计的原理就是在kafka中增加一个事物编号
    数据在发送的时候在单个分区中的seq事物编号是递增的,如果重复的在一个分区中多次插入编号一致的两个信息,那么这个数据会被去重掉
    在单个分区中序号递增,也就是我们开启幂等性也只能保证单个分区的数据是可以去重的
    ---------------------------------------------------------------------------------------------------------
    整体代码如下:
    pro.put(ProducerConfig.RETRIES_CONFIG,3);
    pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
    ---------------------------------------------------------------------------------------------------------
    设定retries = 3 ,enable.idempotence = true
    幂等性开启的时候,ack默认设定为-1
    幂等性的工作原理很简单,每条消息都有一个「主键」,这个主键由 <PID, Partition, SeqNumber> 组成,他们分别是:
        PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
        Partition:消息需要发往的分区号
        SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识
    对于主键相同的数据,Kafka 是不会重复持久化的,它只会接收一条,但由于是原理的限制,
    幂等性也只能保证单分区、单会话内的数据不重复,
    如果 Kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据,这就需要另一个特性来保证了--Kafka事务

4.9 附:参数调整

01.参数
    参数                                     调节
    buffer.memory                            record accumulator的大小,适当增加可以保证producer的速度,默认32M
    batch-size                               异步线程拉取的批次大小,适当增加可以提高效率,但是会增加延迟性
    linger.ms                                异步线程等待时长一般根据生产效率而定,不建议太大增加延迟效果
    acks                                     确认应答一般设定为-1,保证数据不丢失
    enable.idempotence                       开启幂等性保证数据去重,实现exactly once语义
    retries                                  增加重试次数,保证数据的稳定性
    compression.type                         增加producer端的压缩
    max.in.flight.requests.per.connection    sender线程异步复制数据的阻塞次数,当没收到kafka的ack之前可以最多发送五个写入请求,调节这个参数可以保证数据的有序性

02.代码
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerWithMultiConfig {
        public static void main(String[] args) throws InterruptedException {
            Properties pro = new Properties();
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
            pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
            pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
            pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            pro.put(ProducerConfig.RETRIES_CONFIG, 3);
            pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
            pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
            producer.send(record);
            producer.close();
        }
    }
    ---------------------------------------------------------------------------------------------------------
    其中max.in.flight.requests.per.connection参数设定后可以增加producer的阻塞大小
    在未开启幂等性的时候,这个值设定为1,可以保证单个批次的数据有序,在分区内部有序
    如果开启了幂等性可以设定最大值不超过5,可以保证五个request请求单个分区内有序
    ---------------------------------------------------------------------------------------------------------
    因为没有开启幂等性的时候如果第一个请求失败,第二个请求重新发送的时候需要二次排序
    要是开启幂等性了会保留原来的顺序性,不需要重新排序
    总而言之kafka可以保证单分区有序但是整体是无序的

5 Kafka消费者

5.1 存储结构

01.介绍
    这些数据可以在服务器集群上对应的文件夹中查看到
    每个文件夹以topic+partition进行命名,更加便于管理和查询检索,
    因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,
    所以检索速度必须要快,分块管理是最好的方式

02.消费者在检索相应数据的时候会非常的简单
    consumer检索数据的过程
        首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数
        00000000000000000000.index
        00000000000000000000.log
        00000000000000000000.timeindex 代表存储的数据是从0条开始的
        00000000000000100000.index
        00000000000000100000.log
        00000000000000100000.timeindex 代表存储的数据是从100000条开始的
    所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据
    那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容

03.代码实现
    # 首先创建一个topic_b
    kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_b --partitions 5 --replication-factor 2
    ---------------------------------------------------------------------------------------------------------
    # 然后通过代码随机向不同的分区中分发不同的数据1W条
    package com.hainiu.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerWithMultiConfig {
        public static void main(String[] args) throws InterruptedException {
            Properties pro = new Properties();
            pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
            pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
            pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
            pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            pro.put(ProducerConfig.RETRIES_CONFIG, 3);
            pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
            pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
            for (int i = 0; i < 10000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");
                producer.send(record);
            }
            producer.close();
        }
    }
    ---------------------------------------------------------------------------------------------------------
    # kafka查看日志和索引的命令
    kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx
    ---------------------------------------------------------------------------------------------------------
    index索引
        offset第几条   position物理偏移量位置,也就是第几个字
        1187           5275
        1767           10140
        2022           15097
    log日志
        # 打印日志内容的命令 --print-data-log 打印数据
        kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
        # 可以看到刷写的日志
        baseOffset: 1768 lastOffset: 2022 count: 255
        从1768到2022条一次性刷写255条
        lastSequence: 2022 producerId: 1007 position: 15097
        刷写事物日志编号,生产者的编号,最后一条数据的物理偏移量
    ---------------------------------------------------------------------------------------------------------
    通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据
    大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引
    官网给出的默认值4KB
    一个数据段大小是1G
    ---------------------------------------------------------------------------------------------------------
    timeIndex
        # 查询时间索引
        kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex
         可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算
        也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快

5.2 消费者组、分区

01.介绍
    能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者
    ---------------------------------------------------------------------------------------------------------
    这里面要设计到一个动作叫做拉取
    首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,
    但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,
    这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,
    然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,
    所以kafka的消费者一般都是采用拉的方式pull,并不是push

02.消费者组
    在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,
    如果消费者只有一个的话很难全部消费其中的数据,压力也会集中在一个消费者中,
    并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,
    那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,
    所有消费者不可能只有一个,一般情况下都会有多个消费者
    ---------------------------------------------------------------------------------------------------------
    正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,
    所以有几个分区应该对应存在几个消费者是最好的
    这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的
    ---------------------------------------------------------------------------------------------------------
    那么消费者是多个人,每个人对应一个单独的分区进行消费数据是最好的,
    但问题是一个消费者难道知道自己应该去消费哪个分区吗,他们直接会不会出现混乱呢
    ---------------------------------------------------------------------------------------------------------
    如果一个消费者想要消费多个分区的数据,或者两个消费者消费了同一个分区的数据怎么办,这样数据就会出现混乱了?
    消费者组出现了,它是一个组标识,每个消费者上面都应该设置一个消费者组标识,
    这样在进入到kafka消费相应分区的时候kafka就不会让数据混乱的分配给不同的消费者了,当然只有组内是有这样的分配关系的
    ---------------------------------------------------------------------------------------------------------
    消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
    消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

03.消费者和分区
    a.介绍
         一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,
         但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况
    b.验证
        # 首先创建topic_c 用于测试分区和消费者的对应关系
        kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_c --partitions 3 --replication-factor 2
        # 启动两个消费者 刚才我们写的消费者main方法运行两次
        # 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况
        ---------------------------------------------------------------------------------------------------------
        启动两次,这个时候我们就有了两个消费者实例
        生产者线程:分别向三个分区中发送1 2 3元素
        可以看到有的消费者消费了两个分区的数据,
        如果启动三个消费者,会发现每个人消费一个分区的数据
        如果启动四个消费者,我们发现有一个消费者没有数据
    c.代码
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;

        import java.util.Properties;

        public class Producer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
                ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_c", 0,null,"1");
                ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_c", 1,null,"2");
                ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_c", 2,null,"3");
                producer.send(record1);
                producer.send(record2);
                producer.send(record3);
                producer.close();
            }
        }

5.3 一对一:点对点消费[4种]

00.消费方式
    a.点对点消费
        点对点的方式,在队列中的数据有且只有一个消费者可以消费数据,
        在消费完毕数据以后会将数据从队列中删除,这个数据有且只有一次消费
    b.发布订阅模式
        发布定语模式中每个人可以消费数据,这个数据会在队列中存储七天,
        每个订阅这个数据的人都可以消费到相应的数据,并且可以重复的进行消费数据,在大多数情况下我们都使用发布订阅模式

01.4种方式
    一个组 消费 一个topic
    多个组 消费 一个topic
    不同组 消费 多个topic
    一个组 消费 多个topic

02.一个组 消费 多个topic
    package com.hainiu.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;
    
    public class Consumer1 {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
            pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
            List<String> topics = Arrays.asList("topic_c","topic_a");
            //订阅多个topic的数据变化
            consumer.subscribe(topics);
    
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> it = records.iterator();
                while(it.hasNext()){
                    ConsumerRecord<String, String> record = it.next();
                    System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                }
            }
        }
    }

03.多个组 消费 一个topic
    修改同一份代码的组标识不同。启动两个实例查看里面的消费信息
    pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
    pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
    ---------------------------------------------------------------------------------------------------------
    package com.hainiu.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;
    
    public class Consumer1 {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
            pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
            List<String> topics = Arrays.asList("topic_c");
            //订阅多个topic的数据变化
            consumer.subscribe(topics);
    
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> it = records.iterator();
                while(it.hasNext()){
                    ConsumerRecord<String, String> record = it.next();
                    System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                }
            }
        }
    }

5.4 一对多:发布订阅模式

00.消费方式
    a.点对点消费
        点对点的方式,在队列中的数据有且只有一个消费者可以消费数据,
        在消费完毕数据以后会将数据从队列中删除,这个数据有且只有一次消费
    b.发布订阅模式
        发布定语模式中每个人可以消费数据,这个数据会在队列中存储七天,
        每个订阅这个数据的人都可以消费到相应的数据,并且可以重复的进行消费数据,在大多数情况下我们都使用发布订阅模式

5.5 consumer自定义反序列化器

01.String类型的反序列化器
    package com.hainiu.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;
    
    public class ConsumerWithStringDeserializer {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
            pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class.getName());
            pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
            List<String> topics = Arrays.asList("topic_c","topic_a");
            consumer.subscribe(topics);
    
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
                Iterator<ConsumerRecord<String, String>> it = records.iterator();
    
                while(it.hasNext()){
                    ConsumerRecord<String, String> record = it.next();
                    System.out.println(record.topic()+"-->"+record.partition()+"-->"+record.key()+"-->"+record.value()+"-->"+record.offset());
                }
            }
        }
    
        public static class MyDeserializer implements Deserializer<String>{
    
            @Override
            public String deserialize(String topic, byte[] data) {
                String line = new String(data, StandardCharsets.UTF_8);
                return line;
            }
        }
    
    }

02.Student类型的反序列化器
    package com.hainiu.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;
    
    public class ConsumerWithStudentDeSerializer {
    
       public static class MyStudentDeserializer implements Deserializer<Student>{
    
           @Override
           public Student deserialize(String topic, byte[] data) {
               ByteArrayInputStream byteIn = null;
               ObjectInputStream objectIn = null;
               Student s = null;
               try {
                   byteIn = new ByteArrayInputStream(data);
                   objectIn = new ObjectInputStream(byteIn);
                   Object o = objectIn.readObject();
                   s = (Student) o;
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }finally {
                   try {
                       byteIn.close();
                       objectIn.close();
                   } catch (IOException e) {
                       throw new RuntimeException(e);
                   }
               }
               return s;
           }
       }
    
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
            pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyStudentDeserializer.class.getName());
            KafkaConsumer<String, Student> consumer = new KafkaConsumer<String, Student>(pro);
            List<String> topics = Arrays.asList("topic_c","topic_a");
            consumer.subscribe(topics);
    
            while(true){
                ConsumerRecords<String, Student> records = consumer.poll(Duration.ofMillis(100));
    
                Iterator<ConsumerRecord<String, Student>> it = records.iterator();
    
                while(it.hasNext()){
                    ConsumerRecord<String, Student> record = it.next();
                    System.out.println(record.key()+"-->"+record.value());
                }
            }
        }
    }

5.6 consumer分区分配规则

00.背景
    消费者有的时候会少于或者多于分区的个数,
    那么如果消费者少了有的消费者要消费多个分区的数据,
    如果消费者多了,有的消费者就可能没有分区的数据消费
    ---------------------------------------------------------------------------------------------------------
    现在我们知道kafka中存在一个coordinator可以管理这么一堆消费者,
    它可以帮助一个组内的所有消费者进行分区的分配和对应,通过coordinator进行协调

00.通过coordinator进行协调的5种方式
    方式1:range分配器:按照范围形式进行分配分区数量
    方式2:roundRobin轮训分配策略
    方式3:sticky粘性分区
    方式4:CooperativeSticky分区
    方式5:指定分区消费数据

01.方式1:range分配器:按照范围形式进行分配分区数量
    a.步骤1
        # 为了演示分区的分配效果我们创建一个topic_d,设定为7个分区
        kafka-topics.sh --bootstrap-server nn1:9092 --topic topic_d --create --partitions 7 --replication-factor 2
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }
        });
    b.步骤2
        然后改版订阅代码,subscribe订阅信息的时候展示出来分区的对应映射关系,这个只是一个监控的作用没有其他的代码影响,ConsumerRebalanceListener增加监视
        其中存在两个比较直观的方法
        onPartitionsRevoked回收的分区
        onPartitionsAssigned分配的分区
        能够直观展示在分区分配的对应关系
        其中我们需要知道两个比较重要的参数
        -----------------------------------------------------------------------------------------------------
        | 参数                          | 解释                                 
        |-------------------------------|------------------------------------
        | offsets.topic.num.partitions  | __consumer_offset这个topic的分区数量默认50个 
        | heartbeat.interval.ms         | 消费者和协调器的心跳时间 默认3s                  
        | session.timeout.ms            | 消费者断开的超时时间 默认45s,最小不能小于6000        
        | partition.assignment.strategy | 设定分区分配策略                           
        -----------------------------------------------------------------------------------------------------
        也就是说我们想要直观查看消费者变化后的映射对应关系需要停止消费者以后45s才可以,这个在代码中我们需要人为设定小点,更加快速查看变化
    c.步骤3
        代码测试原理
        首先设定topic_d的分区为7个,然后启动一个组内的两个消费者,可以看到他们的分配关系在onPartitionsAssigned
        这个方法中打印出来,同时我们关闭一个消费者,可以看到onPartitionsRevoked可以展示回收的分区,
        onPartitionsAssigned以及这个方法中分配的分区
    d.步骤4
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        import java.time.Duration;
        import java.util.*;
        
        public class Consumer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());
                //设定分区分配策略为range
                pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);
                //设定consumer断开超时时间最小不能小于6s
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<String> topics = Arrays.asList("topic_d");
                consumer.subscribe(topics, new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        for (TopicPartition partition : partitions) {
                            System.out.println("revoke-->"+partition.topic()+"-->"+partition.partition());
                        }
                    }
        
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        for (TopicPartition partition : partitions) {
                            System.out.println("assign-->"+partition.topic()+"-->"+partition.partition());
                        }
                    }
                });
        
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                    }
                }
            }
        }
    e.步骤5
        我们执行两个实例,两个实例代表两个消费者位于同一个组中,那么两个消费者的分配关系按照,范围进行分割
        consumer0[0,1,2,3] consumer1[4,5,6]
        运行一个实例的时候可以看见,没有回收,因为第一次分配,所以第一个consumer消费所有的分区
        如果启动第二个消费者可以看到,第一个消费者要回收所有的已经分配给他的分区,
        然后重新将分区分配给consumer1和consumer2,因为coordinator的分配规则基于eager协议,
        这个协议的规则就是当分配关系发生变化的时候要全部回收然后打乱重分
        -----------------------------------------------------------------------------------------------------
        缺点:
        这个协议只是按照范围形式进行重新分配分区,会造成单个消费者的压力过大问题,多个topic就会不均匀,会造成分配不均匀的问题

02.方式2:roundRobin轮训分配策略
    a.介绍
        轮训形式分配分区,一个消费者一个分区
    b.代码
        pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RoundRobinAssignor.class.getName());
    c.优点
        同range方式相比,在多个topic的情况下,可以保证多个consumer负载均衡
        分配规则如上图,一人一个轮训形式
        consumer0 [0 2 4 6 1 3 5]
        consumer1 [1 3 5 0 2 4 6]
    d.缺点
        不管是range的还是roundRobin的分配方式都是全量收回打乱重新分配,这样的效率太低,所以我们使用下面的粘性分区策略

03.方式3:sticky粘性分区
    a.介绍
        粘性分区它的重新分区原理和原来的roundRobin的分区方式差不多,但是又不相同,主要是分区逻辑一样,
        但是重新分配分区的时候优先保留原分区,然后重新分配其他分区,从而不需要全部打乱重分,减少重新分配分区消耗
        分区分配方式一样,但是如果重新分配的话会有很多原来分区的预留,重新分配新的分区
    b.操作1
        # 为了演示效果再次创建新的topic topic_e 七个分区
        kafka-topics.sh --bootstrap-server nn1:9092 --topic topic_e --create --partitions 7 --replication-factor 2
        // 修改为粘性分区
        pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
        List<String> topics = Arrays.asList("topic_d","topic_e");
    c.操作2
        并且运行应用实例分别运行1 ,2 ,3 多种个数的实例
    d.结论
        我们发现多个实例的运行时候,优先保留之前的分区规则,然后重新分配,但是优先以分区分配均衡为主
        以上三种都基于eager协议,也就是想要重新分配分区一定要将原来的所有分区回收,全部打乱重新,即使保留原来的分区规则,
        也需要全部都回收分区,这样效率非常低下,最后一种CooperativeSticky分区策略完全打破以上三种的分区关系
    
04.方式4:CooperativeSticky分区
    a.介绍
        以粘性为主,但是不全部收回分区,只是将部分需要重新分配的分区重新调配,效率高于以上三种分区策略
    b.操作
        //设定分区策略
        pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());
    c.结论
        运行两个实例,查看控制台信息发现
        整个分区分配规则和粘性分区策略一致,但是并不需要收回全部分区
        系统默认分区分配规则为:
        range+CooperativeSticky
        范围分区为主,优先粘性并且不急于eager协议

05.方式5:指定分区消费数据
    a.介绍
        在计算处理过程中,有时候我们需要指定一个消费者组消费指定的分区,计算其中的数据,
        这个时候以上的所有分区策略都不符合我们人为的要求,我们需要指定相应的分区进行消费
    b.操作
        //用指定的方式定向消费相应的分区数据
        consumer.assign();
    c.代码
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        import java.time.Duration;
        import java.util.*;
        
        public class ConsumerAssginPartition {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());
                pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<TopicPartition> list = Arrays.asList(
                        new TopicPartition("topic_d", 0),
                        new TopicPartition("topic_e", 0)
                );
                consumer.assign(list);
        
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                    }
                }
            }
        }
    d.结论
        我们只消费topic_e的0号分区和topic_d的0号分区
        生产者向两个topic的0号分区发送数据
        消费者可以直接消费其中的数据

5.7 consumer偏移量

01.偏移量的概念
    消费者在消费数据的时候需要将消费的记录存储到一个位置,防止因为消费者程序宕机而引起断点消费数据丢失问题,
    下一次可以按照相应的位置从kafka中找寻数据,这个消费位置记录称之为偏移量offset
    ---------------------------------------------------------------------------------------------------------
    kafka0.9以前版本将偏移量信息记录到zookeeper中
    新版本中偏移量信息记录在__consumer_offsets中,这个topic是系统生成的,不仅仅帮助管理偏移量信息还能分配consumer给哪个coordinator管理,是一个非常重要的topic
    它的记录方式和我们知道的记录方式一样 groupid + topic + partition ==> offset
    其中存储到__consumer_offsets中的数据格式也是按照k-v进行存储的,其中k是groupid + topic + partition value值为offset的偏移量信息
    可以看到系统生成的topic
    因为之前我们消费过很多数据,现在可以查看一下记录在这个topic中的偏移量信息
    ---------------------------------------------------------------------------------------------------------
    其中存在一个kafka-consumer-groups.sh 命令
    # 查看消费者组信息
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    # 查询具体信息
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
    # 查看活跃信息
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    ---------------------------------------------------------------------------------------------------------
    当前组消费偏移量信息
    GROUP:组名 
    TOPIC:topic信息           
    PARTITION:分区  
    CURRENT-OFFSET:当前消费偏移量  
    LOG-END-OFFSET:这个分区总共存在多少数据  
    LAG:还差多少没消费             
    CONSUMER-ID:随机消费者id                                                  
    HOST:主机名            
    CLIENT-ID:客户端id
    ---------------------------------------------------------------------------------------------------------
    查询__consumer_offset中的原生数据
    kafka-console-consumer.sh  --bootstrap-server nn1:9092 \
    --topic __consumer_offsets --from-beginning --formatter \
    kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter
    ---------------------------------------------------------------------------------------------------------
    使用元数据格式化方式查看偏移量信息数据
    结果如下key展示的是groupid,topic,partition value值展示的是当前的偏移量信息,并且在这个topic中是追加形式一致往里面写入的

02.偏移量的自动管理
    a.介绍
        那么我们已经看到了偏移量的存储但是偏移量究竟是怎么提交的呢?
    b.操作
        首先我们没有设置任何的偏移量提交的代码,这个是默认开启的,其中存在两个参数
        //开启自动提交偏移量信息
        pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //默认提交间隔5s
        pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    c.细节
        官网的设置参数为两个true和5000
        所以我们在没有开启默认提交的时候已经自动提交了
        为了演示自动提交的效果我们引入一个参数
        auto.offset.reset
        这个参数用于控制没有偏移量存储的时候,应该从什么位置进行消费数据
        -----------------------------------------------------------------------------------------------------
        其中参数值官网中给出三个
        [latest, earliest, none]
        latest:从最新位置消费
        earliest:最早位置消费数据
        none:如果不指定消费的偏移量直接报错
        一定要记得一点,如果有偏移量信息那么以上的设置是无效的
    d.使用
        现在我们设置读取位置为最早位置,并且消费数据,看看可不可以记录偏移量,断点续传
        思路:
            首先修改组id为一个新的组,然后从最早位置消费数据,如果记录了偏移量,那么重新启动消费者会看到,没有任何数据,因为之前记录了消费数据的位置
        整体代码如下:
            package com.hainiu.kafka;

            import org.apache.kafka.clients.consumer.*;
            import org.apache.kafka.common.TopicPartition;
            import org.apache.kafka.common.serialization.StringDeserializer;

            import java.time.Duration;
            import java.util.*;

            public class Consumer1 {
                public static void main(String[] args) {
                    Properties pro = new Properties();
                    pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                    pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new_group");
                    pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
                    pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

                    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                    List<String> topics = Arrays.asList("topic_d","topic_e");
                    consumer.subscribe(topics);

                    while (true){
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                        Iterator<ConsumerRecord<String, String>> it = records.iterator();
                        while(it.hasNext()){
                            ConsumerRecord<String, String> record = it.next();
                            System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                        }
                    }
                }
            }
        运行完毕打印数据
            这个时候我们需要在5s之内关闭应用,然后重新启动,因为提交的间隔时间是5s
        再次启动
            我们发现数据依旧被消费出来了,证明之前的偏移量存储没有任何效果和作用,因为间隔时间是5s
            现在我们等待5s后在关闭应用
            发现没有任何数据产生,因为偏移量已经提交了

03.偏移量的手动提交
    a.介绍
        如上的案例我们发现偏移量的管理如果交给系统自己管理,我们没有办法及时的修改和管理偏移量信息,
        这个时候我们需要手动来提交给管理偏移量,更加及时和方便
    b.引入两个方法
        consumer.commitAsync();
        consumer.commitSync();
        -----------------------------------------------------------------------------------------------------
        commitAsync 异步提交方式:只提交一次,不管成功与否不会重试
        commitSync 同步提交方式:同步提交方式会一直提交到成功为止
    c.操作
        一般我们都会选择异步提交方式,他们的功能都是将拉取到的一整批数据的最大偏移量直接提交到__consumer_offsets中,
        但是同步方式会很浪费资源,异步方式虽然不能保证稳定性但是我们的偏移量是一直递增存储的,
        所以偶尔提交不成功一个两个不影响我们的使用
    d.代码
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        import java.time.Duration;
        import java.util.*;
        
        public class Consumer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new_group1");
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
                pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //        pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<String> topics = Arrays.asList("topic_d","topic_e");
                consumer.subscribe(topics);
        
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                    }
                    consumer.commitAsync();
        //            consumer.commitSync();
                }
            }
        }
    e.再操作
        现在先在topic中输入部分数据
        然后启动消费者,当存在数据打印的时候马上关闭掉应用,在此启动会发现数据不会重新消费
        现在启动应用,修改组id
        打印完毕数据立即重新启动应用
        偏移量已经提交不会重复消费数据

04.断点消费数据
    a.介绍
        在没有偏移量的时候我们可以设定
        auto.offset.reset进行数据的消费
        可选参数有 latest earliest none等位置
        但是如果存在偏移量以上的设定就不在好用了,我们需要根据偏移量的位置进行断点消费数据
        但是有的时候我们需要指定位置消费相应的数据
    b.方法
        //可以指定位置进行数据的检索
        onsumer.seek();
    c.操作
        但是我们不能随意的指定消费者消费数据的位置,因为在启动消费者的时候,一个组中会存在多个消费者,
        每个人拿到的对应分区是不同的,所以我们需要知道这个消费者能够获取的分区是哪个,然后再指定相应的断点位置
        -----------------------------------------------------------------------------------------------------
        这里我们就需要监控分区的方法展示出来所有订阅的分区信息
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }
        });
    d.为了演示效果我们使用生产者在topic_d中增加多个消息
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;
        
        import java.util.Properties;
        
        public class Producer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
                for (int i = 0; i < 1000; i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic_d", "" + i, "message"+i);
                    producer.send(record);
                }
                producer.close();
            }
        }
    e.随机发送数据到不同的节点,使用随机k,然后使用断点消费数据,不设置任何的偏移量提交操作和断点位置
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        import java.time.Duration;
        import java.util.*;
        
        public class ConsumerWithUDOffset {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new1");
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
                pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);
                pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<String> topics = Arrays.asList("topic_e");
                // range roundRobin sticky cooperativeSticky
                consumer.subscribe(topics, new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        
                    }
        
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        for (TopicPartition topicPartition : collection) {
                            consumer.seek(topicPartition,195);
                        }
                    }
                });
        
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                    }
                    consumer.commitAsync();
                }
            }
        }

05.时间断点
    a.介绍
        kafka没有给大家提供直接根据时间找到断点位置的方法,我们需要根据时间找到偏移量,然后根据偏移量进行数据消费
    b.方法
        //通过这个方法找到对应时间的偏移量位置
        consumer.offsetsForTimes();
        //然后在通过这个方法根据断点进行消费数据
        consumer.seek();
    c.思路
        找寻一个时间点,然后将它转换为时间戳,放入到consumer.offsetsForTimes();中找寻这个偏移量的位置,
        然后在根据偏移量seek到数据,一定要注意判断这个偏移量是否为空
    d.代码
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        import java.time.Duration;
        import java.util.*;
        
        public class Consumer1 {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new_group221");
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<String> topics = Arrays.asList("topic_e");
                consumer.subscribe(topics, new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // no op
                    }
        
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        HashMap<TopicPartition, Long> map = new HashMap<>();
                        for (TopicPartition partition : partitions) {
                            map.put(partition,1675076400000L);
                            //将时间和分区绑定在一起,然后合并在一起放入到检索方法中
                        }
                        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
                        //根据时间获取时间对应的偏移量位置
                        for (Map.Entry<TopicPartition, OffsetAndTimestamp> en : offsets.entrySet()) {
                            System.out.println(en.getKey()+"-->"+en.getValue());
                            if(en.getValue() != null){
                                consumer.seek(en.getKey(),en.getValue().offset());
                                //获取每个分区的偏移量的位置,使用seek进行找寻数据
                            }
                        }
        
                    }
                });
        
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                    }
        //            consumer.commitAsync();
                }
            }
        }

06.人为托管偏移量
    a.介绍
        但是偏移量信息默认是存储在__consumer_offsets中的,有的时候我们需要进行偏移量的直观查看和管理,
        这个数据是没有办法做任何操作的,所以我们需要人为换介质存储和处理偏移量信息,
        这个时候我们会选择外部存储比如redis或者mysql等等存储偏移量信息
    b.存储偏移量信息到redis中,整体思路:
        不使用系统的偏移量提交和保存方式,都关闭掉
        在ConsumerRebalanceListener监听方法中获取到相应的分区信息
        一启动应用就去redis中查询对应的偏移量数据,然后从断点位置seek
        下面消费数据的时候每次消费数据都要提交偏移量信息存储到redis进行更新
        redis的key设定为 [group-topic-partition] value值设定为[offsets]
    c.代码
        package com.hainiu.kafka;
        
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import redis.clients.jedis.Jedis;
        
        import java.time.Duration;
        import java.util.*;
        
        public class ConsumerWithOffsets2Redis {
            public static void main(String[] args) {
                Properties pro = new Properties();
                String groupId = "redis_group";
                pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
                pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
                List<String> topics = Arrays.asList("topic_e");
                Jedis jedis = new Jedis("11.99.16.109");
                consumer.subscribe(topics, new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // no op
                    }
        
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        for (TopicPartition partition : partitions) {
                            String topic = partition.topic();
                            int part = partition.partition();
                            String key = groupId+"_"+topic+"_"+part;
                            String offsetstr = jedis.get(key);
                            if(offsetstr != null){
                                consumer.seek(partition,Long.valueOf(offsetstr));
                            }
                        }
                    }
                });
        
                Map<String,Long> offsetMap = new HashMap<>();
                while (true){
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    Iterator<ConsumerRecord<String, String>> it = records.iterator();
                    while(it.hasNext()){
                        ConsumerRecord<String, String> record = it.next();
                        System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                        String key = groupId+"_"+record.topic()+"_"+record.partition();
                        offsetMap.put(key,record.offset());
                    }
                    for (Map.Entry<String, Long> en : offsetMap.entrySet()) {
                        jedis.set(en.getKey(),en.getValue().toString());
                    }
                }
            }
        }

5.8 附:参数调整

01.配置重要参数
    参数                 解释
    bootstrap.servers    集群地址
    key.deserializer     key反序列化器
    value.deserializer   value反序列化器
    group.id             消费者组id

02.消费者对象订阅相应的topic然后拉取其中的数据进行消费
    package com.hainiu.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;
    
    public class Consumer1 {
        public static void main(String[] args) {
            Properties pro = new Properties();
            pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
            pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
            //设定组id
            pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            //设定key的反序列化器
            pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            //设定value的反序列化器
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
            List<String> topics = Arrays.asList("topic_a,topic_b");
            //一个消费者可以消费多个分区的数据
            consumer.subscribe(topics);
            //订阅这个topic
            while (true){
                //死循环要一直消费数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                //间隔一秒钟消费一次数据,拉取一批数据过来
                Iterator<ConsumerRecord<String, String>> it = records.iterator();
                while(it.hasNext()){
                    ConsumerRecord<String, String> record = it.next();
                    System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
                }
            }
        }
    }

6 Kafka使用

6.1 命令

01.Kafka2.2.0及以上版本
    a.创建一个名为new-topic的主题,具有3个分区和2个副本
        kafka-topics.sh --create --topic new-topic --bootstrap-server 192.168.185.150:9092 --partitions 3 --replication-factor 2
        ---------------------------------------------------------------------------------------------------------
        # alter修改topic
        kafka-topics.sh --bootstrap-server 192.168.185.150:9092 --alter --topic topic_a --partitions 2
        # 分区数量不能减少只能增加,减少分区会使得数据丢失
        kafka-topics.sh --bootstrap-server 192.168.185.150:9092 --alter --topic topic_a --partitions 4
        -----------------------------------------------------------------------------------------------------
        # 修改副本数据
        # 在创建完毕的topic以后,我们在使用的时候可能会遇见,副本不足的情况,这个时候我们可以动态增加topic的副本数量,但是增加的副本数量要在原有的基础上进行增加
        # 首先我们创建一个json文件,用于支配topic的分区副本和节点的对应关系
        vim json.txt
        # 以原有分区分配的策略基础之上做二次变化
        {"partitions":[{"topic":"topic_a","partition":0,"replicas":[4,3,2]},{"topic":"topic_a","partition":1,"replicas":[1,0,2]},{"topic":"topic_a","partition":2,"replicas":[2,4,0]},{"topic":"topic_a","partition":3,"replicas":[2,3,1]}],"version":1}
        # 执行重新分配命令
        kafka-reassign-partitions.sh --bootstrap-server 192.168.185.150:9092 --reassignment-json-file json.txt --execute
    b.删除主题
        kafka-topics.sh --delete --topic new-topic --bootstrap-server 192.168.185.150:9092
    c.放入数据(生产者)
        kafka-console-producer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092
        其中:
        --topic new-topic:指定主题名称为new-topic。
        --bootstrap-server localhost:9092:Kafka broker的地址,假设broker运行在localhost,端口是9092。
        -----------------------------------------------------------------------------------------------------
        输入数据:启动后,你会看到一个提示符,可以直接在终端输入消息并按回车键发送。每条输入的消息都会被发送到new-topic。示例:
        > Hello Kafka
        > This is a test message
        每条消息输入后都会立即发送到Kafka的new-topic中。
        -----------------------------------------------------------------------------------------------------
        Ctrl + C 或 Ctrl + D 是结束会话的标准方法,并不会对Kafka或生产者造成负面影响。
    d.查看数据(消费者)
        # 从头开始消费
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning
        # 指定分区,并且消费历史数据
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning --partition 2
        -----------------------------------------------------------------------------------------------------
        Hello Kafka
        This is a test message
        -----------------------------------------------------------------------------------------------------
        Ctrl + C 或 Ctrl + D 是结束会话的标准方法,并不会对Kafka或生产者造成负面影响。
    e.使用--bootstrap-server
        a.示例
            kafka-topics.sh --create --topic new-topic --bootstrap-server 192.168.185.150:9092 --partitions 3 --replication-factor 2
        b.说明
            Kafka 版本:这个命令适用于 Kafka 2.2.0 及以上版本(从 2.2.0 开始,Kafka 引入了 KRaft 模式,不再依赖 Zookeeper)。
            -------------------------------------------------------------------------------------------------
            参数说明:
            --bootstrap-server:指定 Kafka Broker 的地址,用于连接 Kafka 集群。
            --create:创建一个新主题。
            --topic:指定主题的名称。
            --partitions:设置分区数量。
            --replication-factor:设置副本因子(每个分区的副本数量)。
            -------------------------------------------------------------------------------------------------
            Zookeeper:这种方式不需要直接与 Zookeeper 交互,因为 Kafka 已经将管理主题的操作转移到 Broker 上(尤其是在 KRaft 模式下)。
            使用场景:适用于现代 Kafka 环境,其中 Kafka 集群可以直接管理元数据,无需通过 Zookeeper 进行。

02.Kafka2.1及以下版本
    a.在主节点上创建主题TestTopic,具有3个分区和2个副本
        kafka-topics.sh --zookeeper 192.168.185.150:2181,192.168.185.151:2181,192.168.185.152:2181 --topic TestTopic --replication-factor 2 --partitions 3 --create
    b.删除主题
        kafka-topics.sh --zookeeper 192.168.185.150:2181,192.168.185.151:2181,192.168.185.152:2181 --topic TestTopic --delete
    c.放入数据(在主节点上启动一个生产者)
        kafka-console-producer.sh --broker-list 192.168.185.150:9092,192.168.185.151:9092,192.168.185.152:9092 --topic TestTopic
        -----------------------------------------------------------------------------------------------------
        --broker-list(在 Kafka 2.1 及以下版本中使用):指定 Kafka Broker 的地址和端口,多个 Broker 地址用逗号分隔。这些地址用于连接到 Kafka 集群,以便生产者能够将消息发送到集群。在 Kafka 2.2 及以上版本中,这个参数被 --bootstrap-server 替代,但功能相同。
        --topic TestTopic:指定要向其发送消息的 Kafka 主题名称。TestTopic 是目标主题的名称,消息将被发送到这个主题中。
    d.查看数据(在三个节点上分别创建消费者)
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.151:9092 --from-beginning
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.152:9092 --from-beginning
    e.使用--zookeeper
        a.示例
            kafka-topics.sh --zookeeper 192.168.185.150:2181,192.168.185.151:2181,192.168.185.152:2181 --topic TestTopic --replication-factor 2 --partitions 3 --create
        b.说明
            Kafka 版本:这个命令适用于 Kafka 2.1 及以下版本(在 2.2 之前,Kafka 使用 Zookeeper 来管理集群的元数据)。
            -------------------------------------------------------------------------------------------------
            参数说明:
            --zookeeper:指定 Zookeeper 集群的地址,用于连接 Zookeeper。
            --create:创建一个新主题。
            --topic:指定主题的名称。
            --partitions:设置分区数量。
            --replication-factor:设置副本因子(每个分区的副本数量)。
            -------------------------------------------------------------------------------------------------
            Zookeeper:在这种模式下,Kafka 使用 Zookeeper 来管理和协调主题的元数据。Zookeeper 负责跟踪 Kafka 集群的状态,包括主题、分区、副本等信息。

03.常用命令(Kafka2.2.0及以上版本)
    a.查看Kafka集群中所有broker的状态
        kafka-broker-api-versions.sh --bootstrap-server 192.168.185.150:9092
    b.查看Kafka主题列表
        kafka-topics.sh --list --bootstrap-server 192.168.185.150:9092
    c.查看主题的详细信息
        kafka-topics.sh --describe --topic new-topic --bootstrap-server 192.168.185.150:9092
        -----------------------------------------------------------------------------------------------------
        Topic: test-topic  PartitionCount: 3  ReplicationFactor: 2  Configs:
            Topic: test-topic  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
            Topic: test-topic  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
            Topic: test-topic  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1
        -----------------------------------------------------------------------------------------------------
        主题的数据存放在Kafka集群的分区中。可以通过查看每个分区的详细信息来了解数据的分布情况。
        PartitionCount:分区数量
        ReplicationFactor:副本因子
        Leader:当前分区的主副本所在的broker
        Replicas:所有副本所在的broker
        Isr:同步副本(In-Sync Replica)集合
    d.生产者和消费者命令
        a.启动生产者:用于向指定的主题发送消息。
            kafka-console-producer.sh --topic <topic_name> --bootstrap-server <broker_host>:<port>
            启动后可以在控制台直接输入消息并按回车发送。
        b.启动消费者:读取主题的消息并输出到控制台。
            kafka-console-consumer.sh --topic <topic_name> --bootstrap-server <broker_host>:<port> --from-beginning
            其中--from-beginning表示从头开始消费所有消息。
    e.消费者组命令
        重置消费者组偏移量:重置消费者组的消费偏移量到指定的位置(如最新、最旧或某个时间点)。这在重新处理消息或跳过消息时非常有用。
        kafka-consumer-groups.sh --bootstrap-server <broker_host>:<port> --group <group_name> --reset-offsets --to-earliest --execute --topic <topic_name>
        其他可选参数:
        --to-latest:重置到最新的偏移量。
        --shift-by -2:向前或向后移动指定的偏移量数量。
        --to-offset <offset>:重置到指定的偏移量。
        --by-duration PT10M:重置到10分钟前的时间。
    f.分区管理命令
        增加主题分区数:增加主题的分区数量,注意分区数只能增加不能减少。
        kafka-topics.sh --alter --topic <topic_name> --partitions <new_partition_count> --bootstrap-server <broker_host>:<port>
    g.查看Kafka集群信息
        查看Kafka的元数据:查看Kafka集群元数据,包括主题、分区、领导者信息等
        kafka-metadata-quorum.sh --describe --bootstrap-server <broker_host>:<port>
        -----------------------------------------------------------------------------------------------------
        检查Kafka集群控制器状态:用于检查Kafka集群中的控制器节点(负责集群管理任务的节点)
        bin/kafka-leader-election.sh --bootstrap-server <broker_host>:<port> --describe
    h.Kafka ACL(访问控制列表)管理命令
        创建ACL:用于给用户或IP分配对主题的读写权限。
        kafka-acls.sh --bootstrap-server <broker_host>:<port> --add --allow-principal User:<user_name> --operation <read|write> --topic <topic_name>
        -----------------------------------------------------------------------------------------------------
        查看ACL:查看Kafka当前配置的ACL。
        kafka-acls.sh --bootstrap-server <broker_host>:<port> --list
        -----------------------------------------------------------------------------------------------------
        删除ACL:删除指定的ACL规则。
        kafka-acls.sh --bootstrap-server <broker_host>:<port> --remove --allow-principal User:<user_name> --operation <read|write> --topic <topic_name>
    i.Kafka日志管理命令
        查看Kafka日志清理状态:Kafka有日志清理机制,可以手动触发或查看清理状态。
        bin/kafka-log-dirs.sh --describe --bootstrap-server <broker_host>:<port>
    j.检查和修复Kafka工具
        Kafka检查工具(Check Tool):用于检查Kafka日志文件的完整性和一致性。
        kafka-check.sh --broker-id <broker_id> --controller-only --bootstrap-server <broker_host>:<port>
        -----------------------------------------------------------------------------------------------------
        Kafka重新分配分区工具:可以根据集群的负载情况,重新分配分区。
        kafka-reassign-partitions.sh --bootstrap-server <broker_host>:<port> --execute --reassignment-json-file <file_path>
    k.镜像工具
        镜像创建和管理工具(Mirror Maker):用于跨集群同步消息,可以在集群之间复制数据。
        kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --num.streams <number_of_streams>

6.2 使用1:API

00.总结
    a.操作
        经测试,
        必须先启动 消费者,再启动 生产者
        如若先启动 生产者,再启动 消费者,会导致数据无法被消费到
        -----------------------------------------------------------------------------------------------------
        若消费不成功,修改【生产者主题】【消费者组ID】
        private static final String TOPIC = "test1-topic";
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "object-group2");
    b.支持的数据类型
        Kafka 主要以字节数组(byte arrays)形式存储消息,但通过序列化和反序列化机制,可以支持多种数据类型。常见的数据格式包括:
        字符串(String):"user_signup", "order_created"
        JSON:{"userId": "12345", "action": "signup"}
        Avro:使用 Avro Schema 定义复杂数据结构,适用于大数据生态系统。
        Protobuf:Google 的 Protocol Buffers,用于高效的二进制序列化。
    c.数据传输模式
        a.Stream 流
            Kafka Streams:提供强大的流处理能力,支持实时数据处理和转换。
            示例:实时日志处理、实时ETL(Extract, Transform, Load)任务。
        b.批量发送
            Producer Batch Sending:Kafka 生产者可以将多条消息打包在一个批次中发送,提高吞吐量。
            示例:将多个用户事件批量发送到 Kafka 主题,提高网络利用率。
    d.消息传递模式
        a.一对一(Point-to-Point)
            消费者组(Consumer Groups):每条消息只会被一个消费者实例消费,类似于点对点模式。
            示例:订单处理系统中,每个订单只被一个服务实例处理。
        b.一对多(Publish/Subscribe)
            发布/订阅模型:同一消息可以被多个消费者组独立消费,适用于广播场景。
            示例:日志广播,多个系统需要接收相同的日志数据。
    e.事务机制
        Kafka 支持事务,确保一组消息的原子性操作(即“要么全部成功,要么全部失败”),实现严格的一致性。
        事务支持:跨多个主题或分区的消息发送,确保分布式事务的一致性。
    f.消费者
        a.Pull 消费者
            Kafka 默认使用拉取(pull)模式,消费者主动从 Kafka 中拉取消息。
            偏移量管理:Kafka 自动管理消费者的偏移量,也可以手动提交偏移量以实现更精细的控制。
            定时调度拉取:虽然 Kafka 本身不直接提供定时调度拉取,但可以通过客户端代码实现定时拉取逻辑。
        b.Push 消费者
            Kafka 实际上是基于拉取模型,但通过客户端库(如 Kafka Streams 或使用回调机制)可以实现类似推送的效果。
        c.标签订阅
            Kafka 不直接支持基于标签的订阅,但可以通过使用不同的主题或消息过滤在消费者端实现类似功能。
        d.消费者的消息模型
            集群模式(默认):消费者组内的多个消费者实例协同消费不同分区的数据,实现负载均衡。
            广播模式:每个消费者实例独立消费所有分区的数据,适用于广播场景。可以通过将每个实例放在不同的消费者组中实现。
        e.监听器:并发消费
            Kafka 支持多线程消费,消费者客户端可以配置多个线程并发处理消息,提高吞吐量。
        f.消费类型
            迭代消费:消费者持续不断地消费来自 Kafka 的消息流。
            一次性消费:消费者在特定时间点消费一批消息后停止,适用于批处理任务。
        g.其他常见API
            Kafka Streams API:用于复杂的流处理和实时计算。
            Kafka Connect API:用于数据源与 Kafka 之间的集成,实现数据的高效导入和导出。
    g.生产者
        a.消息延迟
            Kafka 原生不支持延迟消息,但可以通过以下方式实现:
            a.延迟队列模式
                使用定时任务或外部调度系统将消息在指定时间后发送到 Kafka。
            b.使用 Kafka Streams 或其他流处理框架
                实现基于时间的消息处理逻辑。
        b.消息类型
            a.同步消息
                生产者在发送消息时等待 Kafka 确认,确保消息已被成功写入。
            b.异步消息
                生产者发送消息后不等待确认,通过回调机制处理结果,提高吞吐量。
            c.单向消息
                生产者发送消息后不关心结果,适用于日志记录等无需确认的场景。
    h.消费者的消息顺序
        a.局部顺序消费
            生产者:通过分区键(partition key)确保相关消息发送到同一分区,从而保持顺序。
            消费者:单线程消费特定分区的消息,使用 Kafka 的有序性保证(Kafka 保证单个分区内消息的顺序)。
        b.示例
            生产者(MyProducerOrder):根据订单 ID 作为分区键发送消息,确保同一订单的消息有序。
            消费者(MyConsumerOrder):单线程或使用 KafkaConsumer 的顺序消费模式处理订单消息。
    i.批量发送消息
        生产者(MyProducerBatch)
        将多条消息封装到一个批次中,通过 Kafka 生产者的批量发送功能一次性发送,提高效率。
        示例:将一批用户行为事件打包后发送到 Kafka 主题,减少网络请求次数。

00.总结
    a.Kafka生产者
        各种类型:文本数据、JSON数据、CSV数据、日志数据
        producer确认机制
        producer自定义拦截器
        producer自定义序列化器:String类型序列化器,将 String 类型数据转换为字节数组进行传输。
        producer自定义序列化器:对象整体序列化,使用 ObjectOutputStream 将整个 Java 对象序列化为字节数组,方便发送自定义的对象数据。
        producer自定义序列化器:手动序列化数据对象,通过手动拼接对象的属性,将其转换为字符串,再转为字节数组。这种方法更灵活,可以自定义序列化格式。
        producer自定义分区器
        producer一致性和ACK
        producer事务
    b.Kafka消费者
        默认,订阅模式
        consumer自定义序列化器
        consumer指定分区消费数据
        consumer偏移量:强制重复消费

01.Kafka生产者
    a.各种类型:文本数据、JSON数据、CSV数据、日志数据
        a.生产者
            public class Demo01Producer {
                private static final String TOPIC = "test1-topic"; // 主题名称
            
                public static void main(String[] args) {
                    // 配置生产者属性
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "192.168.185.150:9092");
                    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            
                    // 创建生产者
                    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            
                    // 发送基本消息
                    for (int i = 0; i < 10; i++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
                        sendWithCallback(producer, record);
                    }
            
                    // 发送文本数据
                    ProducerRecord<String, String> textRecord = new ProducerRecord<>(TOPIC, "textKey", "Hello, Kafka!");
                    sendWithCallback(producer, textRecord);
            
                    // 发送 JSON 数据
                    String json = "{\"name\":\"Alice\", \"age\":30}";
                    ProducerRecord<String, String> jsonRecord = new ProducerRecord<>(TOPIC, "jsonKey", json);
                    sendWithCallback(producer, jsonRecord);
            
                    // 发送 CSV 数据
                    String csv = "name,age\nAlice,30\nBob,25";
                    ProducerRecord<String, String> csvRecord = new ProducerRecord<>(TOPIC, "csvKey", csv);
                    sendWithCallback(producer, csvRecord);
            
                    // 发送日志数据
                    String logMessage = "INFO: User logged in at " + System.currentTimeMillis();
                    ProducerRecord<String, String> logRecord = new ProducerRecord<>(TOPIC, "logKey", logMessage);
                    sendWithCallback(producer, logRecord);
            
                    // 关闭生产者
                    producer.close();
                }
            
                // 发送消息并附加回调
                private static void sendWithCallback(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
                    producer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                            } else {
                                System.out.println("Sent message: (" + record.key() + ", " + record.value() + ") to partition " +
                                        metadata.partition() + " with offset " + metadata.offset());
                            }
                        }
                    });
                }
            }
        b.消费者
            public class Demo01Consumer {
                private static final String TOPIC = "test1-topic"; // 主题名称
            
                public static void main(String[] args) {
                    // 配置消费者属性
                    Properties props = new Properties();
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-tmp");
                    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            
                    // 创建消费者
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                    consumer.subscribe(Collections.singletonList(TOPIC));
            
                    // 轮询消息
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //            if (records.isEmpty()) {
            //                System.out.println("No more messages, exiting...");
            //                break; // 退出循环
            //            }
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("Consumed text message: (key: %s, value: %s) at offset %d%n", record.key(), record.value(), record.offset());
                        }
                    }
                }
            }
    b.producer确认机制
        a.生产者
            public class Demo02Producer {
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    // 配置 Kafka 集群的地址
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    // 配置键的序列化方式
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    // 配置值的序列化方式
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    // 设置 ack 的级别为 "all",表示所有副本都必须确认接收
                    properties.put(ProducerConfig.ACKS_CONFIG, "all");
                    // 设置重试次数为 3
                    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
                    // 设置每个批次的大小为 16 KB
                    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
                    // 设置消息发送延迟时间为 0,立即发送
                    properties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
            
                    // 创建 Kafka 生产者
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
                    // 创建要发送的消息记录
                    ProducerRecord<String, String> record = new ProducerRecord<>("test2-topic", "this is hainiu");
            
                    for (int i = 0; i < 5; i++) {
                        // 发送消息并添加回调
                        producer.send(record, new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                // 获取发送的元数据信息
                                String topic = metadata.topic();
                                int partition = metadata.partition();
                                long offset = metadata.offset();
                                // 判断是否发送成功
                                if (exception == null) {
                                    System.out.println("成功发送消息到 topic: " + topic + ", 分区: " + partition + ", 偏移量: " + offset);
                                } else {
                                    System.out.println("发送消息失败: " + exception.getMessage());
                                }
                            }
                        });
                    }
                    // 关闭生产者
                    producer.close();
                }
            }
        b.消费者
            public class Demo02Consumer {
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    // 配置 Kafka 集群的地址
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    // 设置消费者组 ID
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
                    // 配置键的反序列化方式
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // 配置值的反序列化方式
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // 设置自动偏移量重置为最早
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            
                    // 创建 Kafka 消费者
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    // 订阅指定的主题
                    consumer.subscribe(Collections.singletonList("test2-topic"));
            
                    while (true) {
                        // 拉取消息
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            // 打印接收到的消息
                            System.out.printf("接收到消息: topic=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",
                                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        }
                    }
                }
            }
    c.producer自定义拦截器
        a.生产者
            public class Demo03Producer {
            
                // 自定义拦截器
                public static class MyInterceptor implements ProducerInterceptor<String, String> {
            
                    @Override
                    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
                        // 获取当前消息内容
                        String value = record.value();
                        // 获取当前时间戳
                        Long timestamp = new Date().getTime();
                        // 获取消息的主题
                        String topic = record.topic();
                        // 构建新的消息,附加时间戳
                        return new ProducerRecord<>(topic, record.partition(), record.key(), timestamp + "-->" + value);
                    }
            
                    @Override
                    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                        // 获取确认应答,类似于 producer 的逻辑
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        if (exception == null) {
                            System.out.println("Message sent successfully to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
                        } else {
                            System.out.println("Failed to send message to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
                        }
                    }
            
                    @Override
                    public void close() {
                        // 关闭时无需执行额外操作
                    }
            
                    @Override
                    public void configure(Map<String, ?> configs) {
                        // 无需配置额外操作
                    }
                }
            
                public static void main(String[] args) {
                    // Kafka 生产者配置
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    properties.put(ProducerConfig.ACKS_CONFIG, "all");
                    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
                    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
                    properties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
                    // 设置自定义拦截器
                    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class.getName());
            
                    // 创建生产者实例
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            
                    // 发送消息
                    for (int i = 0; i < 5; i++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>("test3-topic", "this is hainiu " + i);
                        producer.send(record, new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                String topic = metadata.topic();
                                int partition = metadata.partition();
                                long offset = metadata.offset();
                                if (exception == null) {
                                    System.out.println("Message sent to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
                                } else {
                                    System.out.println("Error sending message: " + exception.getMessage());
                                }
                            }
                        });
                    }
            
                    // 关闭生产者
                    producer.close();
                }
            }
        b.消费者
            public class Demo03Consumer {
                public static void main(String[] args) {
                    // Kafka 消费者配置
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            
                    // 创建消费者实例
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    // 订阅指定主题
                    consumer.subscribe(Collections.singletonList("test3-topic"));
            
                    // 消费者无限循环,拉取消息
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            // 打印接收到的消息,带时间戳
                            System.out.printf("Received message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        }
                    }
                }
            }
    d.producer自定义序列化器:String类型序列化器,将 String 类型数据转换为字节数组进行传输。
        a.生产者
            public class Demo04Producer {
            
                public static void main(String[] args) {
                    // Kafka producer 配置
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MyStringSerializer.class.getName());
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyStringSerializer.class.getName());
            
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            
                    // 发送使用自定义 String 序列化器序列化后的消息
                    for (int i = 0; i < 5; i++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>("test4-topic", "key_" + i, "message_" + i);
                        producer.send(record);
                    }
            
                    producer.close();
                }
            
                // 实现 String 类型的自定义序列化器
                public static class MyStringSerializer implements Serializer<String> {
                    @Override
                    public byte[] serialize(String topic, String data) {
                        if (data == null) {
                            return null;
                        }
                        // 使用 UTF-8 编码将字符串转换为字节数组
                        return data.getBytes(StandardCharsets.UTF_8);
                    }
                }
            }
        b.消费者
            public class Demo04Consumer {
                public static void main(String[] args) {
                    // Kafka consumer 配置
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MyStringDeserializer.class.getName());
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyStringDeserializer.class.getName());
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                        consumer.subscribe(Collections.singletonList("test4-topic"));
            
                        while (true) {
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                            for (ConsumerRecord<String, String> record : records) {
                                System.out.printf("接收到消息: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                            }
                        }
                    }
                }
            
                // 实现 String 类型的自定义反序列化器
                public static class MyStringDeserializer implements Deserializer<String> {
                    @Override
                    public String deserialize(String topic, byte[] data) {
                        if (data == null) {
                            return null;
                        }
                        // 使用 UTF-8 编码将字节数组转换回字符串
                        return new String(data, StandardCharsets.UTF_8);
                    }
                }
            }
    e.producer自定义序列化器:对象整体序列化,使用 ObjectOutputStream 将整个 Java 对象序列化为字节数组,方便发送自定义的对象数据。
        a.生产者
            public class Demo05Producer {
            
                // 定义 Student 类
                public static class Student implements Serializable {
                    private int id;
                    private String name;
                    private int age;
            
                    public Student(int id, String name, int age) {
                        this.id = id;
                        this.name = name;
                        this.age = age;
                    }
            
                    // Getter 和 Setter 方法
                    public int getId() { return id; }
                    public String getName() { return name; }
                    public int getAge() { return age; }
            
                    @Override
                    public String toString() {
                        return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
                    }
                }
            
                // 实现自定义的 Student 序列化器
                public static class StudentSerializer implements Serializer<Student> {
                    @Override
                    public byte[] serialize(String topic, Student student) {
                        if (student == null) {
                            return null;
                        }
                        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
                            objectOutputStream.writeObject(student);
                            return byteArrayOutputStream.toByteArray();
                        } catch (IOException e) {
                            throw new RuntimeException("Error serializing Student object", e);
                        }
                    }
                }
            
                public static void main(String[] args) {
                    // Kafka producer 配置
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StudentSerializer.class.getName());
            
                    KafkaProducer<String, Student> producer = new KafkaProducer<>(properties);
            
                    // 发送自定义对象 Student 的消息
                    Student student1 = new Student(1, "zhangsan", 20);
                    Student student2 = new Student(2, "lisi", 30);
            
                    ProducerRecord<String, Student> record1 = new ProducerRecord<>("test5-topic", "key_1", student1);
                    ProducerRecord<String, Student> record2 = new ProducerRecord<>("test5-topic", "key_2", student2);
            
                    producer.send(record1);
                    producer.send(record2);
            
                    producer.close();
                }
            }
        b.消费者
            public class Demo05Consumer {
            
                public static class Student implements Serializable {
                    private int id;
                    private String name;
                    private int age;
            
                    public Student(int id, String name, int age) {
                        this.id = id;
                        this.name = name;
                        this.age = age;
                    }
            
                    // Getter 和 Setter 方法
                    public int getId() { return id; }
                    public String getName() { return name; }
                    public int getAge() { return age; }
            
                    @Override
                    public String toString() {
                        return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
                    }
                }
            
                // 自定义 Student 反序列化器
                public static class StudentDeserializer implements org.apache.kafka.common.serialization.Deserializer<com.ruoyi.kafka.Demo05Producer.Student> {
            
                    @Override
                    public com.ruoyi.kafka.Demo05Producer.Student deserialize(String topic, byte[] data) {
                        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
                             ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
                            return (com.ruoyi.kafka.Demo05Producer.Student) objectInputStream.readObject();
                        } catch (IOException | ClassNotFoundException e) {
                            throw new RuntimeException("Error deserializing Student object", e);
                        }
                    }
                }
            
                public static void main(String[] args) {
                    // Kafka consumer 配置
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "object-group2");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StudentDeserializer.class.getName());
            
                    KafkaConsumer<String, com.ruoyi.kafka.Demo05Producer.Student> consumer = new KafkaConsumer<>(properties);
            
                    // 订阅 topic
                    consumer.subscribe(Collections.singletonList("test5-topic"));
            
                    // 消费消息
                    while (true) {
                        ConsumerRecords<String, com.ruoyi.kafka.Demo05Producer.Student> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, com.ruoyi.kafka.Demo05Producer.Student> record : records) {
                            System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                    record.key(), record.value().toString(), record.partition(), record.offset());
                        }
                    }
                }
            }
    f.producer自定义序列化器:手动序列化数据对象,通过手动拼接对象的属性,将其转换为字符串,再转为字节数组。这种方法更灵活,可以自定义序列化格式。
        a.生产者
            public class Demo06Producer {
            
                // 定义 Student 类
                public static class Student {
                    private int id;
                    private String name;
                    private int age;
            
                    public Student(int id, String name, int age) {
                        this.id = id;
                        this.name = name;
                        this.age = age;
                    }
            
                    // Getter 和 Setter 方法
                    public int getId() { return id; }
                    public String getName() { return name; }
                    public int getAge() { return age; }
                }
            
                // 自定义 Student 的序列化器,手动将对象转换为字节数组
                public static class CustomStudentSerializer implements Serializer<Student> {
            
                    @Override
                    public byte[] serialize(String topic, Student student) {
                        if (student == null) {
                            return null;
                        }
                        // 将 Student 对象的属性拼接成字符串,然后转换为字节数组
                        String data = student.getId() + "," + student.getName() + "," + student.getAge();
                        return data.getBytes(Charset.forName("UTF-8"));
                    }
                }
            
                public static void main(String[] args) {
                    // Kafka producer 配置
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomStudentSerializer.class.getName());
            
                    KafkaProducer<String, Student> producer = new KafkaProducer<>(properties);
            
                    // 发送手动序列化的 Student 消息
                    Student student1 = new Student(1, "zhangsan", 20);
                    Student student2 = new Student(2, "lisi", 30);
            
                    ProducerRecord<String, Student> record1 = new ProducerRecord<>("test6-topic", "key_1", student1);
                    ProducerRecord<String, Student> record2 = new ProducerRecord<>("test6-topic", "key_2", student2);
            
                    producer.send(record1);
                    producer.send(record2);
            
                    producer.close();
                }
            }
        b.消费者
            public class Demo06Consumer {
            
                // Student 类定义
                public static class Student {
                    private int id;
                    private String name;
                    private int age;
            
                    // 无参构造函数
                    public Student() {
                    }
            
                    // 带参构造函数
                    public Student(int id, String name, int age) {
                        this.id = id;
                        this.name = name;
                        this.age = age;
                    }
            
                    // Getter 和 Setter 方法
                    public int getId() {
                        return id;
                    }
            
                    public void setId(int id) {
                        this.id = id;
                    }
            
                    public String getName() {
                        return name;
                    }
            
                    public void setName(String name) {
                        this.name = name;
                    }
            
                    public int getAge() {
                        return age;
                    }
            
                    public void setAge(int age) {
                        this.age = age;
                    }
            
                    @Override
                    public String toString() {
                        return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
                    }
                }
            
                // 自定义反序列化器,用于反序列化 Student 对象
                public static class CustomStudentDeserializer implements org.apache.kafka.common.serialization.Deserializer<Student> {
            
                    @Override
                    public Student deserialize(String topic, byte[] data) {
                        if (data == null || data.length == 0) {
                            return null;
                        }
                        // 将字节数组解析为字符串,假设格式为 "id,name,age"
                        String[] studentData = new String(data, StandardCharsets.UTF_8).split(",");
                        // 校验数据格式是否正确
                        if (studentData.length != 3) {
                            throw new IllegalArgumentException("Invalid data format for Student");
                        }
                        return new Student(Integer.parseInt(studentData[0]), studentData[1], Integer.parseInt(studentData[2]));
                    }
                }
            
                public static void main(String[] args) {
                    // Kafka consumer 配置
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomStudentDeserializer.class.getName());
            
                    // 创建 KafkaConsumer 实例
                    KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(properties);
            
                    // 订阅 topic
                    consumer.subscribe(Collections.singletonList("test6-topic"));
            
                    // 消费消息
                    while (true) {
                        ConsumerRecords<String, Student> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, Student> record : records) {
                            System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                    record.key(), record.value().toString(), record.partition(), record.offset());
                        }
                    }
                }
            }
    g.producer自定义分区器
        public class Demo07Producer {
        
            public static class MyTeacherPartitioner implements Partitioner {
                @Override
                public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
                    String valueStr = value.toString();
                    // 根据教师名称来确定分区
                    if (valueStr.contains("spark")) {
                        return 0; // spark 发送到分区 0
                    } else if (valueStr.contains("java")) {
                        return 1; // java 发送到分区 1
                    } else {
                        return 2; // 默认发送到分区 2
                    }
                }
        
                @Override
                public void close() {
                    // no-op
                }
        
                @Override
                public void configure(Map<String, ?> configs) {
                    // no-op
                }
            }
        
            public static void main(String[] args) throws Exception {
                // 创建 Kafka 主题
                String topicName = "test7-topic";
                createKafkaTopic(topicName, "192.168.185.150:9092");
        
                // 配置 Kafka Producer
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyTeacherPartitioner.class.getName());
        
                KafkaProducer<String, String> producer = new KafkaProducer<>(pro);
        
                // 读取数据并发送到 Kafka
                String[] teacherUrls = {
                        "http://spark.hainiubl.com/unclewang",
                        "http://spark.hainiubl.com/xiaohe",
                        "http://spark.hainiubl.com/laoyang",
                        "http://java.hainiubl.com/laochen",
                        "http://java.hainiubl.com/laoliu"
                };
        
                for (String line : teacherUrls) {
                    URL url = new URL(line);
                    String host = url.getHost();
                    String path = url.getPath();
                    String subject = host.split("\\.")[0]; // 获取科目
                    String teacher = path.substring(1); // 获取教师名称
                    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, subject, teacher);
        
                    producer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception == null) {
                                System.out.println(metadata.topic() + "-->" + metadata.partition() + "-->" + record.key() + "-->" + record.value());
                            } else {
                                System.out.println("fail");
                            }
                        }
                    });
                }
                producer.close();
            }
        
            private static void createKafkaTopic(String topicName, String bootstrapServers) {
                Properties props = new Properties();
                props.put("bootstrap.servers", bootstrapServers);
                props.put("key.serializer", StringSerializer.class.getName());
                props.put("value.serializer", StringSerializer.class.getName());
        
                try (AdminClient adminClient = AdminClient.create(props)) {
                    // 检查主题是否已存在
                    ListTopicsResult topics = adminClient.listTopics();
                    Set<String> existingTopics = topics.names().get();
                    if (!existingTopics.contains(topicName)) {
                        // 创建主题
                        NewTopic newTopic = new NewTopic(topicName, 3, (short) 1); // 3 个分区,副本因子为 1
                        adminClient.createTopics(Collections.singleton(newTopic));
                        System.out.println("Topic '" + topicName + "' created successfully.");
                    } else {
                        System.out.println("Topic '" + topicName + "' already exists.");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    h.producer一致性和ACK
        a.生产者
            public class Demo08Producer {
            
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    // 设置 ACK 配置为 "all",确保所有副本都确认收到消息
                    properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 可选值:0, 1, all
                    properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 设置重试次数
            
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            
                    for (int i = 0; i < 10; i++) {
                        String key = "key-" + i;
                        String value = "value-" + i;
            
                        ProducerRecord<String, String> record = new ProducerRecord<>("test8-topic", key, value);
            
                        producer.send(record, new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                if (exception == null) {
                                    System.out.printf("Sent record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                            key, value, metadata.partition(), metadata.offset());
                                } else {
                                    System.err.println("Error while producing: " + exception.getMessage());
                                }
                            }
                        });
                    }
            
                    producer.close();
                }
            }
        b.消费者
            public class Demo08Consumer {
            
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singletonList("test8-topic"));
            
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                    record.key(), record.value(), record.partition(), record.offset());
                        }
                    }
                }
            }
    i.producer事务
        a.生产者
            public class Demo09Producer {
            
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    // 设置事务 ID
                    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id-1");
                    properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置 ACK 为 all
            
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            
                    // 初始化事务
                    producer.initTransactions();
            
                    try {
                        // 开始事务
                        producer.beginTransaction();
                        for (int i = 0; i < 10; i++) {
                            String key = "key-" + i;
                            String value = "value-" + i;
            
                            ProducerRecord<String, String> record = new ProducerRecord<>("test9-topic", key, value);
                            producer.send(record);
                        }
                        // 提交事务
                        producer.commitTransaction();
                    } catch (Exception e) {
                        // 出现异常时,撤销事务
                        producer.abortTransaction();
                        e.printStackTrace();
                    } finally {
                        producer.close();
                    }
                }
            }
        b.消费者
            public class Demo09Consumer {
            
                public static void main(String[] args) {
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singletonList("test9-topic"));
            
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                    record.key(), record.value(), record.partition(), record.offset());
                        }
                    }
                }
            }

02.Kafka消费者
    a.默认,订阅模式
        在 Kafka 的发布订阅模式中,消费者和生产者之间的关系是通过主题(Topic)来实现的。以下是订阅模式的具体体现:
        1.主题(Topic):
            生产者将消息发送到特定的主题(在你的代码中是 kafka1-topic)。
            消费者订阅这个主题,以接收该主题下的消息。
        2.消费者组(Consumer Group):
            消费者可以被组织成一个消费者组。每个消费者组中的消费者会共同消费同一个主题的消息。
            在 Demo01Consumer 中,设置了 GROUP_ID_CONFIG,这意味着该消费者属于一个特定的消费者组。
        3.消息的接收:
            在 Demo01Consumer 中,使用 consumer.subscribe(Collections.singletonList(TOPIC)); 订阅了 kafka1-topic 主题。
            消费者通过 poll 方法不断轮询消息,这体现了消费者对主题的订阅。
        4.消息的分发:
            Kafka 会确保同一组内的消费者不会重复消费同一条消息,而是将消息分发给不同的消费者。这种机制使得多个消费者可以并行处理消息,提高了系统的吞吐量。
        -----------------------------------------------------------------------------------------------------
        代码中的体现
        在 Demo01Consumer 中,消费者通过 subscribe 方法订阅了主题,并在一个无限循环中使用 poll 方法接收消息。
        在 Demo01Producer 中,生产者将消息发送到同一个主题,确保消费者能够接收到这些消息。
        这种模式使得生产者和消费者之间的耦合度降低,允许它们独立地扩展和维护。
    b.consumer自定义序列化器
        生产者:序列化器
        public static class CustomStudentSerializer implements Serializer<Student> {

            @Override
            public byte[] serialize(String topic, Student student) {
                if (student == null) {
                    return null;
                }
                // 将 Student 对象的属性拼接成字符串,然后转换为字节数组
                String data = student.getId() + "," + student.getName() + "," + student.getAge();
                return data.getBytes(Charset.forName("UTF-8"));
            }
        }
        -----------------------------------------------------------------------------------------------------
        消费者:反序列化器
        public static class CustomStudentDeserializer implements org.apache.kafka.common.serialization.Deserializer<Student> {

            @Override
            public Student deserialize(String topic, byte[] data) {
                if (data == null || data.length == 0) {
                    return null;
                }
                // 将字节数组解析为字符串,假设格式为 "id,name,age"
                String[] studentData = new String(data, StandardCharsets.UTF_8).split(",");
                // 校验数据格式是否正确
                if (studentData.length != 3) {
                    throw new IllegalArgumentException("Invalid data format for Student");
                }
                return new Student(Integer.parseInt(studentData[0]), studentData[1], Integer.parseInt(studentData[2]));
            }
        }
    c.consumer指定分区消费数据
        生产者
        public class Demo03Producer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.185.150:9092");
                props.put("key.serializer", StringSerializer.class.getName());
                props.put("value.serializer", StringSerializer.class.getName());
        
                KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
                // 发送消息到指定分区
                int partition = 1; // 指定分区
                String topic = "kafka3-topic"; // 替换为你的主题名称
                String key = "key1";
                String value = "Hello, Kafka!";
        
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("Sent message to partition " + metadata.partition() + " with offset " + metadata.offset());
                        }
                    }
                });
        
                producer.close();
            }
        }
        -----------------------------------------------------------------------------------------------------
        消费者
        public class Demo03Consumer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); // 替换为你的消费组ID
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
                // 订阅特定主题和分区
                String topic = "kafka3-topic"; // 替换为你的主题名称
                int partition = 0; // 指定分区
                consumer.assign(Collections.singletonList(new org.apache.kafka.common.TopicPartition(topic, partition)));
        
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message from partition %d with offset %d: %s%n", record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    d.consumer偏移量:强制重复消费
        1.使用seek()方法重置偏移量
            这是最直接的方法,就像代码中展示的那样:
            consumer.seek(partition, 0); // 从偏移量0开始消费
            我们可以将偏移量设置为0或其他任意值,从指定位置重新开始消费。
        2.使用auto.offset.reset配置
            可以在创建消费者时设置auto.offset.reset参数:
            props.put("auto.offset.reset", "earliest");
            这样当找不到初始偏移量时,会自动重置到最早的偏移量。
        3.使用新的消费者组
            每次创建一个新的消费者组ID,这样会从头开始消费所有消息:
            props.put("group.id", "new_group_" + System.currentTimeMillis());
        4.手动提交偏移量
            禁用自动提交,在处理完消息后手动提交偏移量:
            props.put("enable.auto.commit", "false");
            // 处理消息后
            consumer.commitSync();
            这样可以控制提交的时机,需要重复消费时不提交即可。
        5.使用assign()而非subscribe()
            使用assign()手动分配分区,这样可以完全控制消费的分区和偏移量:
            consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
            consumer.seek(new TopicPartition(topic, 0), 0);

6.3 使用2:Stream

00.总结
    a.StreamBuilder
        stream(String topic)
        stream(Collection<String> topics)
        table(String topic)
        globalTable(String topic)
    b.Transformations
        map(KeyValueMapper<K, V, R> mapper)
        flatMap(KeyValueMapper<K, V, Iterable<R>> mapper)
        filter(Predicate<K, V> predicate)
        filterNot(Predicate<K, V> predicate)
        mapValues(ValueMapper<V, R> mapper)
        groupByKey()
        groupBy(KeyValueMapper<K, V, KeyValue<K, V>> mapper)
        aggregate(V initialValue, Aggregator<K, V, R> aggregator)
        count()
        reduce(Reduce<K, V> reducer)
        join(KTable<K, V> table, ValueJoiner<V, V, R> joiner)
        leftJoin(KTable<K, V> table, ValueJoiner<V, V, R> joiner)
        outerJoin(KTable<K, V> table, ValueJoiner<V, V, R> joiner)
        transform(Transformer<K, V, R> transformer)
        transformValues(ValueTransformer<V, R> transformer)
    c.Windowing
        timeWindow(TimeWindows windows)
        sessionWindow(SessionWindows windows)
        tumblingWindow(TimeWindows windows)
    d.State Stores
        Materialized.as(String storeName)
        withKeySerde(Serde<K> keySerde)
        withValueSerde(Serde<V> valueSerde)
    e.Output Operations
        to(String topic)
        toStream()
        toTable()
    f.KTable Operations
        toStream()
        join(KTable<K, V> table, ValueJoiner<V, V, R> joiner)
        groupBy(KeyValueMapper<K, V, KeyValue<K, V>> mapper)
    g.Processing Guarantees
        exactlyOnce()
        atLeastOnce()
    h.Error Handling
        setErrorHandler(DeserializationExceptionHandler handler)

01.StreamBuilder
    a.介绍
        stream(String topic)                                                   创建一个流,消费指定主题的数据
        stream(Collection<String> topics)                                      创建一个流,消费多个主题的数据
        table(String topic)                                                    创建一个表,消费指定主题的数据,并将其视为一个表格
        globalTable(String topic)                                              创建一个全局表,消费指定主题的数据,所有实例都可以访问这个表
    b.示例
        package com.ruoyi.kafka3;
        
        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KTable;
        
        public class Demo01Consumer {
            private static final String TOPIC = "stream-build-topic"; // 主题名称
        
            public static void main(String[] args) {
                // 配置 Kafka Streams
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-consumer1");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
                // 创建 Kafka Streams 实例并启动流处理
                startStreamProcessing(props);
                // 创建 Kafka Streams 实例并启动表处理
                startTableProcessing(props);
            }
        
            // 启动流处理
            private static void startStreamProcessing(Properties props) {
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream(TOPIC);
                stream.foreach((key, value) -> {
                    System.out.printf("Stream - Key: %s, Value: %s%n", key, value);
                });
        
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
        
                // 添加关闭钩子
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        
            // 启动表处理
            private static void startTableProcessing(Properties props) {
                StreamsBuilder builder = new StreamsBuilder();
                KTable<String, String> table = builder.table(TOPIC);
                table.toStream().foreach((key, value) -> {
                    System.out.printf("Table - Key: %s, Value: %s%n", key, value);
                });
        
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
        
                // 添加关闭钩子
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        }

02.Transformations
    a.介绍
        map(KeyValueMapper<K, V, R> mapper)                                    将每个输入记录映射到一个新的输出记录,使用提供的映射器进行转换
        flatMap(KeyValueMapper<K, V, Iterable<R>> mapper)                      类似于 map,但允许每个输入记录生成零个或多个输出记录,返回一个可迭代的集合
        filter(Predicate<K, V> predicate)                                      过滤输入记录,仅保留满足给定条件的记录
        filterNot(Predicate<K, V> predicate)                                   过滤输入记录,仅保留不满足给定条件的记录
        mapValues(ValueMapper<V, R> mapper)                                    仅对值进行映射,返回一个新的流,其中每个记录的键保持不变,而值经过映射器转换
        groupByKey()                                                           根据记录的键对流进行分组,返回一个新的 KGroupedStream
        groupBy(KeyValueMapper<K, V, KeyValue<K, V>> mapper)                   根据提供的映射器对流进行分组,允许自定义分组逻辑
        aggregate(V initialValue, Aggregator<K, V, R> aggregator)              对流中的记录进行聚合,使用初始值和聚合器来计算最终结果
        count()                                                                计算每个键的记录数量,返回一个 KTable,其中包含每个键的计数
        reduce(Reduce<K, V> reducer)                                           对流中的记录进行归约操作,使用提供的归约器将多个值合并为一个值
        join(KTable<K, V> table, ValueJoiner<V, V, R> joiner)                  将流与给定的 KTable 进行连接,使用提供的连接器来生成输出记录
        leftJoin(KTable<K, V> table, ValueJoiner<V, V, R> joiner)              执行左连接,将流与 KTable 进行连接,保留流中的所有记录
        outerJoin(KTable<K, V> table, ValueJoiner<V, V, R> joiner)             执行外连接,将流与 KTable 进行连接,保留两者中的所有记录
        transform(Transformer<K, V, R> transformer)                            允许用户定义自定义转换逻辑,返回一个新的 KStream
        transformValues(ValueTransformer<V, R> transformer)                    仅对值进行自定义转换,返回一个新的 KStream,其中每个记录的键保持不变,而值经过转换
    b.示例
        package com.ruoyi.kafka3;
        
        import java.util.Arrays;
        import java.util.Properties;
        import java.util.stream.Collectors;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KTable;
        import org.apache.kafka.streams.kstream.Transformer;
        import org.apache.kafka.streams.kstream.TransformerSupplier;
        import org.apache.kafka.streams.processor.ProcessorContext;
        
        public class Demo02Consumer {
            private static final String TOPIC = "stream-trans-topic"; // 主题名称
        
            public static void main(String[] args) {
                // 配置 Kafka Streams
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-consumer1"); // 设置应用程序ID
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092"); // 设置Kafka服务器地址
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 设置默认键的序列化/反序列化类
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 设置默认值的序列化/反序列化类
        
                // 启动流处理
                startStreamProcessing(props);
            }
        
            // 启动流处理
            private static void startStreamProcessing(Properties props) {
                StreamsBuilder builder = new StreamsBuilder(); // 创建流构建器
                KStream<String, String> stream = builder.stream(TOPIC); // 从指定主题创建流
                processStream(stream); // 处理流
                KafkaStreams streams = new KafkaStreams(builder.build(), props); // 创建KafkaStreams实例
                streams.start(); // 启动流处理
                addShutdownHook(streams); // 添加关闭钩子以优雅地关闭流
            }
        
            // 处理流
            private static void processStream(KStream<String, String> stream) {
                // 示例:映射
                stream.map((key, value) -> KeyValue.pair(key, value.toUpperCase())); // 将值转换为大写
        
                // 示例:扁平映射
                stream.flatMap((key, value) -> {
                    String[] words = value.split(" "); // 按空格分割值
                    return words.length == 0 ? null : Arrays.stream(words).map(word -> KeyValue.pair(key, word)).collect(Collectors.toList()); // 将每个单词映射为键值对
                });
        
                // 示例:过滤
                stream.filter((key, value) -> value.length() > 5); // 过滤掉长度小于等于5的值
        
                // 示例:不过滤
                stream.filterNot((key, value) -> value.length() <= 5); // 过滤掉长度大于5的值
        
                // 示例:映射值
                stream.mapValues(value -> value.length()); // 将值映射为其长度
        
                // 示例:分组
                stream.groupByKey(); // 按键分组
        
                // 示例:按键分组
                stream.groupBy((key, value) -> KeyValue.pair(value.charAt(0), value)); // 按值的首字母分组
        
                // 示例:聚合
                stream.groupByKey().aggregate(
                    () -> 0, // 初始值
                    (key, value, aggregate) -> aggregate + value.length() // 聚合逻辑:累加值的长度
                );
        
                // 示例:计数
                stream.groupByKey().count(); // 计算每个键的出现次数
        
                // 示例:减少
                stream.groupByKey().reduce((aggValue, newValue) -> newValue); // 示例减少:保留最新的值
        
                // 示例:连接
                KTable<String, String> table = stream.groupByKey().reduce((aggValue, newValue) -> newValue); // 创建KTable并连接流和表
                stream.join(table, (value1, value2) -> value1 + value2); // 连接流和表的值
        
                // 示例:左连接
                stream.leftJoin(table, (value1, value2) -> value1 + (value2 != null ? value2 : "default")); // 左连接,若表中没有对应值则使用默认值
        
                // 示例:转换
                stream.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
                    @Override
                    public Transformer<String, String, KeyValue<String, String>> get() {
                        return new Transformer<String, String, KeyValue<String, String>>() {
                            @Override
                            public void init(ProcessorContext context) {}
        
                            @Override
                            public KeyValue<String, String> transform(String key, String value) {
                                return KeyValue.pair(key, value + " transformed"); // 在值后添加" transformed"
                            }
        
                            @Override
                            public void close() {}
                        };
                    }
                });
        
                // 示例:转换值
                stream.mapValues(value -> value.length()); // 将值转换为其长度
            }
        
            // 添加关闭钩子
            private static void addShutdownHook(KafkaStreams streams) {
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // 在JVM关闭时关闭KafkaStreams
            }
        }

03.Windowing
    a.介绍
        timeWindow(TimeWindows windows)                                        创建一个时间窗口,用于对流数据进行分组和聚合,基于时间段的窗口
        sessionWindow(SessionWindows windows)                                  创建一个会话窗口,用于对流数据进行分组和聚合,基于活动会话的时间段
        tumblingWindow(TimeWindows windows)                                    创建一个滚动窗口,数据在固定的时间段内进行分组和聚合,时间段之间没有重叠
    b.示例
        package com.ruoyi.kafka3;
        
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.TimeWindows;
        import org.apache.kafka.streams.kstream.SessionWindows;
        
        import java.time.Duration;
        import java.util.Properties;
        
        public class Demo03Consumer {
            private static final String TOPIC = "stream-Windowing-topic"; // 主题名称
        
            public static void main(String[] args) {
                // 设置流处理配置
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-consumer");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
                // 创建流处理拓扑
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream(TOPIC);
        
                // 调用不同的窗口处理方法
                timeWindowExample(stream);
                sessionWindowExample(stream);
                tumblingWindowExample(stream);
        
                // 启动流处理
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
        
                // 添加关闭钩子以优雅地关闭流
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        
            // 时间窗口示例
            // 时间窗口将数据分组到固定的时间段内,所有在该时间段内到达的数据都会被聚合。
            // 窗口之间可能会有重叠,适合需要重叠的时间段聚合的场景。
            private static void timeWindowExample(KStream<String, String> stream) {
                stream.groupByKey()
                      .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // 5分钟的时间窗口
                      .count()
                      .toStream()
                      .foreach((key, value) -> System.out.println("Time Window Count: " + value));
            }
        
            // 会话窗口示例
            // 会话窗口用于对流数据进行分组,基于活动会话的时间段。
            // 会话窗口会根据数据的到达时间动态调整窗口的大小,适合处理用户活动等动态会话的场景。
            private static void sessionWindowExample(KStream<String, String> stream) {
                stream.groupByKey()
                      .windowedBy(SessionWindows.with(Duration.ofMinutes(5))) // 5分钟的会话窗口
                      .count()
                      .toStream()
                      .foreach((key, value) -> System.out.println("Session Window Count: " + value));
            }
        
            // 滚动窗口示例
            // 滚动窗口将数据分组到固定的时间段内,时间段之间没有重叠。
            // 数据在每个固定的时间段内进行聚合,适合定期统计的场景。
            private static void tumblingWindowExample(KStream<String, String> stream) {
                stream.groupByKey()
                      .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // 1分钟的滚动窗口
                      .count()
                      .toStream()
                      .foreach((key, value) -> System.out.println("Tumbling Window Count: " + value));
            }
        }

04.State Stores
    a.介绍
        Materialized.as(String storeName)                                      定义一个状态存储的名称,用于存储流处理中的中间状态
        withKeySerde(Serde<K> keySerde)                                        指定状态存储中键的序列化和反序列化方式
        withValueSerde(Serde<V> valueSerde)                                    指定状态存储中值的序列化和反序列化方式
    b.示例
        package com.ruoyi.kafka3;
        
        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.Materialized;
        
        public class Demo04Consumer {
            public static void main(String[] args) {
                // 配置Kafka Streams
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-consumer");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
                // 创建StreamsBuilder
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("stream-state-topic");
        
                // 处理流数据
                processStream(stream);
        
                // 启动Kafka Streams
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
        
                // 添加关闭钩子
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        
            private static void processStream(KStream<String, String> stream) {
                // 处理流数据的示例
                stream.foreach((key, value) -> {
                    // 这里可以添加处理逻辑
                    System.out.printf("Key: %s, Value: %s%n", key, value);
                });
        
                // 使用 Materialized.as 定义状态存储
                createMaterializedStore(stream);
                // 使用 withKeySerde 和 withValueSerde
                createKeyValueSerdeStore(stream);
            }
        
            private static void createMaterializedStore(KStream<String, String> stream) {
                // 定义状态存储的名称
                String storeName = "my-state-store";
        
                // 使用 Materialized.as 创建状态存储
                stream.groupByKey()
                      .count(Materialized.as(storeName))
                      .toStream()
                      .foreach((key, count) -> {
                          // 打印状态存储中的键和值
                          System.out.printf("Key: %s, Count: %d%n", key, count);
                      });
            }
        
            private static void createKeyValueSerdeStore(KStream<String, String> stream) {
                // 定义状态存储的名称
                String storeName = "key-value-store";
        
                // 使用 Materialized.as 创建状态存储,并指定键和值的序列化方式
        //        stream.groupByKey()
        //              .count(Materialized.as(storeName).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));
        
                // 打印状态存储中的键和值
                stream.groupByKey()
                      .count(Materialized.as(storeName))
                      .toStream()
                      .foreach((key, count) -> {
                          System.out.printf("Key: %s, Count: %d%n", key, count);
                      });
            }
        }

05.Output Operations
    a.介绍
        to(String topic)                                                       将处理结果发送到指定的 Kafka 主题。
        toStream()                                                             将 KTable 转换为 KStream,允许对表数据进行流式处理。
        toTable()                                                              将 KStream 转换为 KTable,允许对流数据进行表格处理。
    b.示例
        package com.ruoyi.kafka3;
        
        import java.util.Properties;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KTable;
        
        /*
        KafkaConsumer: 用于创建和管理 Kafka 消费者。
        */
        public class Demo05Consumer {
        
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("stream-output-topic");
                KTable<String, String> table = stream.toTable();
        
                // 处理 KTable 数据
                table.toStream().foreach((key, value) -> {
                    // 在这里处理 KTable 的每个键值对
                    System.out.println("KTable entry: key = " + key + ", value = " + value);
                });
        
                // 处理流数据
                stream.to("output_topic");
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
            }
        }

06.KTable Operations
    a.介绍
        toStream()                                                             将 KTable 转换为 KStream,便于对表数据进行流式操作。
        join(KTable<K, V> table, ValueJoiner<V, V, R> joiner)                  将当前 KTable 与另一个 KTable 进行连接,使用提供的连接器生成输出记录。
        groupBy(KeyValueMapper<K, V, KeyValue<K, V>> mapper)                   根据提供的映射器对 KTable 中的数据进行分组。
    b.示例
        package com.ruoyi.kafka3;
        
        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KTable;
        import org.apache.kafka.streams.kstream.ValueJoiner;
        
        public class Demo06Consumer {
        
            public static void main(String[] args) {
                // 配置 Kafka Streams
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo06-consumer");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
                // 创建流处理拓扑
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("input-topic");
                KTable<String, String> table = builder.table("table-topic");
        
                // 示例 1: 将 KTable 转换为 KStream
                KStream<String, String> transformedStream = toStream(table);
                // 示例 2: 连接两个 KTable
                KTable<String, String> joinedTable = joinTables(table, table);
                // 示例 3: 根据映射器对 KTable 中的数据进行分组
                KTable<String, String> groupedTable = groupByExample(table);
        
                // 启动流处理
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
            }
        
            // 将 KTable 转换为 KStream
            private static KStream<String, String> toStream(KTable<String, String> table) {
                return table.toStream();
            }
        
            // 连接两个 KTable
            private static KTable<String, String> joinTables(KTable<String, String> table1, KTable<String, String> table2) {
                return table1.join(table2, new ValueJoiner<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) {
                        return value1 + value2; // 自定义连接逻辑
                    }
                });
            }
        
            // 根据映射器对 KTable 中的数据进行分组
            private static KTable<String, String> groupByExample(KTable<String, String> table) {
                return (KTable<String, String>) table.groupBy((key, value) -> new KeyValue<>(key, value));
            }
        }

07.Processing Guarantees
    a.介绍
        exactlyOnce()                                                          确保处理结果的精确一次语义,避免重复处理。
        atLeastOnce()                                                          确保处理结果的至少一次语义,可能会导致重复处理。
    b.示例
        package com.ruoyi.kafka3;
        
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        
        import java.time.Duration;
        import java.util.Collections;
        import java.util.Properties;
        
        public class Demo07Consumer {
        
            public static void main(String[] args) {
                // 创建Kafka消费者
                KafkaConsumer<String, String> consumer = createConsumer();
        
                // 订阅主题
                consumer.subscribe(Collections.singletonList("test-topic"));
        
                // 消费消息
                consumeMessages(consumer);
        
                // 关闭消费者
                consumer.close();
            }
        
            private static KafkaConsumer<String, String> createConsumer() {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                return new KafkaConsumer<>(props);
            }
        
            private static void consumeMessages(KafkaConsumer<String, String> consumer) {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message from topic %s partition %d with offset %d: %s%n",
                                          record.topic(), record.partition(), record.offset(), record.value());
                    }
                }
            }
        
            // 使用Kafka Streams API进行精确一次处理
            private static void exactlyOnce() {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // 确保精确一次处理
                // 其他配置...
        
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("input-topic");
                stream.to("output-topic");
        
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
            }
        
            // 使用Kafka Streams API进行至少一次处理
            private static void atLeastOnce() {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "at-least-once-app");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); // 确保至少一次处理
                // 其他配置...
        
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("input-topic");
                stream.to("output-topic");
        
                KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
            }
        }

08.Error Handling
    a.介绍
        setErrorHandler(DeserializationExceptionHandler handler)               设置反序列化异常处理程序,以处理在流处理过程中发生的反序列化错误。
    b.示例
        package com.ruoyi.kafka3;
        
        import java.time.Duration;
        import java.util.Collections;
        import java.util.Map;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.processor.ProcessorContext;
        import org.apache.kafka.common.serialization.Serdes;
        
        public class Demo08Consumer {
        
            public static void main(String[] args) {
                // 创建Kafka消费者
                KafkaConsumer<String, String> consumer = createConsumer();
                // 处理消息
                processMessages(consumer);
            }
        
            private static KafkaConsumer<String, String> createConsumer() {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                return new KafkaConsumer<>(props);
            }
        
            private static void processMessages(KafkaConsumer<String, String> consumer) {
                consumer.subscribe(Collections.singletonList("test-topic"));
                while (true) {
                    consumer.poll(Duration.ofMillis(100)).forEach(record -> {
                        System.out.printf("Consumed message: %s from topic: %s%n", record.value(), record.topic());
                    });
                }
            }
        
            private static void setErrorHandler() {
                // 创建Streams配置
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
        
                // 设置反序列化异常处理程序
                props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationExceptionHandler.class);
        
                // 创建StreamsBuilder
                StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> stream = builder.stream("test-topic", Consumed.with(Serdes.String(), Serdes.String()));
        
                // 处理流
                stream.foreach((key, value) -> System.out.printf("Processed message: %s from key: %s%n", value, key));
            }
        }
        
        // 自定义反序列化异常处理程序
        class CustomDeserializationExceptionHandler implements DeserializationExceptionHandler {
        
            @Override
            public DeserializationHandlerResponse handle(ProcessorContext context,
                ConsumerRecord<byte[], byte[]> record,
                Exception exception) {
                // 处理反序列化异常
                System.err.printf("Error deserializing record with key %s: %s%n", record.key(), exception.getMessage());
                return DeserializationHandlerResponse.CONTINUE; // 继续处理其他记录
            }
        
            @Override
            public void configure(Map<String, ?> configs) {
                // 配置方法可以留空
            }
        }

6.4 使用3:Spring

00.总结
    Spring Kafka 通过 KafkaTemplate(生产者)和 @KafkaListener(消费者)提供了便捷的操作接口。
    a.生产者
        a.发送消息
            简单消息发送
            带键的消息发送(用于控制消息分布)
            指定分区的消息发送(用于控制消息进入特定的分区)
        b.异步发送与同步发送
            异步发送:生产者可以异步发送消息,KafkaTemplate 返回一个 ListenableFuture,可用于注册成功或失败的回调处理。
            同步发送:在某些场景下,可能需要确保消息已经成功发送,此时可以使用同步发送,通过 get() 方法阻塞当前线程,等待结果返回。
        c.事务支持
            KafkaTemplate 支持 Kafka 的事务机制,可以确保一组消息要么全部成功发送,要么全部失败回滚。
        d.回调处理
            生产者可以为每次发送操作注册回调,以处理消息发送成功或失败后的逻辑。
        e.消息序列化
            KafkaTemplate 支持将 Java 对象序列化为 Kafka 消息。
            默认情况下,消息的键和值使用 StringSerializer 进行序列化,但你可以配置其他序列化器,例如 JSON 序列化器。
    b.消费者
        a.基本消息消费
            通过 @KafkaListener 注解,可以指定消费者监听的主题。Spring 自动管理消费者组、分区分配和偏移提交等细节。
        b.手动偏移提交
            默认情况下,Spring Kafka 会自动提交消息的偏移量,但也可以手动控制何时提交偏移量,以确保消息的精确处理。
            例如,可以在处理完成消息后再提交偏移量,以避免消息丢失。
        c.消息过滤
            消费者可以通过实现 RecordFilterStrategy 来过滤不需要处理的消息。例如,只有当消息符合特定条件时才进行处理。
        d.批量消费
            消费者可以配置为批量接收消息,这样可以提高吞吐量。通过设置 @KafkaListener 中的 batch 属性并返回一个 List 来处理消息的批次。
        e.并发消费者
            Spring Kafka 支持为单个主题配置多个并发消费者,这样可以在高负载的场景下提升消息处理能力。
        f.错误处理
            消费者在处理消息时可能遇到异常,Spring Kafka 提供了多种错误处理策略,如重试机制、死信队列等,以保证消费的可靠性。

01.生产者
    a.发送消息
        a.简单消息发送
            kafkaTemplate.send("topicName", "messageContent");
        b.带键的消息发送(用于控制消息分布)
            kafkaTemplate.send("topicName", "key", "messageContent");
        c.指定分区的消息发送(用于控制消息进入特定的分区)
            kafkaTemplate.send("topicName", partition, "key", "messageContent");
    b.异步发送与同步发送
        a.异步发送:生产者可以异步发送消息,KafkaTemplate 返回一个 ListenableFuture,可用于注册成功或失败的回调处理。
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "messageContent");
            future.addCallback(successCallback, failureCallback);
        b.同步发送:在某些场景下,可能需要确保消息已经成功发送,此时可以使用同步发送,通过 get() 方法阻塞当前线程,等待结果返回。
            SendResult<String, String> result = kafkaTemplate.send("topic", "messageContent").get();
    c.事务支持
        KafkaTemplate 支持 Kafka 的事务机制,可以确保一组消息要么全部成功发送,要么全部失败回滚。
        -----------------------------------------------------------------------------------------------------
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1", "message1");
            operations.send("topic2", "message2");
            return true;
        });
    d.回调处理
        生产者可以为每次发送操作注册回调,以处理消息发送成功或失败后的逻辑。
        -----------------------------------------------------------------------------------------------------
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "messageContent");
        future.addCallback(
            result -> System.out.println("Message sent successfully!"),
            ex -> System.err.println("Message failed to send!")
        );
    e.消息序列化
        KafkaTemplate 支持将 Java 对象序列化为 Kafka 消息。
        默认情况下,消息的键和值使用 StringSerializer 进行序列化,但你可以配置其他序列化器,例如 JSON 序列化器。
        -----------------------------------------------------------------------------------------------------
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

02.消费者
    a.基本消息消费
        通过 @KafkaListener 注解,可以指定消费者监听的主题。Spring 自动管理消费者组、分区分配和偏移提交等细节。
        -----------------------------------------------------------------------------------------------------
        @KafkaListener(topics = "topicName", groupId = "group_id")
        public void listen(String message) {
            System.out.println("Received message: " + message);
        }
    b.手动偏移提交
        默认情况下,Spring Kafka 会自动提交消息的偏移量,但也可以手动控制何时提交偏移量,以确保消息的精确处理。
        例如,可以在处理完成消息后再提交偏移量,以避免消息丢失。
        -----------------------------------------------------------------------------------------------------
        @KafkaListener(topics = "topicName", groupId = "group_id")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            System.out.println("Received message: " + record.value());
            acknowledgment.acknowledge();  // 手动提交偏移量
        }
    c.消息过滤
        消费者可以通过实现 RecordFilterStrategy 来过滤不需要处理的消息。例如,只有当消息符合特定条件时才进行处理。
        -----------------------------------------------------------------------------------------------------
        @Bean
        public RecordFilterStrategy<String, String> filter() {
            return record -> !record.value().contains("important");  // 过滤掉不包含 "important" 的消息
        }
    d.批量消费
        消费者可以配置为批量接收消息,这样可以提高吞吐量。通过设置 @KafkaListener 中的 batch 属性并返回一个 List 来处理消息的批次。
        -----------------------------------------------------------------------------------------------------
        @KafkaListener(topics = "topicName", groupId = "group_id", containerFactory = "batchFactory")
        public void listen(List<String> messages) {
            System.out.println("Received batch messages: " + messages);
        }
    e.并发消费者
        Spring Kafka 支持为单个主题配置多个并发消费者,这样可以在高负载的场景下提升消息处理能力。
        -----------------------------------------------------------------------------------------------------
        @KafkaListener(topics = "topicName", groupId = "group_id", concurrency = "3")
        public void listen(String message) {
            System.out.println("Received message: " + message);
        }
    f.错误处理
        消费者在处理消息时可能遇到异常,Spring Kafka 提供了多种错误处理策略,如重试机制、死信队列等,以保证消费的可靠性。
        -----------------------------------------------------------------------------------------------------
        配置全局错误处理器
        @Bean
        public SeekToCurrentErrorHandler errorHandler() {
            return new SeekToCurrentErrorHandler();
        }

03.使用1
    a.生产者
        @Data
        @RestController
        public class KafkaController {
        
            private String name;
        
            private Integer age;
        
            @Autowired
            private KafkaTemplate<String,String> kafkaTemplate;
        
            /**
             * 发送消息, 消息生产者
             * @Param []
             * @Return {@link String}
             */
            @GetMapping("/kafka")
            public String kafkaTest() {
                // 发送消息
                kafkaTemplate.send("itcast-topic", "xazhao");
                return "OK";
            }
        
            /**
             * 发送对象
             * @Param []
             * @Return {@link String}
             */
            @GetMapping("/mapkafka")
            public String mapKafkaTest() {
                Map<String, Object> map = new HashMap<>();
                map.put("name", "小安");
                map.put("age", 23);
                // 将map转换为string
                String toJSONString = JSON.toJSONString(map);
                // 发送消息
                kafkaTemplate.send("itcast-topic2", toJSONString);
        
                return "ok";
            }
        
        
            @GetMapping("/KafkaTest1")
            public String KafkaTest1() {
                KafkaController kafkaController = new KafkaController();
                kafkaController.name = "kafka";
                kafkaController.age = 23;
        
                // 将对象转换为string
                String jsonString = JSON.toJSONString(kafkaController);
                // 发送消息
                kafkaTemplate.send("itcast-topic1", jsonString);
        
                return "ok";
            }
        }
    b.消费者
        @Slf4j
        @Component
        public class KafkaListenerTest {
        
            @Autowired
            private KafkaTemplate<String, String> kafkaTemplate;
        
            /**
             * 消息的消费者
             * @Param [message]
             * @Return 
             */
            @KafkaListener(topics = "itcast-topic")
            public void kafkaListener(String message) {
                // 消息不为空接收
                if (message != null) {
                    log.info("消息为 : {}", message);
                }
            }
        
            /**
             * 消息的消费者
             * @Param [message]
             * @Return
             */
            @KafkaListener(topics = "itcast-topic2")
            public void mapKafkaListener(String message) {
                log.info("message消息为 : {}", message);
                // 将传回的string转为map
                Map map = JSON.parseObject(message, Map.class);
                log.info("map消息为 : {}", map);
            }
        
            @KafkaListener(topics = "itcast-topic1")
            public void mapKafkaListener1(String message) {
                log.info("message消息为 : {}", message);
                // 将传回的string转为对象
                KafkaController kafkaController = JSON.parseObject(message, KafkaController.class);
                log.info("KafkaController消息为 : {}", kafkaController);
            }
        }

04.使用2
    a.ProcuderTest
        package com.heima;
        
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        
        import java.util.Properties;
        
        public class ProcuderTest {
        
            public static void main(String[] args) {
        
                // kafkastream相关配置
                Properties properties = new Properties();
                // kafka的地址, key和value的序列化器
                properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        
                Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        
                // 封装发送消息
                ProducerRecord<String, String> producerRecord1 = new ProducerRecord<String, String>("TopicSource", "xiaoan", "1");
                ProducerRecord<String, String> producerRecord2 = new ProducerRecord<String, String>("TopicSource", "xiaoan", "5");
                ProducerRecord<String, String> producerRecord3 = new ProducerRecord<String, String>("TopicSource", "Marry", "2");
                ProducerRecord<String, String> producerRecord4 = new ProducerRecord<String, String>("TopicSource", "Marry", "4");
                ProducerRecord<String, String> producerRecord5 = new ProducerRecord<String, String>("TopicSource", "Tom", "12");
        
                producer.send(producerRecord1);
                producer.send(producerRecord2);
                producer.send(producerRecord3);
                producer.send(producerRecord4);
                producer.send(producerRecord5);
                // 关闭消息通道, 必须关闭, 否则消息发送失败
                producer.close();
                System.out.println("消息发送结束...");
            }
        }
    b.ConsumerTest
        package com.heima;
        
        import org.apache.kafka.clients.consumer.*;
        
        import java.time.Duration;
        import java.util.Arrays;
        import java.util.Properties;
        
        public class ConsumerTest {
        
            public static void main(String[] args) {
        
                // kafkastream相关配置
                Properties properties = new Properties();
                // 创建Kafka的配置对象, 发送消息的地址
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
                // 消息key的反序列化器
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                // 消息value的反序列化器
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                // 消费者组
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topicMGroup1");
        
                // 创建消息消费者对象
                Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
                // 订阅主题
                consumer.subscribe(Arrays.asList("TopicSink"));
                // 死循环让当前线程一直处于监听状态
                while (true) {
                    // 拉取消息, 得到一个map
                    ConsumerRecords<String, String> consumerRecords = consumer
                            // Duration.ofSeconds(3)表示若没有拉取消息, 等待3秒在拉取一次
                            .poll(Duration.ofSeconds(3));
                    // 遍历拉取到的消息
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.print("Key : " + consumerRecord.key());
                        System.out.println(", Value : " + consumerRecord.value() + "\n");
                    }
                }
            }
        }
    c.KafkaStreamTest
        package com.heima;
        
        import java.time.Duration;
        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.KeyValue;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.Aggregator;
        import org.apache.kafka.streams.kstream.Initializer;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KeyValueMapper;
        import org.apache.kafka.streams.kstream.TimeWindows;
        import org.apache.kafka.streams.kstream.Windowed;
        
        public class KafkaStreamTest {
        
            public static void main(String[] args) {
        
                // kafkastream相关配置
                Properties properties = new Properties();
                // 发送消息的地址
                properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
                // 序列化器和反序列化器
                properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafkaStream-01");
                // 拓扑
                StreamsBuilder streamsBuilder = new StreamsBuilder();
                // 从哪个source主题中获取消息
                KStream<String, String> KStream = streamsBuilder.stream("TopicSource");
                // 处理aggregate, 聚合
                KStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(5))).aggregate(new Initializer<String>() {
                    @Override
                    public String apply() {
                        // 初始化值
                        return "0";
                    }
                }, new Aggregator<String, String, String>() {
                    // 如何累加, aggregate上次累加的值
                    @Override
                    public String apply(String key, String value, String aggregate) {
                        int parseValue = Integer.parseInt(value);
                        int parseAggregate = Integer.parseInt(aggregate);
                        return String.valueOf(parseValue + parseAggregate);
                    }
                }).toStream().map(new KeyValueMapper<Windowed<String>, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(Windowed<String> key, String value) {
                        return new KeyValue<>(key.key(), value);
                    }
                }).to("TopicSink");
                // 将处理完后的消息发送到那个主题
                KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
                kafkaStreams.start();
            }
        }

6.5 事务

01.原理
    Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,
    即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。
    ---------------------------------------------------------------------------------------------------------
    Kafka 事务分为生产者事务和消费者事务,但它们并不是强绑定的关系,
    消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。

02.生产者,开启事务
    a.工作原理
        1)启动生产者,分配协调器
        在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。
        事务协调器的分配涉及到一个特殊的主题 **__transaction_state**,该主题默认有 50 个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。
        分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。
        2)发送消息
        生产者分配到 PID 后,要先告诉事务协调器要把消息发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。
        当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应。
        3)确认事务
        当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state 中。
        当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的。
        如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果。
        如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据。
        事物不仅可以保证多个数据整体成功失败,还可以保证数据丢失后恢复
    b.创建一个 Producer,指定一个事务 ID
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置事务ID,必须
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");
        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    c.使用事务发送消息
        // 初始化事务
        producer.initTransactions();
        // 开启事务
        producer.beginTransaction();

        //发送10条消息往kafka,假如中间有异常,所有消息都会发送失败
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("topic-test", "a message" + i));
            }
        }
        // 提交事务
        producer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    d.完整代码
        package com.hainiu.kafka;

        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.serialization.StringSerializer;

        import java.util.Properties;

        public class ProducerWithTransaction {
            public static void main(String[] args) {
                Properties pro = new Properties();
                pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
                pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaciton_test");

                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
                ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
                producer.initTransactions();
                producer.beginTransaction();
                try{
                    for(int i=0;i<5;i++){
                        producer.send(record);
                    }
        //            int a = 1/0;
                    producer.commitTransaction();
                }catch (Exception e){
                    producer.abortTransaction();
                }finally {
                    producer.close();
                }

            }
        }
        -----------------------------------------------------------------------------------------------------
        使用int a = 1/0;手动抛出异常信息,如果出现异常那么数据不会出现,异常关闭会一次性出现五条结果

03.消费者,开启事务
    a.工作原理
        1)事务性生产者(Transactional Producer)
            生产者在发送消息时开启事务,将多个发送操作(可以跨多个主题和分区)封装在一个事务中。
            事务提交时,确保所有消息要么全部写入成功,要么全部失败回滚。
        2)消费者(Consumer)
            消费者读取消息,并进行处理。
            消费者使用事务性生产者将处理结果发送到下游主题,并将消费的偏移量提交到事务中。
            通过将偏移量提交作为事务的一部分,确保处理和偏移量的提交是原子性的,从而避免消息重复处理或丢失。
        3)隔离级别(Isolation Level)
            消费者可以配置 isolation.level 来控制读取事务消息的行为:
            read_uncommitted:读取所有消息,包括未提交的事务消息。
            read_committed:只读取已提交的事务消息,默认值。
    b.操作
        消费者本身不直接开启事务,而是通过使用事务性生产者来实现事务操作。
        1) 配置事务性生产者
            设置 transactional.id。
            启用幂等性(enable.idempotence=true)。
        2) 初始化事务
            调用 initTransactions() 方法初始化事务。
        3) 消费、处理和生产:
            开始事务 (beginTransaction)。
            消费消息,处理后生产新消息。
            发送处理结果到目标主题。
            在事务中提交消费者的偏移量 (sendOffsetsToTransaction)。
            提交事务 (commitTransaction)。
        4) 异常处理:
            如果在事务过程中发生异常,需回滚事务 (abortTransaction) 以确保原子性。
    c.代码实现
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.apache.kafka.common.serialization.StringSerializer;
        
        import java.time.Duration;
        import java.util.*;
        import java.util.concurrent.ExecutionException;
        
        public class TransactionalConsumerExample {
            public static void main(String[] args) {
                String bootstrapServers = "localhost:9092";
                String inputTopic = "input-topic";
                String outputTopic = "output-topic";
                String groupId = "transactional-consumer-group";
                String transactionalId = "transactional-producer-1";
        
                // 配置消费者
                Properties consumerProps = new Properties();
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                // 设置隔离级别为读已提交,避免读取未提交的事务消息
                consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                consumer.subscribe(Arrays.asList(inputTopic));
        
                // 配置生产者
                Properties producerProps = new Properties();
                producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                // 启用事务
                producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
                producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
                producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
                producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
                producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        
                Producer<String, String> producer = new KafkaProducer<>(producerProps);
                producer.initTransactions();
        
                try {
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        
                        if (records.count() > 0) {
                            try {
                                // 开始事务
                                producer.beginTransaction();
        
                                // 处理并发送消息
                                for (ConsumerRecord<String, String> record : records) {
                                    String processedValue = process(record.value());
                                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outputTopic, record.key(), processedValue);
                                    producer.send(producerRecord);
                                }
        
                                // 准备偏移量提交
                                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                                for (TopicPartition partition : records.partitions()) {
                                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                                }
        
                                // 将偏移量作为事务的一部分提交
                                producer.sendOffsetsToTransaction(offsets, groupId);
        
                                // 提交事务
                                producer.commitTransaction();
                            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                                // 无法恢复的异常,关闭生产者
                                producer.close();
                                break;
                            } catch (KafkaException e) {
                                // 其他可恢复的异常,回滚事务
                                producer.abortTransaction();
                            }
                        }
                    }
                } finally {
                    consumer.close();
                    producer.close();
                }
            }
        
            // 简单的处理函数,将消息值转换为大写
            private static String process(String value) {
                return value.toUpperCase();
            }
        }

04.示例
    a.生产者
        public class Demo09Producer {

            public static void main(String[] args) {
                Properties properties = new Properties();
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                // 设置事务 ID
                properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id-1");
                properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置 ACK 为 all

                KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

                // 初始化事务
                producer.initTransactions();

                try {
                    // 开始事务
                    producer.beginTransaction();
                    for (int i = 0; i < 10; i++) {
                        String key = "key-" + i;
                        String value = "value-" + i;

                        ProducerRecord<String, String> record = new ProducerRecord<>("test9-topic", key, value);
                        producer.send(record);
                    }
                    // 提交事务
                    producer.commitTransaction();
                } catch (Exception e) {
                    // 出现异常时,撤销事务
                    producer.abortTransaction();
                    e.printStackTrace();
                } finally {
                    producer.close();
                }
            }
        }
    b.消费者
        public class Demo09Consumer {

            public static void main(String[] args) {
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.185.150:9092");
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                consumer.subscribe(Collections.singletonList("test9-topic"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed record: Key=%s, Value=%s, Partition=%d, Offset=%d%n",
                                record.key(), record.value(), record.partition(), record.offset());
                    }
                }
            }
        }

6.6 说明

01.任意一个节点都可以作为集群入口
    a.为什么可以使用任意一个 Broker IP 地址?
        负载均衡和故障转移: Kafka 集群的元数据会在客户端和集群之间同步。如果你连接到集群中的一个 Broker,客户端会从这个 Broker 获取集群的元数据,包括其他 Brokers 的信息,从而进行负载均衡和故障转移。
        集群的元数据: Kafka 的客户端在连接到集群中的任意 Broker 后,会获取集群的完整元数据,包括所有 Brokers 的地址、分区信息等。这样,客户端能够与集群中的其他 Brokers 通信,而不仅仅是你初始连接的那个 Broker。
    b.示例
        a.发送192.168.185.150
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.185.151:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        b.发送192.168.185.151
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.185.151:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        c.总结
            不管向master发送,还是slave1、slave2发送,都指向同一个存储的位置

02.设置数据过期时间(Kafka2.2.0及以上版本)
    a.创建主题
        kafka-topics.sh --create --topic new-topic --bootstrap-server 192.168.185.150:9092 --partitions 3 --replication-factor 2
        这个命令会创建一个名为 new-topic 的主题,包含 3 个分区和 2 个副本。Kafka 3.4.0 使用 --bootstrap-server 参数来指定 Kafka Broker 的地址。
    b.放入数据
        kafka-console-producer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092
    c.查看数据
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning
    d.删除主题
        a.方式1:设置主题保留时间
            要更新主题的保留时间,你需要使用 kafka-configs.sh 工具。
            首先,设置保留时间为 0(即立即删除所有数据),然后将保留时间设置为一个合理的值(例如 1 天)。
            -------------------------------------------------------------------------------------------------
            步骤 1:设置保留时间为 0
            kafka-configs.sh --alter --entity-type topics --entity-name new-topic --add-config retention.ms=0 --bootstrap-server 192.168.185.150:9092
            这个命令将主题 new-topic 的保留时间设置为 0 毫秒,从而清除所有现有的数据。
            -------------------------------------------------------------------------------------------------
            步骤 2:设置一个合理的保留时间(例如30s)
            kafka-configs.sh --alter --entity-type topics --entity-name new-topic --add-config retention.ms=30000 --bootstrap-server 192.168.185.150:9092
            这个命令将主题 new-topic 的保留时间设置为 1 天(86400000 毫秒)。
        b.方式2:删除主题
            kafka-topics.sh --delete --topic new-topic --bootstrap-server 192.168.185.150:9092
        c.方式3(不推荐):全局配置
            cd  ../kafka/config/server.properties
            log.retention.hours=168 (配置该参数即可)
            log.cleanup.policy=delete (默认,可不配置)
            -------------------------------------------------------------------------------------------------
            修改配置后重启kafka服务生效,kafka默认的消息过期时间为168h(7天)
            该种设置消息过期时间的优点是可以对所有topic全部生效,缺点是需要重启kafka服务,造成服务短暂的不可用!
    e.查看主题
        kafka-topics.sh --list --bootstrap-server 192.168.185.150:9092
    f.查看数据
        kafka-console-consumer.sh --topic new-topic --bootstrap-server 192.168.185.150:9092 --from-beginning

03.消费者ID的设置
    a.介绍
        消费者ID(即消费组ID)是用于标识一组消费者的标识符。不同的消费者可以共享同一个消费组ID,这样它们就可以共同消费同一个主题的消息。具体来说:
        1. 共享消费组ID:如果多个消费者使用相同的消费组ID,它们将作为一个组来消费消息。Kafka 会将消息分配给组内的消费者,确保每条消息只被组内的一个消费者处理。这种方式适用于负载均衡。
        2. 不同的消费组ID:如果每个消费者使用不同的消费组ID,它们将独立消费消息,每个消费者都会接收到所有的消息。这种方式适用于需要每个消费者都处理所有消息的场景。
        3. 配置:消费者使用消费组ID时,通常不需要额外的配置,只需在消费者的配置中设置 group.id 属性即可。
    b.总结
        多个用户可以共享一个消费组ID,但这取决于你的应用需求。
        如果你希望实现负载均衡,使用相同的ID是合适的;
        如果你希望每个用户都能接收到所有消息,则应使用不同的ID。

91.支持的数据类型
    a.介绍
        Apache Kafka 是一个高吞吐量的分布式消息队列系统,设计用于处理大量的实时数据流。
        Kafka 支持多种类型的数据,包括文本、JSON、Avro、Protobuf、二进制数据、自定义对象和 CSV 数据。
        它的灵活性和可扩展性使得它能够处理各种数据格式,适用于不同的数据流和事件处理需求
        -----------------------------------------------------------------------------------------------------
        注意,虽然数据分为k-v两个部分,但是不要把它当成map集合,相同的key的数据value不会被去重掉
        -----------------------------------------------------------------------------------------------------
        数据流管道:Kafka 常用于构建数据流管道,将不同来源的数据(如数据库变更、用户活动、传感器数据等)传输到不同的处理系统。
        事件驱动架构:在事件驱动架构中,Kafka 充当事件的中心,通过主题传递不同的事件类型。
        实时分析:Kafka 可以与流处理框架(如 Apache Flink、Apache Storm、Kafka Streams 等)集成,用于实时数据处理和分析。
    b.文本数据
        可以直接将普通的文本消息(如日志行、配置文件、用户输入等)发送到 Kafka 主题。这是最常见的数据类型,尤其适合于日志和简单的事件数据流。
    c.JSON数据
        Kafka 非常适合处理 JSON 格式的数据。JSON 是一种灵活的数据交换格式,广泛用于各种应用程序和服务的数据交换。Kafka 消费者可以解析 JSON 数据,处理具体的字段和嵌套结构。
    d.Avro数据
        Avro 是一种数据序列化系统,支持模式(schema)演进,可以用于 Kafka 主题中的数据序列化。Avro 提供高效的二进制格式和与数据模式的集成,有助于减少数据大小和提高处理效率。Kafka 常与 Schema Registry 配合使用来管理 Avro 模式。
    e.Protobuf数据
        Protobuf 是一种高效的数据序列化格式,适用于需要跨语言、跨平台数据交换的场景。Kafka 支持 Protobuf 数据格式,允许以高效的方式在 Kafka 中存储和交换数据。
    f.序列化的二进制数据
        二进制数据:Kafka 允许发送任意二进制数据(例如图片、音频、视频文件等)。在这种情况下,数据可以被序列化为字节流,然后发送到 Kafka 主题。消费者可以将这些字节流反序列化为原始数据格式进行处理。
    g.自定义对象
        Java 对象或其他语言的对象:通过自定义序列化和反序列化逻辑,Kafka 可以处理特定语言中的自定义对象。例如,在 Java 中,你可以实现 Serializer 和 Deserializer 接口来处理自定义对象。
    h.CSV数据
        CSV 格式:虽然 CSV 不像 JSON 或 Avro 那样广泛使用,但 Kafka 也可以处理 CSV 格式的数据。CSV 文件中的每一行可以被视为一条消息。
    i.日志数据
        日志文件:Kafka 是日志数据处理的理想选择。你可以将系统日志、应用日志等数据流送入 Kafka,进行实时分析或存储。

92.__consumer_offsets
    a.介绍
        __consumer_offsets 是Kafka内部用于管理和存储消费者组偏移量的特殊主题。它是Kafka自动创建的系统主题,
        专门用于跟踪各个消费者组的消费进度
    b.作用
        __consumer_offsets主题的主要功能是记录和管理消费者组中每个消费者的消费偏移量(offset)。
        当消费者从Kafka主题中消费消息时,它会定期向Kafka提交自己消费到的最新偏移量,
        这些偏移量被记录在__consumer_offsets主题中。
        -----------------------------------------------------------------------------------------------------
        具体作用包括:
        跟踪消费进度:保存每个消费者组中各个分区的最新偏移量,确保消费者重启或故障恢复时能够从上次停止的位置继续消费。
        协调消费者组:消费者组中的多个消费者可以通过__consumer_offsets主题共享和协调消费进度,避免重复消费或消息丢失。
        提供高可用性和容错:通过将偏移量存储在Kafka集群中,而不是存储在本地,确保消费者在发生崩溃或迁移时能够快速恢复消费状态。
    c.__consumer_offsets主题的结构和存储方式
        __consumer_offsets主题存储的是偏移量相关的信息,这些信息是以特殊的内部格式存储的,包括以下关键内容:
        Key:包括消费者组名称、主题名称、分区号等,用于唯一标识每个消费者组对每个主题分区的消费状态。
        Value:包含偏移量、提交的时间戳、消费者元数据等,记录具体的消费位置信息。
    d.__consumer_offsets中的数据格式
        偏移量提交的消息格式:
        Key:(group_id, topic, partition),唯一标识消费者组对某个主题分区的消费状态。
        Value:包含偏移量(offset)、消费者元数据、提交时间戳等。
        -----------------------------------------------------------------------------------------------------
        偏移量提交示例:
        Key: (group: my-group, topic: my-topic, partition: 0)
        Value: { offset: 150, metadata: null, commitTimestamp: 1621234567890 }
    e.查看__consumer_offsets的信息
        尽管__consumer_offsets是内部主题,但可以使用Kafka的命令行工具查看它的基本信息,不过通常不直接消费该主题的数据,因为它是内部使用的。
        查看__consumer_offsets主题的基本信息:
        bin/kafka-topics.sh --describe --topic __consumer_offsets --bootstrap-server <broker_host>:<port>
    f.常见问题和注意事项
        分区数:__consumer_offsets主题通常有50个分区(默认值),这确保了较高的并发度和性能。可以根据集群规模和消费者组数量调整分区数。
        存储时间:偏移量的存储时间由offsets.retention.minutes配置决定,默认是7天。过期的偏移量记录会被定期清理。
        性能影响:大量消费者组频繁提交偏移量可能对__consumer_offsets的性能有一定影响,尤其是在高负载系统中。因此,适当调整偏移量提交的频率(auto.commit.interval.ms)可以优化性能。

93.代码运行后不结束
    a.介绍
        在 Kafka 消费者程序中,while (true) 循环的设计是为了持续监听并消费主题中的新消息。这种模式适合在服务端应用中使用,
        因为 Kafka 消费者通常是持续运行的服务组件,用于实时消费数据流。因此,即使当前主题中的所有消息都消费完毕,
        消费者依然会保持连接并等待新消息的到来。
        -----------------------------------------------------------------------------------------------------
        Kafka 消费者设计为长期运行模式,这使得它在处理持续的数据流时非常高效。如果需要消费者在消费完现有消息后自动终止,
        可以通过检测 poll() 的返回结果(是否为空)或设置一定条件(如无消息超过某个次数)来控制退出逻辑。
        这样就能避免程序一直等待而不自动结束。
    b.代码运行后不结束的原因
        while (true) 循环:
        这个循环使得程序会持续执行consumer.poll(Duration.ofMillis(100)),即使当前没有新消息,
        程序也会不断地检查主题中是否有新数据。这是 Kafka 消费者的典型设计,用于实时流式数据处理。
        -----------------------------------------------------------------------------------------------------
        consumer.poll() 行为:当没有新消息时,consumer.poll() 方法会等待指定的时间(这里是100毫秒)然后继续下一次轮询。
        这种行为确保消费者能够实时响应新发布到主题的消息,而不会因为没有数据而终止。
        -----------------------------------------------------------------------------------------------------
        消费者并不会自动结束:Kafka 消费者设计为一个长时间运行的进程,除非手动终止(如按 Ctrl + C),否则不会自动结束。
        它会保持运行状态以便随时消费新发布的消息。
    c.如何改为自动结束
        a.修改消费条件
            可以改为检测到消息为空时结束循环
            -------------------------------------------------------------------------------------------------
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    System.out.println("No more messages, exiting...");
                    break; // 退出循环
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: (key: %s, value: %s) at offset %d%n",
                                      record.key(), record.value(), record.offset());
                }
            }
            consumer.close(); // 关闭消费者
        b.使用计数或时间限制
            可以设置一个计数器或时间条件,判断在若干轮次内都没有新消息时退出。
            -------------------------------------------------------------------------------------------------
            int emptyCount = 0; // 空消息计数
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    emptyCount++;
                    if (emptyCount > 10) { // 如果连续10次都没有新消息,则退出
                        System.out.println("No more messages for a while, exiting...");
                        break;
                    }
                } else {
                    emptyCount = 0; // 如果有消息,将计数器重置
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: (key: %s, value: %s) at offset %d%n",
                                      record.key(), record.value(), record.offset());
                }
            }
            consumer.close(); // 关闭消费者
        c.手动触发终止
            可以在运行程序时通过特定输入(例如按键、命令)来终止消费者,这种方式适用于交互式或服务类应用。

94.默认不能重复消费
    a.介绍
        Kafka 的设计原则之一是保证消息的处理可靠性和可伸缩性,但默认情况下,Kafka 并不会重复消费已经被消费的消息。
        消息的消费状态由消费者的偏移量(offset)管理。
        -----------------------------------------------------------------------------------------------------
        Kafka 的设计初衷是尽可能减少消息重复消费的情况,但在特定条件下,如消费者故障或配置不当,可能会导致消息被重复消费。
        通过正确配置偏移量提交策略、实现幂等操作和处理故障恢复机制,可以有效避免消息重复消费带来的问题。
    b.消费偏移量管理
        Kafka 中的每个消费者都有一个消费者组(Consumer Group),每个组内部的消费者会共同消费同一主题的消息。
        消费者组的偏移量记录在 __consumer_offsets 主题中,Kafka 使用这个偏移量来跟踪每个消费者组在各个分区中的消费进度。
        -----------------------------------------------------------------------------------------------------
        自动提交:默认情况下,Kafka 消费者会定期自动提交偏移量。这意味着消费者处理完消息后,Kafka 会记录这些消息的偏移量,以便在消费者重新启动时能够从上次提交的位置继续消费。
        手动提交:消费者可以选择手动提交偏移量,这样可以在处理完消息后更精确地控制何时提交偏移量。这允许在消息处理失败时进行重试,避免丢失消息。
    c.重复消费的情况
        尽管Kafka设计目标是避免重复消费,但在实际使用中,可能会遇到以下情况导致消息被重复消费:
        消费者故障:如果消费者在处理消息后崩溃,而偏移量尚未提交,那么这些消息在消费者重启时可能会被重复消费。
        手动提交偏移量失败:如果消费者在处理完消息后没有及时提交偏移量,或在提交过程中发生错误,可能导致消息重复消费。
        配置错误:如果自动提交偏移量的频率过低,消费者在处理大量消息时,可能会遇到重复消费的情况。
    d.确保不重复消费的实践
        为了避免消息重复消费,可以采取以下措施:
        1.使用幂等操作:确保消息处理逻辑是幂等的,即对同一条消息的多次处理结果相同。这可以通过在消息处理系统中实现幂等性操作来确保。
        -----------------------------------------------------------------------------------------------------
        2.手动提交偏移量:如果应用程序对消息处理的准确性要求高,可以选择手动提交偏移量。在确保消息处理成功后,再提交偏移量。
        try {
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumed message: (key: %s, value: %s) at offset %d%n",
                                  record.key(), record.value(), record.offset());
            }
            consumer.commitSync(); // 提交偏移量
        } catch (Exception e) {
            e.printStackTrace();
            // 处理异常情况
        }
        -----------------------------------------------------------------------------------------------------
        3.配置合适的 auto.offset.reset:设置 auto.offset.reset 参数为 earliest 或 latest,以确保消费者从预期的偏移量开始消费。
    e.故障恢复和消息重试
        在处理消息时,出现故障可能导致消息重复消费,这时可以结合以下方法来进行消息重试和故障恢复:
        异常处理和重试机制:在处理消息时,如果遇到异常,可以将消息放入错误队列或重试队列,并在合适的时机进行重试。
        利用事务:如果消息处理系统支持事务,可以使用事务来确保消息处理的原子性和一致性。

95.负载均衡机制
    a.介绍
        当你使用 kafka-topics.sh 创建一个新的 Kafka 主题(如 new-topic),并指定分区和副本因子时,
        Kafka 会根据集群的状态自动分配消息到相应的机器(Broker)。
        -----------------------------------------------------------------------------------------------------
        在 Kafka 中,消息的分配到具体的 Broker 是由 Kafka 的负载均衡机制自动管理的。
        分区的领导者副本和跟随者副本会根据集群的状态和负载情况分配到不同的 Brokers 上。
        你可以使用 kafka-topics.sh 工具查看分区和副本的实际分配情况。
    b.分区和副本分配
        分区:每个主题可以有一个或多个分区。你指定了 --partitions 3,这表示 new-topic 将有 3 个分区。Kafka 会将这 3 个分区分配到不同的 Brokers 上。
        -----------------------------------------------------------------------------------------------------
        副本因子:副本因子 (--replication-factor 2) 指定每个分区有两个副本(即两个副本和一个领导者副本)。副本的分配是为了确保数据的高可用性和容错性。
    c.消息的分配
        领导者和副本分配:每个分区有一个领导者(Leader)副本和多个跟随者(Follower)副本。所有的生产和消费操作都是在领导者副本上进行的。领导者副本负责处理所有的读写请求,而跟随者副本则从领导者副本同步数据。
        自动分配:Kafka 使用内置的负载均衡算法自动将分区和副本分配到 Kafka 集群中的不同 Brokers 上。你指定的分区和副本因子会根据 Kafka 集群的实际状态(如 Brokers 的数量和负载)进行分配。Kafka 会尝试均匀分配分区和副本,以确保集群负载均衡和数据高可用性。
    d.如何查看分配情况
        你可以使用以下命令查看主题的分区和副本的分配情况:
        bin/kafka-topics.sh --describe --topic new-topic --bootstrap-server 192.168.185.150:9092
        -----------------------------------------------------------------------------------------------------
        输出示例:
        Topic: new-topic    TopicId: <topic-id> PartitionCount: 3   ReplicationFactor: 2    Configs:
            Topic: new-topic    Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1,2
            Topic: new-topic    Partition: 1    Leader: 2   Replicas: 2,3   Isr: 2,3
            Topic: new-topic    Partition: 2    Leader: 3   Replicas: 3,1   Isr: 3,1
        -----------------------------------------------------------------------------------------------------
        在这个输出中:
        Partition:分区编号。
        Leader:当前领导者副本的 Broker ID。
        Replicas:该分区的所有副本的 Broker ID(包括领导者和跟随者)。
        Isr(In-Sync Replicas):当前与领导者副本保持同步的副本。
    e.默认行为
        负载均衡:Kafka 默认会尽量均匀地分配分区和副本到不同的 Brokers,以避免单个 Broker 过载。
        Broker 选择:如果你有特定的需求,可以通过 Kafka 的 rack awareness 配置或自定义分配策略来控制分区和副本的具体分配方式。

7 Kafka场景

7.1 如何保证消息的顺序性

01.总结
    Kafka 中的消息顺序取决于分区级别。在特定的分区内,消息是以严格的顺序存储和消费的,但跨分区时顺序无法保证
    kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的。
    ---------------------------------------------------------------------------------------------------------
    分区内顺序:保证顺序性。
    跨分区顺序:无顺序保证。
    解决方案:通过控制分区数、消息键分配和消费者处理策略可以增强顺序控制。

02.Kafka消息顺序的行为
    a.分区内的顺序
        顺序性:在同一个分区内,消息按照它们发送的顺序存储和消费。生产者发送的消息会被追加到分区的末尾,因此同一分区内的消息顺序是保证的。
        消费顺序:消费者从分区读取消息时,会按照消息的偏移量(offset)顺序进行,因此保证消息的消费顺序与发送顺序一致。
    b.跨分区的顺序
        无顺序保证:如果一个主题有多个分区,消息可能会分布到不同的分区中,因此跨分区的消息顺序无法保证。
        消息顺序混乱:在多分区的场景中,消费者可能会从不同的分区并发读取消息,这会导致全局的消息顺序不再严格。

03.如何确保消息顺序?
    如果需要严格的消息顺序,通常需要限制在单个分区内处理,或通过一些策略来控制消息的分发:
    a.单分区主题
        使用只有一个分区的主题,这样所有消息都进入同一个分区,从而保证顺序。但是,这种方式限制了 Kafka 的吞吐量和并行处理能力,不适用于高吞吐需求的场景。
    b.使用相同的键(Key)
        当使用带有键的消息时,Kafka 的默认分区器会根据键进行哈希,确保相同键的消息进入同一个分区。这可以确保对于相同键的消息顺序性。
        例如,订单系统可以使用订单 ID 作为键,这样同一个订单的所有操作消息都会进入同一个分区,从而保证该订单相关消息的顺序。
    c.消费者的顺序处理
        在消费端确保顺序也很关键。即使分区内消息是顺序的,如果消费者并行处理,顺序仍然可能被打乱。因此,确保单线程或顺序处理是必要的。

7.2 如何保证消息不丢失?

01.生产者端
    通过同步发送或添加异步回调函数来确保消息发送成功,并利用重试参数(RETRANCE)自动处理发送失败的情况

02.Broker端
    通过持久化消息到磁盘,并结合分区副本机制和ACKS参数来确保消息不丢失

03.消费者端
    只要生产者和Broker的消息可靠性得到保障,消费者端不太可能出现消息无法消费的问题,
    必要时可以通过调整offset值来重新消费

04.副本机制
    每个分区副本集包含唯一的Leader和多个Follower,Leader处理事务请求,Follower同步Leader数据

05.ACKS参数
    生产者可以设置ACKS参数结合Broker的副本机制,确保数据的可靠性

7.3 如何提升Kafka吞吐量?

00.总结
    1.生产者优化
    2.消费者优化
    3.Kafka Broker配置优化
    4.网络与硬件优化
    5.集群副本策略优化
    6.监控与压测

01.生产者优化
    生产者提升吞吐量的优化手段有以下几个:
    消息批量发送:增加 batch.size(批量消息数量设置)和适当调整 linger.ms(批次逗留时间),以允许生产者累积更多消息后再发送,减少网络请求次数。
    消息压缩:设置 compression.type(默认值为 none,不压缩),该参数为生产者发送数据的压缩方式,包括 gzip、snappy、lz4、zstd 等。启用消息压缩(如 Snappy、LZ4),减少网络传输的数据量,尽管这会增加 CPU 负担。
    增大缓冲区大小:通过增加 buffer.memory 配置(生产者内存缓冲区大小),允许生产者在等待发送时缓存更多消息。
    优化 acks 配置:适当降低 acks 级别以减少等待确认的时间,但需权衡数据的持久性。acks 级别含义如下:
    acks=0:生产者不会等待来自 Broker 的消息发送成功与否的确认,如果 Broker 没有收到消息,那生产者是不知道的。该配置吞吐量高,但可能会丢失数据。
    acks=1:默认值,生产者将消息写入 leader 副本后,就会收到 Broker 的确认消息。如果 leader 副本同步成功了,但还没有来得及同步给 follower 副本,此时就发生宕机了,那就会丢失数据。
    acks=-1:生产者将消息写入 leader 副本和所有 follower 副本后,才会收到 Broker 的确认消息。该配置可以保证不丢数据,但是吞吐量低。
    并行生产:利用多线程或多生产者实例并行发送消息。

02.消费者优化
    生产者提升吞吐量的优化手段有以下几个:
    增加消费者实例:确保每个分区至少有一个消费者,以充分利用并行处理能力。
    增加每次拉取的消息数量:通过调整 fetch.min.bytes(消息拉取最小容量)和 fetch.max.bytes(消息拉取最大容量)增加每次拉取的消息数量。
    并行处理:在消费者内部使用多线程处理消息。

03.Kafka Broker配置优化
    每个 broker 就是一个 Kafka 实例,它的优化手段有以下几个:
    增加分区数量:适当增加主题的分区数量,可以提高并行处理能力,但需避免过多分区导致的管理和协调开销。
    优化节点配置:包括但不限于 num.network.threads(网络线程数)、num.io.threads(I/O 线程数)、socket.send.buffer.bytes/socket.receive.buffer.bytes(套接字缓冲区大小)等,根据硬件资源和负载情况调整。
    磁盘优化:使用快速磁盘(如 SSD),并优化文件存储目录的布局以减少 I/O 竞争。
    JVM调优:Kafka 是运行在 JVM 上的,针对 Kafka 服务端的 JVM 进行适当的内存和 GC 优化,也可以提升有效的提升吞吐量。

04.网络与硬件优化
    网络和 Kafka 运行的硬件,也会影响 Kafka 的吞吐量,所以我们可以进行以下优化:
    网络优化:确保网络连接质量良好,减少网络延迟和丢包。
    硬件升级:增加服务器的 CPU、内存和磁盘性能。

05.集群副本策略优化
    合理配置副本放置,确保高可用的同时,减少跨数据中心的复制延迟,也可以有效的提升 Kafka 的吞吐量。

06.监控与压测
    持续监控:使用 Kafka 自带的监控工具或集成第三方监控系统(如 Prometheus+Grafana),持续监控性能指标。
    压测于调试:基于监控数据和性能测试结果,不断调整上述参数以找到最优配置。

7.4 流式处理?对用户的点赞和阅读行为计算,实时计算热门文章功能

00.流程
    a.用户点赞行为和用户阅读行为
        用户在应用中进行点赞和阅读操作。
    b.发送消息给stream流式处理
        用户的点赞和阅读行为会生成相应的消息,并通过Kafka发送到stream流式处理系统。
    c.Kafka Stream聚合处理
        Kafka Stream处理这些用户行为数据,对数据进行聚合和分析。
    d.更新数据库数量
        处理后的数据通过Kafka更新到数据库中,保持数据的最新状态。
    e.重新计算文章分值
        系统重新计算文章分值,计算时会考虑当日热度的权重整体*3,以增加当日热度的影响。
    f.查询Redis对应数据
        系统查询Redis中当前存储的文章分值数据。
    g.比较分值
        将重新计算的文章分值与Redis中的数据进行比较。
    h.替换
        如果重新计算的分值大于Redis中的分值,则更新Redis中的数据。
    i.存入Redis
        最终将当前频道和推荐的文章分值存入Redis,以便快速访问和推荐。