1 RabbitMQ介绍

1.1 定义

01.RabbitMQ概念
    Broker: 简单来说就是消息队列服务器实体
    Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
    Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
    Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
    Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
    VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
    Producer: 消息生产者,就是投递消息的程序
    Consumer: 消息消费者,就是接受消息的程序
    Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
    ---------------------------------------------------------------------------------------------------------
    Producer: 消息生产者,就是投递消息的程序
    Consumer: 消息消费者,就是接受消息的程序
    Channel: 我们的客户端连接都会使用一个Channel,再通过Channel去访问到RabbitMQ服务器,注意通信协议不是http,而是amqp协议。
    Exchange:类似于交换机一样的存在,会根据我们的请求,转发给相应的消息队列,每个队列都可以绑定到Exchange上,这样Exchange就可以将数据转发给队列了,可以存在很多个,不同的Exchange类型可以用于实现不同消息的模式。
    Queue:消息队列本体,生产者所有的消息都存放在消息队列中,等待消费者取出。
    VirtualHost:有点类似于环境隔离,不同环境都可以单独配置一个VirtualHost,每个VirtualHost可以包含很多个Exchange和
    Queue,每个VirtualHost相互之间不影响。
    ---------------------------------------------------------------------------------------------------------
    生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
    交换机(Exchange):和生产者建立连接并接收生产者的消息。
    消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
    队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
    路由(Routes):交换机转发消息到队列的规则。

02.RabbitMQ特点
    RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
    1、可靠性
    2、灵活的路由
    3、消息集群
    4、高可用
    5、多种协议
    6、多语言客户端
    7、管理界面
    8、跟踪机制
    9、插件机制

03.Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
    (1)direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
    (2)fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
    (3)topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
    (4)headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

04.RabbitMQ的工作模式
    一.simple模式(即最简单的收发模式)
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
    二、work工作模式(资源的竞争)
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
    三、publish/subscribe发布订阅(共享资源)
        每个消费者监听自己的队列;
        生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
    四、routing路由模式
        消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
        根据业务功能定义路由字符串
        从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
        业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
    五.topic 主题模式(路由模式的一种)
        星号井号代表通配符
        星号代表多个单词,井号代表一个单词
        路由功能添加模糊匹配
        消息产生者产生消息,把消息交给交换机
        交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

05.重要概念
    a.消息队列(Message Queue)
        定义:一种用于异步传递数据的通信机制,允许应用程序通过队列交换消息,解耦生产者和消费者。
        作用:提高系统的可靠性和伸缩性。
    b.生产者(Producer)
        定义:发送消息到 RabbitMQ 的应用程序或服务。
        作用:生成并发送消息到消息队列。
    c.消费者(Consumer)
        定义:接收和处理来自 RabbitMQ 消息队列的消息的应用程序或服务。
        作用:处理队列中的消息,执行任务。
    d.交换机(Exchange)
        定义:负责接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列。
        类型:
        Direct:精确匹配路由键的消息。消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
        Fanout:将消息广播到所有绑定的队列。把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
        Topic:根据通配符模式路由消息。通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
        Headers:根据消息头属性路由消息。不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
    e.队列(Queue)
        定义:存储消息的容器,消费者从队列中获取消息进行处理。
        作用:确保消息的顺序性和持久性。
    f.路由键(Routing Key)
        定义:用于将消息路由到特定队列的标识符。
        作用:在交换机和队列之间进行消息路由。
    g.绑定(Binding)
        定义:交换机和队列之间的连接关系,定义了如何将消息从交换机路由到队列。
        作用:配置消息的路由规则。
    h.消息确认(Acknowledgement)
        定义:消费者处理消息后发送的确认信号,通知 RabbitMQ 消息已被成功处理。
        作用:确保消息不会丢失。
    i.持久化(Persistence)
        定义:确保消息在 RabbitMQ 重启后仍然存在的机制。
        作用:通过设置消息和队列的持久化选项,防止数据丢失。
    j.死信队列(Dead-Letter Queue, DLQ)
        定义:用于存储无法成功处理的消息的队列。
        作用:处理和分析错误消息,以便于排查和解决问题。
    k.消费者确认(Consumer Acknowledgement)
        定义:确认消费者成功处理了消息后发送的信号。
        作用:确保消息被处理后才从队列中删除。
    l.集群(Cluster)
        定义:多个 RabbitMQ 节点组成的集群,共享消息队列数据,提高可用性和伸缩性。
        作用:提供高可用性和负载均衡。
    m.镜像队列(Mirrored Queue)
        定义:将消息队列的副本分布到多个节点上,以增强数据的持久性和高可用性。
        作用:防止数据丢失,确保队列的高可用性。

1.2 界面

1.3 集群搭建

01.两种模式
    a.普通集群
        普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
        此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
        当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
        这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
    b.镜像集群
        它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。
        也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,
        这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
    c.节点类型
        RabbitMQ 中的节点类型有两种:
        RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
        Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息
        -----------------------------------------------------------------------------------------------------
        RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,
        必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,
        但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,
        或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。

02.搭建普通集群
    a.预备知识
        搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,
        我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
        -----------------------------------------------------------------------------------------------------
        RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。
        可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。
        如果主机名 ping 不通,RabbitMQ 服务启动会失败
        如果我们是在不同的服务器上搭建 RabbitMQ 集群,我们将通过 Docker 的容器连接 link 来实现容器之间的访问
    b.开始搭建
        a.创建三个 RabbitMQ 容器:
            docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management
            -------------------------------------------------------------------------------------------------
            三个节点现在就启动好了,注意在 mq02 和 mq03 中,分别使用了 --link 参数来实现容器连接,关于这个参数,
            另外还需要注意,mq03 容器中要既能够连接 mq01 也能够连接 mq02。
        b.接下来进入到 mq02 容器中,首先查看一下 hosts 文件,可以看到我们配置的容器连接已经生效了
            将来在 mq02 容器中,就可以通过 mylink01 或者 rabbit01 访问到 mq01 容器了。
        c.分别执行如下命令将 mq02 容器加入集群中:
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@rabbit01
            rabbitmqctl start_app
        d.过相同的方式将 mq03 也加入到集群中
            rabbitmqctl stop_app
            rabbitmqctl join_cluster rabbit@rabbit01
            rabbitmqctl start_app
        e.输入如下命令我们可以查看集群的状态
            rabbitmqctl cluster_status
    c.代码测试
        a.applicaiton.properties
            spring.rabbitmq.addresses=localhost:5671,localhost:5672,localhost:5673
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
        b.简单的队列
            @Configuration
            public class RabbitConfig {
                public static final String MY_QUEUE_NAME = "my_queue_name";
                public static final String MY_EXCHANGE_NAME = "my_exchange_name";
                public static final String MY_ROUTING_KEY = "my_queue_name";

                @Bean
                Queue queue() {
                    return new Queue(MY_QUEUE_NAME, true, false, false);
                }

                @Bean
                DirectExchange directExchange() {
                    return new DirectExchange(MY_EXCHANGE_NAME, true, false);
                }

                @Bean
                Binding binding() {
                    return BindingBuilder.bind(queue())
                            .to(directExchange())
                            .with(MY_ROUTING_KEY);
                }
            }
        c.生产者
            @SpringBootTest
            class ProviderApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;

                @Test
                void contextLoads() {
                    rabbitTemplate.convertAndSend(null, RabbitConfig.MY_QUEUE_NAME, "hello 江南一点雨");
                }
            }
            -------------------------------------------------------------------------------------------------
            这条消息发送成功之后,在 RabbitMQ 的 Web 管理端,我们会看到三个 RabbitMQ 实例上都会显示有一条消息,
            但是实际上消息本身只存在于一个 RabbitMQ 实例。
        d.消费者
            @Component
            public class MsgReceiver {

                @RabbitListener(queues = RabbitConfig.MY_QUEUE_NAME)
                public void handleMsg(String msg) {
                    System.out.println("msg = " + msg);
                }
            }
            -------------------------------------------------------------------------------------------------
            当消息消费者启动成功后,这个方法中只收到一条消息,进一步验证了我们搭建的 RabbitMQ 集群是没问题的。

03.搭建镜像集群
    a.介绍
        所谓的镜像集群模式并不需要额外搭建,只需要我们将队列配置为镜像队列即可。
        这个配置可以通过网页配置,也可以通过命令行配置,我们分别来看。
    b.方式1:网页配置镜像队列
        点击 Admin 选项卡,然后点击右边的 Policies,再点击 Add/update a policy4
        -----------------------------------------------------------------------------------------------------
        接下来添加一个策略
        各参数含义如下:
        Name: policy 的名称。
        Pattern: queue 的匹配模式(正则表达式)。
        Definition:镜像定义,主要有三个参数:ha-mode, ha-params, ha-sync-mode。
        ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
        ha-params:ha-mode 模式需要用到的参数。
        ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
        priority 为可选参数,表示 policy 的优先级。
        -----------------------------------------------------------------------------------------------------
        配置完成后,点击下面的 add/update policy 按钮,完成策略的添加,
        -----------------------------------------------------------------------------------------------------
        添加完成后,我们可以进行一个简单的测试。
        首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息。
        发完之后关闭 mq01 实例。
        接下来启动 consumer,此时发现 consumer 可以完成消息的消费(注意和前面的反向测试区分),这就说明镜像队列已经搭建成功了。
    c.方式2:命令行配置镜像队列
        rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
        rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

1.4 用户、权限、VirtualHost

01.用户管理
    因为 vhost 通常跟用户一起出现,所以这里我也顺便说下 user 的相关操作。
    添加一个用户名为 javaboy,密码为 123 的用户,方式如下:
    rabbitmqctl add_user javaboy 123
    ---------------------------------------------------------------------------------------------------------
    通过如下命令可以修改用户密码(将 javaboy 的密码改为 123456):
    rabbitmqctl change_password javaboy 123456
    ---------------------------------------------------------------------------------------------------------
    通过如下命令可以验证用户密码:
    rabbitmqctl authenticate_user javaboy 123456
    ---------------------------------------------------------------------------------------------------------
    通过如下命令可以查看当前的所有用户:
    rabbitmqctl list_users
    ---------------------------------------------------------------------------------------------------------
    给用户设置角色的命令如下(给 javaboy 设置 administrator 角色):
    rabbitmqctl set_user_tags javaboy administrator
    ---------------------------------------------------------------------------------------------------------
    最后,删除一个用户的命令如下:
    rabbitmqctl delete_user javaboy

02.VirtualHost
    a.多租户
        我们安装一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
        本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。
        我们并不需要特别的去看待 vhost,他就跟普通的物理 RabbitMQ 一样,不同的 vhost 能够提供逻辑上的分离,确保不同的应用消息队列能够安全独立运行。
        要我来说,我们该怎么看待 vhost 和 RabbitMQ 的关系呢?RabbitMQ 相当于一个 Excel 文件,而 vhost 则是 Excel 文件中的一个个 sheet,我们所有的操作都是在某一个 sheet 上进行操作。
    b.方式1:命令
        因为松哥这里的 RabbitMQ 是用 docker 安装的,所以我们首先进入到 docker 容器中:
        docker exec -it some-rabbit /bin/bash
        -----------------------------------------------------------------------------------------------------
        然后执行如下命令创建一个名为 /myvh 的 vhost:
        rabbitmqctl add_vhost myvh
        -----------------------------------------------------------------------------------------------------
        然后通过如下命令可以查看已有的 vhost:
        rabbitmqctl list_vhosts
        -----------------------------------------------------------------------------------------------------
        可以通过如下命令删除一个 vhost:
        rabbitmqctl delete_vhost myvh
        当删除一个 vhost 的时候,与这个 vhost 相关的消息队列、交换机以及绑定关系等,统统都会被删除。
        -----------------------------------------------------------------------------------------------------
        给一个用户设置 vhost:
        rabbitmqctl set_permissions -p myvh guest ".*" ".*" ".*"
        前面参数都好说,最后面三个 ".*" 含义分别如下:
        用户在所有资源上都拥有可配置权限(创建/删除消息队列、创建/删除交换机等)。
        用户在所有资源上都拥有写权限(发消息)。
        用户在所有资源上都拥有读权限(消息消费,清空队列等)。
        -----------------------------------------------------------------------------------------------------
        禁止一个用户访问某个 vhost:
        rabbitmqctl clear_permissions -p myvh guest
    c.方式2:Web页面
        在 admin 选项卡中,点击右边的 Virtual Hosts,
        然后点击下边的 Add a new virtual host ,可以添加一个新的 vhost
        进入到某一个 vhost 之后,可以修改其权限以及删除一个 vhost

03.权限系统
    a.概念
        不管我们是通过网页还是通过命令行工具创建用户对象,刚创建好的用户对象都是没法直接使用的,
        需要我们首先把这个用户置于某一个 vhost 之下,然后再赋予其权限,有了权限,这个用户才可以正常使用。
    b.概念
        在这套 ACL 风格的权限管理系统中,允许非常多细粒度的权限控制,可以为不同用户分别设置读、写以及配置等权限。
        这里涉及到三种不同的权限:
        读:和消息消费有关的所有操作,包括清除整个队列的消息。
        写:发布消息。
        配置:消息队列、交换机等的创建和删除。
    c.方式1:命令
        a.权限操作命令
            rabbitmqctl set_permissions [-p vhosts] {user} {conf} {write} {read}
            -------------------------------------------------------------------------------------------------
            这里有几个参数:
            [-p vhost]:授予用户访问权限的 vhost 名称,如果不写默认为 /。
            user:用户名。
            conf:用户在哪些资源上拥有可配置权限(支持正则表达式)。
            write:用户在哪些资源上拥有写权限(支持正则表达式)。
            read:用户在哪些资源上拥有读权限(支持正则表达式)。
        b.假设我们有一个名为 zhangsan 的用户,我们希望该用户在 myvh 虚拟主机下具备所有权限,那么我们的操作命令如下:
            rabbitmqctl set_permissions -p myvh zhangsan ".*" ".*" ".*"
            执行如下命令可以验证授权是否成功:
            rabbitmqctl -p myvh list_permissions
            -------------------------------------------------------------------------------------------------
            在上面的授权命令中,我们用的都是 ".*",松哥再额外说下这个通配符:
            ".*":这个表示匹配所有的交换机和队列。
            "javaboy-.*":这个表示匹配名字以 javaboy- 开头的交换机和队列。
            "":这个表示不匹配任何队列与交换机(如果想撤销用户的权限可以使用这个)。
        c.我们可以使用如下命令来移除某一个用户在某一个 vhost 上的权限,例如移除 zhangsan 在 myvh 上的所有权限
            rabbitmqctl clear_permissions -p myvh zhangsan
            执行完成后,我们可以通过 rabbitmqctl -p myvh list_permissions 命令来查看执行结果是否生效,最终执行效果如下:
            -------------------------------------------------------------------------------------------------
            如果一个用户在多个 vhost 上都有对应的权限,按照上面的 rabbitmqctl -p myvh list_permissions 命令
            只能查看一个 vhost 上的权限,此时我们可以通过如下命令来查看 lisi 在所有 vhost 上的权限:
            rabbitmqctl list_user_permissions lisi
    d.方式2:Web页面
        可以设置权限,也可以清除权限

1.5 死信队列

00.总结
    01.第1种:单条消息过期
        体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());
    02.第2种:队列消息过期
        体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
    03.第3种:特殊情况
        还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
        这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
        是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)
    04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
        消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
        消息过期
        队列达到最大长度
        -----------------------------------------------------------------------------------------------------
        步骤如下:
        1: 创建连接工厂,设置连接属性
        2: 从连接工厂中获取
        3: 从连接中打开通道channel
        4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
        4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
        5: 通过channel发送消息
        -----------------------------------------------------------------------------------------------------
        体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定

00.TTL
    a.概念
        默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,
        如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
        -----------------------------------------------------------------------------------------------------
        TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,
        那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信,
    b.TTL 的设置有两种不同的方式:
        方式1:在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
        方式2:在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
        -----------------------------------------------------------------------------------------------------
        那如果两个都设置了呢?
        以时间短的为准。
    c.当我们设置了消息有效期后,消息过期了就会被从队列中删除了,但是两种方式对应的删除时机有一些差异:
        a.第一种方式
            当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,
            队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,
            有的话就直接删除。
        b.第二种方式
            当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,
            因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,
            当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。

01.第1种:单条消息过期
    a.application.properties
        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.virtual-host=/
    b.消息队列
        @Configuration
        public class QueueConfig {

            public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo";
            public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo";
            public static final String HELLO_ROUTING_KEY = "hello_routing_key";

            @Bean
            Queue queue() {
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false);
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(HELLO_ROUTING_KEY);
            }
        }
        -------------------------------------------------------------------------------------------------
        这个配置类主要干了三件事:配置消息队列、配置交换机以及将两者绑定在一起。
        首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。
        配置一个 DirectExchange 交换机。
        将交换机和队列绑定到一起。
    c.生产者
        @RestController
        public class HelloController {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @GetMapping("/hello")
            public void hello() {
                Message message = MessageBuilder.withBody("hello javaboy".getBytes())
                        .setExpiration("10000")
                        .build();
                rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
            }
        }
        -------------------------------------------------------------------------------------------------
        在创建 Message 对象的时候我们可以设置消息的过期时间,这里设置消息的过期时间为 10 秒。
    d.测试
        接下来我们启动项目,进行消息发送测试。当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费。
        打开 RabbitMQ 管理页面,点击到 Queues 选项卡,10s 之后,我们会发现消息已经不见了:
        单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可。

02.第2种:队列消息过期
    a.application.properties
        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.virtual-host=/
    b.给队列设置消息过期时间
        @Configuration
        public class QueueConfig {

            public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo";
            public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo";
            public static final String HELLO_ROUTING_KEY = "hello_routing_key";

            @Bean
            Queue queue() {
                Map<String, Object> args = new HashMap<>();
                args.put("x-message-ttl", 10000);
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(HELLO_ROUTING_KEY);
            }
        }
        -------------------------------------------------------------------------------------------------
        给队列设置消息过期时间
        @Bean
        Queue queue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 10000);
            return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
        }
    c.生产者
        @RestController
        public class HelloController {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @GetMapping("/hello")
            public void hello() {
                Message message = MessageBuilder.withBody("hello javaboy".getBytes())
                        .build();
                rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
            }
        }
    d.测试
        可以看到,消息正常发送即可,不用设置消息过期时间。
        OK,启动项目,发送一条消息进行测试。查看 RabbitMQ 管理页面,如下:
        可以看到,消息队列的 Features 属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL 则表示消息会过期。
        10s 之后刷新页面,发现消息数量已经恢复为 0。
        这就是给消息队列设置消息过期时间,一旦设置了,所有进入到该队列的消息都有一个过期时间了。

03.第3种:特殊情况
    还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
    这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
    是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)

04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    a.介绍
        a.概念
            有小伙伴不禁要问,被删除的消息去哪了?真的被删除了吗?非也非也!这就涉及到死信队列了,接下来我们来看看死信队列。
            -------------------------------------------------------------------------------------------------
            死信交换机,Dead-Letter-Exchange 即 DLX。
            死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?
        b.一般消息变成死信消息有如下几种情况:
            消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
            消息过期
            队列达到最大长度
        c.总结
            当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
            DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,
            当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
        d.死信队列
            绑定了死信交换机的队列就是死信队列
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=127.0.0.1
            spring.rabbitmq.port=5672
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
            spring.rabbitmq.virtual-host=/
        b.创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起。
            这其实跟普通的交换机,普通的消息队列没啥两样
            -------------------------------------------------------------------------------------------------
            public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
            public static final String DLX_QUEUE_NAME = "dlx_queue_name";
            public static final String DLX_ROUTING_KEY = "dlx_routing_key";

            /**
             * 配置死信交换机
             *
             * @return
             */
            @Bean
            DirectExchange dlxDirectExchange() {
                return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
            }
            /**
             * 配置死信队列
             * @return
             */
            @Bean
            Queue dlxQueue() {
                return new Queue(DLX_QUEUE_NAME);
            }
            /**
             * 绑定死信队列和死信交换机
             * @return
             */
            @Bean
            Binding dlxBinding() {
                return BindingBuilder.bind(dlxQueue())
                        .to(dlxDirectExchange())
                        .with(DLX_ROUTING_KEY);
            }
            -------------------------------------------------------------------------------------------------
            为消息队列配置死信交换机
            @Bean
            Queue queue() {
                Map<String, Object> args = new HashMap<>();
                //设置消息过期时间
                args.put("x-message-ttl", 0);
                //设置死信交换机
                args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
                //设置死信 routing_key
                args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
                return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
            }
            -------------------------------------------------------------------------------------------------
            就两个参数:
            x-dead-letter-exchange:配置死信交换机。
            x-dead-letter-routing-key:配置死信 routing_key。
            -------------------------------------------------------------------------------------------------
            将来发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,
            就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
        c.消费者(死信消息队列的消费和普通消息队列的消费并无二致)
            @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
            public void dlxHandle(String msg) {
                System.out.println("dlx msg = " + msg);
            }

1.6 延迟队列

00.介绍
    a.场景
        定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,
        这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,
        向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,
    b.场景
        因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如:
        在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
        我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
        公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
        安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
        用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
    c.在 RabbitMQ 上实现定时任务有两种方式:
        方式1:利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务。(DLX)
        方式2:使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。

01.方式1:利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务,即DLX 实现延迟队列
    a.介绍
        DLX 实现延迟队列
        延迟队列实现的思路也很简单,就是上篇文章我们所说的 DLX(死信交换机)+TTL(消息超时时间)
        -----------------------------------------------------------------------------------------------------
        我们可以把死信队列就当成延迟队列,也就是延迟队列的实现思路
        假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,
        同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,
        那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,
        此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=localhost
            spring.rabbitmq.username=guest
            spring.rabbitmq.password=guest
            spring.rabbitmq.port=5672
        b.配置两个消息队列:一个普通队列,一个死信队列
            @Configuration
            public class QueueConfig {
                public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
                public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
                public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
                public static final String DLX_QUEUE_NAME = "dlx_queue_name";
                public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
                public static final String DLX_ROUTING_KEY = "dlx_routing_key";

                /**
                 * 死信队列
                 * @return
                 */
                @Bean
                Queue dlxQueue() {
                    return new Queue(DLX_QUEUE_NAME, true, false, false);
                }

                /**
                 * 死信交换机
                 * @return
                 */
                @Bean
                DirectExchange dlxExchange() {
                    return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
                }

                /**
                 * 绑定死信队列和死信交换机
                 * @return
                 */
                @Bean
                Binding dlxBinding() {
                    return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                            .with(DLX_ROUTING_KEY);
                }

                /**
                 * 普通消息队列
                 * @return
                 */
                @Bean
                Queue javaboyQueue() {
                    Map<String, Object> args = new HashMap<>();
                    //设置消息过期时间
                    args.put("x-message-ttl", 1000*10);
                    //设置死信交换机
                    args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
                    //设置死信 routing_key
                    args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
                    return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
                }

                /**
                 * 普通交换机
                 * @return
                 */
                @Bean
                DirectExchange javaboyExchange() {
                    return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
                }

                /**
                 * 绑定普通队列和与之对应的交换机
                 * @return
                 */
                @Bean
                Binding javaboyBinding() {
                    return BindingBuilder.bind(javaboyQueue())
                            .to(javaboyExchange())
                            .with(JAVABOY_ROUTING_KEY);
                }
            }
            -------------------------------------------------------------------------------------------------
            配置可以分为两组,第一组配置死信队列,第二组配置普通队列。每一组都由消息队列、消息交换机以及 Binding 三者组成。
            配置消息队列时,为消息队列指定死信队列,
            配置队列中的消息过期时间时,默认的时间单位时毫秒。
        c.为死信队列配置一个消费者
            @Component
            public class DlxConsumer {
                private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

                @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
                public void handle(String msg) {
                    logger.info(msg);
                }
            }
        d.生产者
            @SpringBootTest
            class DelayQueueApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;

                @Test
                void contextLoads() {
                    System.out.println(new Date());
                    rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
                }

            }

02.方式2:使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
    a.安装插件
        下载 rabbitmq_delayed_message_exchange 插件
        -----------------------------------------------------------------------------------------------------
        下载完成后在命令行执行如下命令将下载文件拷贝到 Docker 容器中去:
        docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins
        这里第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置。
        -----------------------------------------------------------------------------------------------------
        再执行如下命令进入到 RabbitMQ 容器中:
        docker exec -it some-rabbit /bin/bash
        进入到容器之后,执行如下命令启用插件:
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        启用成功之后,还可以通过如下命令查看所有安装的插件,看看是否有我们刚刚安装过的插件,如下:
        rabbitmq-plugins list
    b.代码实现
        a.application.properties
            spring.rabbitmq.host=localhost
            spring.rabbitmq.password=guest
            spring.rabbitmq.username=guest
            spring.rabbitmq.virtual-host=/
        b.配置类
            @Configuration
            public class RabbitConfig {
                public static final String QUEUE_NAME = "javaboy_delay_queue";
                public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
                public static final String EXCHANGE_TYPE = "x-delayed-message";

                @Bean
                Queue queue() {
                    return new Queue(QUEUE_NAME, true, false, false);
                }

                @Bean
                CustomExchange customExchange() {
                    Map<String, Object> args = new HashMap<>();
                    args.put("x-delayed-type", "direct");
                    return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
                }

                @Bean
                Binding binding() {
                    return BindingBuilder.bind(queue())
                            .to(customExchange()).with(QUEUE_NAME).noargs();
                }
            }
            -------------------------------------------------------------------------------------------------
            这里主要是交换机的定义有所不同,小伙伴们需要注意。
            这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:
                交换机名称。
                交换机类型,这个地方是固定的。
                交换机是否持久化。
                如果没有队列绑定到交换机,交换机是否删除。
                其他参数。
            最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,
            用了哪种类型,将来交换机分发消息就按哪种方式来。
        c.消费者
            @Component
            public class MsgReceiver {
                private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
                @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
                public void handleMsg(String msg) {
                    logger.info("handleMsg,{}",msg);
                }
            }
        d.生产者
            @SpringBootTest
            class MqDelayedMsgDemoApplicationTests {

                @Autowired
                RabbitTemplate rabbitTemplate;
                @Test
                void contextLoads() throws UnsupportedEncodingException {
                    Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build();
                    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
                }

            }
            -------------------------------------------------------------------------------------------------
            在消息头中设置消息的延迟时间。

1.7 发送可靠性?确认机制、事务

00.总结
    开启事务机制
    发送方确认机制
    失败重试

01.开启事务机制
    a.提供一个事务管理器
        @Bean
        RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    b.生产者:添加事务注解、设置通信信道为事务模式
        @Service
        public class MsgService {
            @Autowired
            RabbitTemplate rabbitTemplate;

            @Transactional
            public void send() {
                rabbitTemplate.setChannelTransacted(true);
                rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
                int i = 1 / 0;
            }
        }
        -----------------------------------------------------------------------------------------------------
        发送消息的方法上添加 @Transactional 注解标记事务。
        调用 setChannelTransacted 方法设置为 true 开启事务模式。
        -----------------------------------------------------------------------------------------------------
        当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
        1.客户端发出请求,将信道设置为事务模式。
        2.服务端给出回复,同意将信道设置为事务模式。
        3.客户端发送消息。
        4.客户端提交事务。
        5.服务端给出响应,确认事务提交。
        -----------------------------------------------------------------------------------------------------
        上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。
        所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。
        我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
        -----------------------------------------------------------------------------------------------------
        所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,
        这种方式,性能要远远高于事务模式,一起来看下。

02.发送方确认机制
    a.application.properties 配置开启消息发送方确认机制
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
        第一行属性的配置有三个取值:
        none:表示禁用发布确认模式,默认即此。
        correlated:表示成功发布消息到交换器后会触发的回调方法。
        simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
    b.生产者
        @Service
        public class MsgService {
            @Autowired
            RabbitTemplate rabbitTemplate;

            public void send() {
                rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
                int i = 1 / 0;
            }
        }
    c.开启两个监听
        @Configuration
        public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
            public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
            public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
            private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Bean
            Queue queue() {
                return new Queue(JAVABOY_QUEUE_NAME);
            }
            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(JAVABOY_EXCHANGE_NAME);
            }
            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue())
                        .to(directExchange())
                        .with(JAVABOY_QUEUE_NAME);
            }

            @PostConstruct
            public void initRabbitTemplate() {
                rabbitTemplate.setConfirmCallback(this);
                rabbitTemplate.setReturnsCallback(this);
            }

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    logger.info("{}:消息成功到达交换器",correlationData.getId());
                }else{
                    logger.error("{}:消息发送失败", correlationData.getId());
                }
            }

            @Override
            public void returnedMessage(ReturnedMessage returned) {
                logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
            }
        }
        -----------------------------------------------------------------------------------------------------
        关于这个配置类,我说如下几点:
        定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
        定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。
    d.单条消息处理,接下来我们对消息发送进行测试
        首先我们尝试将消息发送到一个不存在的交换机中,像下面这样:
        rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
        注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报错
        -----------------------------------------------------------------------------------------------------
        接下来我们给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
        注意此时第二个参数是一个字符串,不是变量。
        -----------------------------------------------------------------------------------------------------
        可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列(因为队列不存在)。
        这是一条消息的发送,我们再来看看消息的批量发送。
    e.消息批量处理
        如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
        这就是 publisher-confirm 模式。
        相比于事务,这种模式下的消息吞吐量会得到极大的提升。

03.失败重试
    a.概念
        失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
    b.自带重试机制
        前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。
        如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,
        这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
        spring.rabbitmq.template.retry.enabled=true
        spring.rabbitmq.template.retry.initial-interval=1000ms
        spring.rabbitmq.template.retry.max-attempts=10
        spring.rabbitmq.template.retry.max-interval=10000ms
        spring.rabbitmq.template.retry.multiplier=2
        -----------------------------------------------------------------------------------------------------
        从上往下配置含义依次是:
        开启重试机制。
        重试起始间隔时间。
        最大重试次数。
        最大重试间隔时间。
        间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
        -----------------------------------------------------------------------------------------------------
        配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
    c.业务重试
        业务重试主要是针对消息没有到达交换器的情况。如果消息没有成功到达交换器,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
        a.第一步
            首先创建一张表,用来记录发送到中间件上的消息,像下面这样:
            每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
            status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
            tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
            count:表示消息重试次数
        b.第二步
            在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
        c.第三步
            在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1
            (在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
        d.第四步
            另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,
            把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,
            表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
        e.总结
            当然这种思路有两个弊端:
            去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
            按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

1.8 消费可靠性?确认机制、幂等性

00.总结
    两种消费思路
    确保消费成功两种思路
    消息拒绝
    消息确认
    幂等性

01.两种消费思路
    a.两种消费思路
        RabbitMQ 的消息消费,整体上来说有两种不同的思路:
        推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
        拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
    b.推(push)
        通过 @RabbitListener 注解去标记消费者,如下:
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle(String msg) {
                System.out.println("msg = " + msg);
            }
        }
        当监听的队列中有消息时,就会触发该方法。
    c.拉(pull)
        @Test
        public void test01() throws UnsupportedEncodingException {
            Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
            System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
        }
        -----------------------------------------------------------------------------------------------------
        调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,
        如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,
        可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,
        则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,
        3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
    d.总结
        如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。
        切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。

02.确保消费成功两种思路
    a.概念
        为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。
        当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
        -----------------------------------------------------------------------------------------------------
        当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
        当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
    b.在 RabbitMQ 的 web 管理页面
        Ready 表示待消费的消息数量。
        Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
        这是我们可以从 UI 层面观察消息的消费情况确认情况。
    c.当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
        待消费的消息
        已经投递给消费者,但是还没有被消费者确认的消息
        -----------------------------------------------------------------------------------------------------
        换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,
        当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。
        如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,
        那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
    d.总结
        综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。
        当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

03.消息拒绝
    a.当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle(Channel channel, Message message) {
                //获取消息编号
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                try {
                    //拒绝消息
                    channel.basicReject(deliveryTag, true);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    b.消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
        1.获取消息编号 deliveryTag。
        2.调用 basicReject 方法拒绝消息。
    c.总结
        调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。
        如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;
        如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
        需要注意的是,basicReject 方法一次只能拒绝一条消息。

04.消息确认
    a.自动确认
        在 Spring Boot 中,默认情况下,消息消费就是自动确认的
        -----------------------------------------------------------------------------------------------------
        @Component
        public class ConsumerDemo {
            @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
            public void handle2(String msg) {
                System.out.println("msg = " + msg);
                int i = 1 / 0;
            }
        }
        -----------------------------------------------------------------------------------------------------
        通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,
        默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待
        下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
    b.手动确认
        a.推模式手动确认
            a.application.properties
                要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
                spring.rabbitmq.listener.simple.acknowledge-mode=manual
                这个配置表示将消息的确认模式改为手动确认。
            b.消费者
                @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
                public void handle3(Message message,Channel channel) {
                    long deliveryTag = message.getMessageProperties().getDeliveryTag();
                    try {
                        //消息消费的代码写到这里
                        String s = new String(message.getBody());
                        System.out.println("s = " + s);
                        //消费完成后,手动 ack
                        channel.basicAck(deliveryTag, false);
                    } catch (Exception e) {
                        //手动 nack
                        try {
                            channel.basicNack(deliveryTag, false, true);
                        } catch (IOException ex) {
                            ex.printStackTrace();
                        }
                    }
                }
                ---------------------------------------------------------------------------------------------
                将消费者要做的事情放到一个 try..catch 代码块中。
                如果消息正常消费成功,则执行 basicAck 完成确认。
                如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。
                ---------------------------------------------------------------------------------------------
                这里涉及到两个方法:
                basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
                basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。
        b.拉模式手动确认
            a.说明
                拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法
            b.原生办法
                public void receive2() {
                    Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
                    long deliveryTag = 0L;
                    try {
                        GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
                        deliveryTag = getResponse.getEnvelope().getDeliveryTag();
                        System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
                        channel.basicAck(deliveryTag, false);
                    } catch (IOException e) {
                        try {
                            channel.basicNack(deliveryTag, false, true);
                        } catch (IOException ex) {
                            ex.printStackTrace();
                        }
                    }
                }

05.幂等性
    a.场景
        消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ
        并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,
        这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次
        (参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。
        种种原因导致我们在消费消息时,一定要处理好幂等性问题。
    b.思路
        采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
        id-0(正在执行业务)
        id-1(执行业务成功)
        -----------------------------------------------------------------------------------------------------
        如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,
        如果 key 已经存在(说明之前有人消费过该消息),获取他的值,
        如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
        -----------------------------------------------------------------------------------------------------
        极端情况:第一个消费者在执行业务时,出现了死锁,
        在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。

2 RabbitMQ消费

2.1 图示

2.2 Hello World

00.介绍
    a.概念
        咦?这个咋没有交换机?
        这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。
    b.说明
        默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

01.代码实现
    a.队列
        @Configuration
        public class HelloWorldConfig {

            public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";

            @Bean
            Queue queue1() {
                return new Queue(HELLO_WORLD_QUEUE_NAME);
            }
        }
    b.生产者
        @SpringBootTest
        class RabbitmqdemoApplicationTests {

            @Autowired
            RabbitTemplate rabbitTemplate;

            @Test
            void contextLoads() {
                rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
            }
        }
    c.消费者
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(String msg) {
                System.out.println("msg = " + msg);
            }
        }

2.3 Work queues

00.介绍
    a.概念
        一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者
    b.说明
        一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。
        消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

01.代码实现
    a.队列
        @Configuration
        public class HelloWorldConfig {

            public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";

            @Bean
            Queue queue1() {
                return new Queue(HELLO_WORLD_QUEUE_NAME);
            }
        }
    b.生产者
        @SpringBootTest
        class RabbitmqdemoApplicationTests {

            @Autowired
            RabbitTemplate rabbitTemplate;

            @Test
            void contextLoads() {
                for (int i = 0; i < 10; i++) {
                    rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
                }
            }

        }
    c.消费者
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(String msg) {
                System.out.println("receive = " + msg);
            }

            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
            public void receive2(String msg) {
                System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
            }
        }
    d.说明1
        可以看到,第二个消费者我配置了 concurrency 为 10,此时,
        对于第二个消费者,将会同时存在 10 个子线程去消费消息。
        -----------------------------------------------------------------------------------------------------
        启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。
        消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),
        消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。
    e.说明2
        消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息
        配置手动 ack 的方式如下:
        spring.rabbitmq.listener.simple.acknowledge-mode=manual
        -----------------------------------------------------------------------------------------------------
        消费者代码如下:
        @Component
        public class HelloWorldConsumer {
            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
            public void receive(Message message,Channel channel) throws IOException {
                System.out.println("receive="+message.getPayload());
                channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
            }

            @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
            public void receive2(Message message, Channel channel) throws IOException {
                System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
                channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
            }
        }
        -----------------------------------------------------------------------------------------------------
        此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。
        这就是 Work queues 这种情况。

2.4 Publish/Subscribe

00.介绍
    a.概念
        一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,
        每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
        需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,
        这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,
    b.有四种交换机可供选择
        direct 直连交换机:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
        fanout 扇形交换机:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
        topic 主题交换机:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
        headers 头交换机:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了

01.direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
    a.介绍
        DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
    b.DirectExchange交换机
        @Configuration
        public class RabbitDirectConfig {
            public final static String DIRECTNAME = "javaboy-direct";
            @Bean
            Queue queue() {
                return new Queue("hello-queue");
            }

            @Bean
            DirectExchange directExchange() {
                return new DirectExchange(DIRECTNAME, true, false);
            }

            @Bean
            Binding binding() {
                return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
            }
        }
        -----------------------------------------------------------------------------------------------------
        首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
        创建一个Binding对象将Exchange和Queue绑定在一起。
        DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。
    c.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void directTest() {
                rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
            }
        }
        -----------------------------------------------------------------------------------------------------
        在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送
    d.消费者
        @Component
        public class DirectReceiver {
            @RabbitListener(queues = "hello-queue")
            public void handler1(String msg) {
                System.out.println("DirectReceiver:" + msg);
            }
        }

02.fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
    a.介绍
        FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,
        在这种策略中,routingkey 将不起任何作用
    b.FanoutExchange交换机
        @Configuration
        public class RabbitFanoutConfig {
            public final static String FANOUTNAME = "sang-fanout";
            @Bean
            FanoutExchange fanoutExchange() {
                return new FanoutExchange(FANOUTNAME, true, false);
            }
            @Bean
            Queue queueOne() {
                return new Queue("queue-one");
            }
            @Bean
            Queue queueTwo() {
                return new Queue("queue-two");
            }
            @Bean
            Binding bindingOne() {
                return BindingBuilder.bind(queueOne()).to(fanoutExchange());
            }
            @Bean
            Binding bindingTwo() {
                return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
            }
        }
    c.创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上
        @Component
        public class FanoutReceiver {
            @RabbitListener(queues = "queue-one")
            public void handler1(String message) {
                System.out.println("FanoutReceiver:handler1:" + message);
            }

            @RabbitListener(queues = "queue-two")
            public void handler2(String message) {
                System.out.println("FanoutReceiver:handler2:" + message);
            }
        }
    d.两个消费者
        @Component
        public class FanoutReceiver {
            @RabbitListener(queues = "queue-one")
            public void handler1(String message) {
                System.out.println("FanoutReceiver:handler1:" + message);
            }
            @RabbitListener(queues = "queue-two")
            public void handler2(String message) {
                System.out.println("FanoutReceiver:handler2:" + message);
            }
        }
    e.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void fanoutTest() {
                rabbitTemplate
                .convertAndSend(RabbitFanoutConfig.FANOUTNAME,
                        null, "hello fanout!");
            }
        }
        -----------------------------------------------------------------------------------------------------
        注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。

03.topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
    a.介绍
        TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到
        TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者
        多个 Queue 上。
    b.TopicExchange交换机
        @Configuration
        public class RabbitTopicConfig {
            public final static String TOPICNAME = "sang-topic";
            @Bean
            TopicExchange topicExchange() {
                return new TopicExchange(TOPICNAME, true, false);
            }
            @Bean
            Queue xiaomi() {
                return new Queue("xiaomi");
            }
            @Bean
            Queue huawei() {
                return new Queue("huawei");
            }
            @Bean
            Queue phone() {
                return new Queue("phone");
            }
            @Bean
            Binding xiaomiBinding() {
                return BindingBuilder.bind(xiaomi()).to(topicExchange())
                        .with("xiaomi.#");
            }
            @Bean
            Binding huaweiBinding() {
                return BindingBuilder.bind(huawei()).to(topicExchange())
                        .with("huawei.#");
            }
            @Bean
            Binding phoneBinding() {
                return BindingBuilder.bind(phone()).to(topicExchange())
                        .with("#.phone.#");
            }
        }
        -----------------------------------------------------------------------------------------------------
        首先创建 TopicExchange,参数和前面的一致。
        然后创建三个 Queue,
        第一个 Queue 用来存储和 “xiaomi” 有关的消息,
        第二个 Queue 用来存储和 “huawei” 有关的消息,
        第三个 Queue 用来存储和 “phone” 有关的消息。
        -----------------------------------------------------------------------------------------------------
        将三个 Queue 分别绑定到 TopicExchange 上,
        第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以“xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,
        第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,
        第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
    c.三个消费者
        @Component
        public class TopicReceiver {
            @RabbitListener(queues = "phone")
            public void handler1(String message) {
                System.out.println("PhoneReceiver:" + message);
            }
            @RabbitListener(queues = "xiaomi")
            public void handler2(String message) {
                System.out.println("XiaoMiReceiver:"+message);
            }
            @RabbitListener(queues = "huawei")
            public void handler3(String message) {
                System.out.println("HuaWeiReceiver:"+message);
            }
        }
    d.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void topicTest() {
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机..");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻..");
            }
        }
        -----------------------------------------------------------------------------------------------------
        第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,
        第二条消息将被路由到名为 “huawei” 的 Queue 上,
        第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,
        第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,
        最后一条消息则将被路由到名为 “phone” 的 Queue 上。

04.headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
    a.介绍
        HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
    b.HeadersExchange交换机
        @Configuration
        public class RabbitHeaderConfig {
            public final static String HEADERNAME = "javaboy-header";
            @Bean
            HeadersExchange headersExchange() {
                return new HeadersExchange(HEADERNAME, true, false);
            }
            @Bean
            Queue queueName() {
                return new Queue("name-queue");
            }
            @Bean
            Queue queueAge() {
                return new Queue("age-queue");
            }
            @Bean
            Binding bindingName() {
                Map<String, Object> map = new HashMap<>();
                map.put("name", "sang");
                return BindingBuilder.bind(queueName())
                        .to(headersExchange()).whereAny(map).match();
            }
            @Bean
            Binding bindingAge() {
                return BindingBuilder.bind(queueAge())
                        .to(headersExchange()).where("age").exists();
            }
        }
        -----------------------------------------------------------------------------------------------------
        ,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,
        就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。
        whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含
        age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。
    c.创建两个消息消费者
        @Component
        public class HeaderReceiver {
            @RabbitListener(queues = "name-queue")
            public void handler1(byte[] msg) {
                System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length));
            }
            @RabbitListener(queues = "age-queue")
            public void handler2(byte[] msg) {
                System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length));
            }
        }
    d.生产者
        @RunWith(SpringRunner.class)
        @SpringBootTest
        public class RabbitmqApplicationTests {
            @Autowired
            RabbitTemplate rabbitTemplate;
            @Test
            public void headerTest() {
                Message nameMsg = MessageBuilder
                        .withBody("hello header! name-queue".getBytes())
                        .setHeader("name", "sang").build();
                Message ageMsg = MessageBuilder
                        .withBody("hello header! age-queue".getBytes())
                        .setHeader("age", "99").build();
                rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
                rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
            }
        }
        -----------------------------------------------------------------------------------------------------
        这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。

2.5 Routing

00.介绍
    a.概念
        一个生产者,一个交换机,两个队列,两个消费者,
        生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
    b.说明
        这个就是按照 routing key 去路由消息

2.6 Topics

00.介绍
    a.概念
        一个生产者,一个交换机,两个队列,两个消费者,
        生产者创建 Topic 的 Exchange 并且绑定到队列中,
        这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写。
    b.说明
        通过 * 和 # 关键字,对指定 RoutingKey 内容

2.7 RPC

00.介绍
    a.概念
        关于 RabbitMQ 实现 RPC 调用,有的小伙伴可能会有一些误解,心想这还不简单?搞两个消息队列 queue_1 和 queue_2,
        首先客户端发送消息到 queue_1 上,服务端监听 queue_1 上的消息,收到之后进行处理;处理完成后,
        服务端发送消息到 queue_2 队列上,然后客户端监听 queue_2 队列上的消息,这样就知道服务端的处理结果了。
    b.说明
        这种方式不是不可以,就是有点麻烦!RabbitMQ 中提供了现成的方案可以直接使用,非常方便。接下来我们就一起来学习下。

01.架构图
    首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
    Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
    Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。

02.生产者
    a.application.properties
        spring.rabbitmq.host=localhost
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-confirm-type=correlated
        首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,
        只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-returns=true
        最后一行配置则是开启发送失败退回
    b.配置类
        @Configuration
        public class RabbitConfig {

            public static final String RPC_QUEUE1 = "queue_1";
            public static final String RPC_QUEUE2 = "queue_2";
            public static final String RPC_EXCHANGE = "rpc_exchange";

            /**
             * 设置消息发送RPC队列
             */
            @Bean
            Queue msgQueue() {
                return new Queue(RPC_QUEUE1);
            }

            /**
             * 设置返回队列
             */
            @Bean
            Queue replyQueue() {
                return new Queue(RPC_QUEUE2);
            }

            /**
             * 设置交换机
             */
            @Bean
            TopicExchange exchange() {
                return new TopicExchange(RPC_EXCHANGE);
            }

            /**
             * 请求队列和交换器绑定
             */
            @Bean
            Binding msgBinding() {
                return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
            }

            /**
             * 返回队列和交换器绑定
             */
            @Bean
            Binding replyBinding() {
                return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
            }


            /**
             * 使用 RabbitTemplate发送和接收消息
             * 并设置回调队列地址
             */
            @Bean
            RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                RabbitTemplate template = new RabbitTemplate(connectionFactory);
                template.setReplyAddress(RPC_QUEUE2);
                template.setReplyTimeout(6000);
                return template;
            }


            /**
             * 给返回队列设置监听器
             */
            @Bean
            SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                container.setConnectionFactory(connectionFactory);
                container.setQueueNames(RPC_QUEUE2);
                container.setMessageListener(rabbitTemplate(connectionFactory));
                return container;
            }
        }
        -----------------------------------------------------------------------------------------------------
        配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,然后将这两个队列和消息交换机进行绑定
    c.生产者
        @RestController
        public class RpcClientController {

            private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class);

            @Autowired
            private RabbitTemplate rabbitTemplate;

            @GetMapping("/send")
            public String send(String message) {
                // 创建消息对象
                Message newMessage = MessageBuilder.withBody(message.getBytes()).build();

                logger.info("client send:{}", newMessage);

                //客户端发送消息
                Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);

                String response = "";
                if (result != null) {
                    // 获取已发送的消息的 correlationId
                    String correlationId = newMessage.getMessageProperties().getCorrelationId();
                    logger.info("correlationId:{}", correlationId);

                    // 获取响应头信息
                    HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

                    // 获取 server 返回的消息 id
                    String msgId = (String) headers.get("spring_returned_message_correlation");

                    if (msgId.equals(correlationId)) {
                        response = new String(result.getBody());
                        logger.info("client receive:{}", response);
                    }
                }
                return response;
            }
        }
        -----------------------------------------------------------------------------------------------------
        消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。
        服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,这个就是消息发送时候的
        correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation
        字段值,我们就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的。
        -----------------------------------------------------------------------------------------------------
        这就是整个客户端的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够
        。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,
        这样就无法将返回的消息内容和发送的消息内容关联起来。

03.消费者
    a.application.properties
        spring.rabbitmq.host=localhost
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest
        spring.rabbitmq.publisher-confirm-type=correlated
        spring.rabbitmq.publisher-returns=true
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-confirm-type=correlated
        首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,
        只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。
        -----------------------------------------------------------------------------------------------------
        spring.rabbitmq.publisher-returns=true
        最后一行配置则是开启发送失败退回
    b.配置类
        @Configuration
        public class RabbitConfig {

            public static final String RPC_QUEUE1 = "queue_1";
            public static final String RPC_QUEUE2 = "queue_2";
            public static final String RPC_EXCHANGE = "rpc_exchange";

            /**
             * 配置消息发送队列
             */
            @Bean
            Queue msgQueue() {
                return new Queue(RPC_QUEUE1);
            }

            /**
             * 设置返回队列
             */
            @Bean
            Queue replyQueue() {
                return new Queue(RPC_QUEUE2);
            }

            /**
             * 设置交换机
             */
            @Bean
            TopicExchange exchange() {
                return new TopicExchange(RPC_EXCHANGE);
            }

            /**
             * 请求队列和交换器绑定
             */
            @Bean
            Binding msgBinding() {
                return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
            }

            /**
             * 返回队列和交换器绑定
             */
            @Bean
            Binding replyBinding() {
                return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
            }
        }
    c.消费者
        @Component
        public class RpcServerController {
            private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class);
            @Autowired
            private RabbitTemplate rabbitTemplate;

            @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
            public void process(Message msg) {
                logger.info("server receive : {}",msg.toString());
                Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();
                CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
                rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
            }
        }
        -----------------------------------------------------------------------------------------------------
        这里的逻辑就比较简单了:
        服务端首先收到消息并打印出来。
        服务端提取出原消息中的 correlation_id。
        服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。

2.8 Publisher Confirms

00.RabbitMQ消息发送机制
    a.概念
        RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,
        然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。
    b.说明
        大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:
        消息成功到达 Exchange
        消息成功到达 Queue
        如果能确认这两步,那么我们就可以认为消息发送成功了。
        -----------------------------------------------------------------------------------------------------
        如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,
        多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
    c.说明
        经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
        1.确认消息到达 Exchange。
        2.确认消息到达 Queue。
        3.开启定时任务,定时投递那些发送失败的消息。
        上面提出的三个步骤,第三步需要我们自己实现,前两步 RabbitMQ 则有现成的解决方案。
        -----------------------------------------------------------------------------------------------------
        如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
        开启事务机制
        发送方确认机制
        -----------------------------------------------------------------------------------------------------
        这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报错

3 RabbitMQ使用

3.1 定义

01.RabbitMQ
    a.概念
        Exchange    消息交换机,它指定消息按什么规则,路由到哪个队列
        Queue       消息队列,每个消息都会被投入到一个或多个队列
        Binding     绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来
        Routing Key 路由关键字,exchange 根据这个关键字进行消息投递
        Vhost       虚拟主机,可以开设多个 vhost,用作不同用户的权限分离
        Producer    消息生产者,就是投递消息的程序
        Consumer    消息消费者,就是接受消息的程序
        Channel     消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel 代表一个会话任务
    b.流程
        RabbitMQ 投递过程:
        1.客户端连接到消息队列服务器,打开一个 channel。
        2.客户端声明一个 exchange,并设置相关属性。
        3.客户端声明一个 queue,并设置相关属性。
        4.客户端使用 routing key,在 exchange 和 queue 之间建立好绑定关系。
        5.客户端投递消息到 exchange。
        6.客户端从指定的 queue 中消费信息。
        -----------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,两个队列,两个消费者
        一个队列中的每条消息只会被一个消费者消费。
        但同一个队列可以有多个消费者,消息会在这些消费者之间轮询分发。

02.消费模式
    一个队列中的每条消息只会被一个消费者消费。
    但同一个队列可以有多个消费者,消息会在这些消费者之间轮询分发。
    a.Simple简单队列:最简单的收发模式
        一个生产者,一个消费者,一个队列(一个默认的交换机)
        -------------------------------------------------------------------------------------------------
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,
        自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,
        这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
    b.Work工作队列:资源的竞争
        方式1:轮询模式
        方式2:公平分发(能者多劳)
        -----------------------------------------------------------------------------------------------------
        一个生产者,两个消费者,一个队列(一个默认的交换机)
        -----------------------------------------------------------------------------------------------------
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。
        C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息
        (隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,
        可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
    c.publish/subscribe发布订阅:共享资源
        a.背景
            不管是简单队列模型还是工作队列模式只用到了 RabbitMQ 中的 Queue 队列,
            消息直接发送到队列中,消费者直接从队列中获取消息。
            -------------------------------------------------------------------------------------------------
            现在我们学习另一种消息的处理方式,消息生产者将消息发送到 Exchange 交换机中,
            再由交换机将消息投递到 Queue队列中,交换机和队列的关系是多对多的关系,
            表示一个交换机可以通过一定的规矩将消息投递到多个队列中,同样一个队列也可以接收多个交换机投递的消息
            -------------------------------------------------------------------------------------------------
            一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,
            每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
            需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,
            这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,
            -------------------------------------------------------------------------------------------------
            每个消费者监听自己的队列;
            生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
            -------------------------------------------------------------------------------------------------
            Direct 消息流转过程:
            1.生产者向Exchange发送消息。
            2.队列使用路由密钥绑定到Exchange。
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
            5.订阅队列的使用者接收消息并进行处理。
        b.direct路由模式,直连交换机,依赖routingkey
            消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中
            基于完全匹配、单播的模式
            -------------------------------------------------------------------------------------------------
            DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
            当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
            例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
            -------------------------------------------------------------------------------------------------
            处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
            如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,
            也不会转发dog.123,只会转发test。
            -------------------------------------------------------------------------------------------------
            Direct 消息流转过程:
            1.生产者向Exchange发送消息。
            2.队列使用路由密钥绑定到Exchange。
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
            5.订阅队列的使用者接收消息并进行处理。
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,两个队列,两个消费者
            特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
        c.fanout发布订阅,扇形交换机,不依赖routingkey
            把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
            -------------------------------------------------------------------------------------------------
            FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,
            在这种策略中,routingkey 将不起任何作用
            -------------------------------------------------------------------------------------------------
            不处理路由键。你只需要简单的将队列绑定到交换机上。
            一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
            很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,三个队列,三个消费者
            特点:发布与订阅模式,是一种广播机制,它是没有路由key的模式。
        d.topic主题模式,主题交换机,依赖routingkey
            通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
            -------------------------------------------------------------------------------------------------
            星号井号代表通配符
            星号代表多个单词,井号代表一个单词
            路由功能添加模糊匹配
            消息产生者产生消息,把消息交给交换机
            交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费 
            -------------------------------------------------------------------------------------------------
            消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
            根据业务功能定义路由字符串
            从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
            业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
            -------------------------------------------------------------------------------------------------
            将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。
            因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
            -------------------------------------------------------------------------------------------------
            Topic 中的路由键设置规则:
            1.Topic Exchange中的路由关键字必须包含零个或多个由 . 点分隔的单词,例如health.education。
            2.Topic Exchange中的路由键通常称为路由模式。
            3.路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
            4.星号(*****)表示正好允许一个字。
            5.同样,井号(#)表示允许的单词数为零或更多。
            6.点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
            7.如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用。
            -------------------------------------------------------------------------------------------------
            Topic 中的消息流转过程:
            1.一个Queue队列通过路由键(P)绑定到 Exchange。
            2.Producer 将带有P路由键(K)的消息发送到 Topic Exchange。
            3.如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述。
            4.订阅队列的使用者将收到消息。
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,两个队列,两个消费者
            特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
            生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
            简单的说:* 表示一个,# 表示 0个 或者 多个
        e.headers x-mactch模式,头交换机,不依赖routingkey
            不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
            -------------------------------------------------------------------------------------------------
            Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。
            Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,
            接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。
            匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。
            all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
            fanout,direct,topic exchange的routingKey都需要要字符串形式的,
            而headers exchange则没有这个要求,因为键值对的值可以是任何类型
            -------------------------------------------------------------------------------------------------
            Headers 中消息流转过程:
            1.一个或多个队列使用标头属性(H)绑定(链接)到标头交换。
            2.生产者将带有标头属性(MH)的消息发送到此Exchange。
            3.如果MH与H匹配,则消息将转发到队列。
            4.监听队列的使用者接收消息并对其进行处理。

03.应用场景
    01.第1种:单条消息过期
        体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());
    02.第2种:队列消息过期
        体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
    03.第3种:特殊情况
        还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
        这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
        是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)
    04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
        消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
        消息过期
        队列达到最大长度
        -----------------------------------------------------------------------------------------------------
        步骤如下:
        1: 创建连接工厂,设置连接属性
        2: 从连接工厂中获取
        3: 从连接中打开通道channel
        4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
        4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
        5: 通过channel发送消息
        -----------------------------------------------------------------------------------------------------
        体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机                       重
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key        重
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定
    05.第5种:rabbitmq_delayed_message_exchange 插件 实现延迟队列
        // 4: 声明延迟交换机
        channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);      重
        channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
        channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!! 5000毫秒";
        // 设置延迟时间(例如,5000毫秒)
        int delay = 5000;
        // 6: 发送消息给延迟交换机
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                .build();
        channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
        System.out.println("消息发送成功!");
    06.第6种:发送可靠性?确认机制、事务 / 消费可靠性?确认机制、幂等性
        a.方式1:通过AMQP提供的事务机制实现
            -- 开启事务
            channel.txSelect();
            -- 提交事务
            channel.txCommit();
            -- 回滚事务
            channel.txRollback();
        b.方式2:消息的发送者确认模式
            a.说明
                消息的发送者确认模式的使用和事务类似,也是通过channel进行发送确认的,
                但该模式和事务有着本质的区别就是发送消息丢失的时候不会像事务一样停止发送消息,
                而是补发消息直到消息发送到对方确认为止。
            b.三种实现方式
                a.方法1
                    channel.waitForConfirms()普通发送消息确认模式
                    该模式是不断的等待接收方来返回消息,如果有返回消息则说明消息发送成功
                    -----------------------------------------------------------------------------------------
                    boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。
                    可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。
                    如果抛出异常表示服务器出了问题,需要补发消息。
                    无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。
                    补发消息可以将消息缓存到Redis中稍后使用定时任务来补发,或者使用递归的方法来补发
                b.方法2
                    channel.waitForConfirmsOrDie()函数批量确认模式
                    -----------------------------------------------------------------------------------------
                    channel.waitForConfirmsOrDie();
                    该函数会同时向服务中确认之前当前通道中发送消息是否已经全部写入成功,该函数没有返回值,
                    如果服务器中没有一条消息能够发送成功或者向服务器发送确认时服务不可访问都被认定为消息发送失败。
                    可能消息发送成功,也可能消息没发送成功
                    -----------------------------------------------------------------------------------------
                    channel.waitForConfirmsOrDie();
                    也可以指定一个毫秒值来用于等带服务器的确认时间,如果超过这个时间就抛出异常,表示确认失败需要补发
                    -----------------------------------------------------------------------------------------
                    注意:
                    批量确认消息比普通确认要快,但是如果一但出现了消息补发的情况,就不能确定是哪条消息需要补发,
                    所以就会将本次发送的所有消息进行补发。
                c.方法3
                    channel.addConfirmListener()异步监听发送确认模式。需要new ConfirmListener()来实现里面的回调函数
                    public void handleAck(long l, boolean b) throws IOException
                    public void handleNack(long l, boolean b) throws IOException 
    07.第7种:Stream流
        a.说明
            a.概念
                abbitMQ 从 v3.9 版本开始引入了一个名为 RabbitMQ Streams 的新功能
                它允许消息以高吞吐量和低延迟的方式通过流的形式进行传递
            b.特点
                RabbitMQ Streams 专为处理大量数据和实时数据流设计,它与传统的 RabbitMQ 消息队列模型有所不同,主要特点包括:
                1.高吞吐量:RabbitMQ Streams 可以处理数百万条消息的高并发流式数据传输,适用于实时分析和大数据处理场景。
                2.持久化流:消息可以以流的形式持久化,这意味着即使消费者处理较慢,数据也不会丢失。
                3.多消费者读取:同一条消息可以被多个消费者读取,而不会影响消息的传递和持久化。
                4.按消息索引读取:消费者可以根据消息的索引选择从特定位置开始读取消息,而不像传统的队列那样必须按顺序处理。
                5.消费分区:类似于 Kafka,RabbitMQ Streams 也支持分区机制,帮助提高吞吐量和并行性。
        b.代码实现
            a.准备
                a.安装RabbitMQ并启用stream插件
                    rabbitmq-plugins enable rabbitmq_stream
                b.依赖
                    <dependency>
                        <groupId>com.rabbitmq</groupId>
                        <artifactId>stream-client</artifactId>
                        <version>0.5.1</version>
                    </dependency>
                c.在RabbitMQ中创建Stream队列
                    rabbitmqadmin declare queue name=my-stream durable=true type=stream

3.2 汇总

01.基本概念
    a.核心组件
        Producer(生产者): 发送消息到 RabbitMQ。
        Exchange(交换机): 接收来自生产者的消息,并根据路由规则将其路由到一个或多个队列。
        Queue(队列): 存储消息,直到被消费者消费。
        Binding(绑定): 定义交换机与队列之间的路由规则。
        Consumer(消费者): 从队列中接收并处理消息。
    b.消息类型与数据格式
        RabbitMQ 本身不限制消息的内容类型,支持任何二进制数据。常见的数据类型和示例包括:
        a.文本消息: JSON、XML、纯文本
            示例: {"user":"Alice","action":"login"}
        b.二进制数据: 图片、视频、文件
            示例: JPEG 图片文件的字节流
        c.序列化对象: 使用 Protocol Buffers、Thrift 等序列化的对象
            示例: Java 对象序列化后的字节数组

02.消息传递模式
    a.一对一(点对点)
        描述: 一个生产者发送消息到一个队列,一个消费者从该队列中消费消息。
        应用场景: 任务队列,如处理订单、日志记录。
    b.一对多(发布/订阅)
        描述: 一个生产者发送消息到交换机,交换机将消息路由到多个绑定的队列,多个消费者各自从队列中消费消息。
        应用场景: 广播消息,如实时通知、多租户消息分发。
    c.主题(Topic)
        描述: 基于路由键的模式匹配,将消息路由到符合主题模式的队列。
        应用场景: 日志系统中按模块或级别分发日志。
    d.路由(Direct)
        描述: 交换机根据精确匹配的路由键将消息路由到队列。
        应用场景: 按类型或关键字分发任务。
    e.扇出(Fanout)
        描述: 交换机将收到的消息广播到所有绑定的队列,无视路由键。
        应用场景: 广播通知、实时更新。

03.消息传递机制
    a.流(Stream)与批量发送
        流式传输: 持续发送和接收消息,适用于实时数据处理。
        批量发送: 将多个消息打包成一个批次发送,提高吞吐量。
        示例: 使用 basic.publish 批量发送多个消息到队列。
    b.消息延迟
        描述: 延迟一定时间后再将消息投递到队列。
        实现方式:使用 TTL(Time-To-Live) 设置消息的过期时间,并结合 死信交换机 实现延迟队列。
        应用场景: 定时任务、延迟重试。

04.生产者特性
    a.消息类型
        a.同步消息: 生产者在发送消息后等待 RabbitMQ 的确认(ACK)。
            优点: 确保消息已被接收。
            缺点: 性能较低。
        b.异步消息: 生产者发送消息后不等待确认,提升吞吐量。
            适用场景: 高性能要求下的消息传递。
        c.单向消息: 生产者发送消息后不关心结果,不进行确认。
            适用场景: 不关心消息是否被处理的日志记录。
    b.消息发布确认(Publisher Confirms)
        描述: 生产者在发送消息后接收 RabbitMQ 的确认,确保消息已被路由到队列。
        应用场景: 需要确保消息不丢失的场景。
    c.消息发布返回(Return Listener)
        描述: 当消息无法路由到任何队列时,生产者接收回退消息。
        应用场景: 处理路由失败的消息,如错误日志。

05.消费者特性
    a.消费模式
        a.Push消费者
            描述: RabbitMQ 将消息主动推送给消费者。
            实现方式: 消费者通过 basic.consume 订阅队列,RabbitMQ 将消息推送过来。
            优点: 实时性高。
            缺点: 需要处理好消息的并发和流量控制。
        b.Pull消费者
            描述: 消费者主动从队列中拉取消息。
            实现方式: 使用 basic.get 或 basic.consume 配合预取计数(prefetch count)控制。
            优点: 消费者可以控制消费速率。
            缺点: 实现复杂度较高,实时性稍差。
    b.消费者类型
        a.队列绑定与标签订阅
            描述: 消费者通过绑定不同的队列和路由键实现基于标签的消息订阅。
            示例: 使用不同的路由键将消息发送到特定队列,消费者订阅对应队列。
        b.消息模型
            a.集群模式(Clustered)
                描述: 多个 RabbitMQ 节点组成集群,共享队列和交换机。
                优点: 提高可用性和扩展性。
                配置方式: 使用镜像队列(Mirrored Queues)保证数据在集群中的高可用性。
            b.广播模式(Broadcast)
                描述: 类似于发布/订阅,一条消息被广播到所有绑定的队列。
                实现方式: 使用 Fanout Exchange。
    c.监听器与并发消费
        描述: 消费者使用监听器(Listener)接收并处理消息,可以并发处理多条消息以提高吞吐量。
        实现方式: 配置消费者线程数,使用多线程或异步处理机制。
    d.消费类型
        a.迭代消费
            描述: 消费者逐条处理队列中的消息。
            适用场景: 消息处理需要顺序或依赖上一个消息的结果。
        b.一次性消费
            描述: 消费者一次性消费多个消息,提高效率。
            实现方式: 设置预取计数(prefetch count),批量处理消息。
    e.常见 API
        基本操作: basic.publish, basic.consume, basic.ack, basic.nack
        高级操作: 事务管理(事务机制)、确认机制(Publisher Confirms)

06.事务机制
    a.事务模式
        描述: 将一系列消息操作作为一个原子事务执行,确保消息的可靠性。
        实现方式: 使用 tx.select, tx.commit, tx.rollback 命令。
        缺点: 性能较低,不推荐在高吞吐量场景中使用。
    b.发布确认(推荐)
        描述: 生产者在发送消息后接收确认,确保消息已被 RabbitMQ 接收和路由。
        优点: 性能较高,易于使用。
        推荐使用场景: 需要可靠消息传递但对事务的复杂性要求不高的场景。

07.消息顺序与批量发送
    a.消息局部顺序消费
        问题: 保证某一类消息的顺序被消费者严格按照生产顺序消费。
        解决方案:
        生产者: 使用单一生产者线程或算法确保消息按序发送。
        消费者: 使用单一消费者实例或配置消费者按顺序处理消息,避免并发处理导致顺序错乱。
    b.批量发送消息
        描述: 生产者将多条消息封装到一个批次中一次性发送,减少网络开销,提高发送效率。
        实现方式: 利用 RabbitMQ 客户端的批量发布功能,如使用 basic.publish 循环发送多条消息或使用事务/确认机制批量确认。

08.高级特性
    a.高可用性与集群
        集群模式: 多个 RabbitMQ 节点组成集群,共享交换机和队列。
        镜像队列: 将队列复制到多个节点,确保节点故障时消息不丢失。
    b.消息持久化
        描述: 将消息存储到磁盘,确保在 RabbitMQ 重启后消息不丢失。
        实现方式: 设置队列和消息的持久化属性 (durable, persistent).
    c.消息确认与重试
        消息确认: 消费者处理完消息后发送 ACK,确保消息被成功消费。
        消息拒绝与重回队列: 消费者可以拒绝消息(NACK),消息可以被重新投递到队列中。
        死信队列(Dead Letter Queue): 处理被拒绝或过期的消息。
    d.安全性
        身份验证: 使用用户名和密码进行身份验证。
        权限控制: 定义用户对交换机、队列的访问权限。
        加密: 支持 TLS/SSL 加密通信,确保消息传输安全。
    e.管理与监控
        管理插件: 提供 Web 管理界面,监控队列、交换机、连接等。
        监控指标: 消息吞吐量、队列长度、消费者状态等。
    f.插件扩展
        常用插件:
        Shovel: 跨 RabbitMQ 实例复制消息。
        Federation: 跨数据中心分发消息。
        RabbitMQ Streams: 支持高吞吐量的流式消息处理。

09.RabbitMQ 与其他中间件的对比
    a.RabbitMQ
        强大的路由能力,支持多种交换机类型。
        支持丰富的消息确认和事务机制。
        适用于复杂的消息传递场景,如 RPC、实时通知。
    b.Kafka
        高吞吐量,适合大数据流处理。
        基于日志的持久化,消息顺序保证。
        更适合事件源和流处理应用。
    c.RocketMQ
        支持顺序消息、定时消息等特性。
        分布式架构,适合电商、金融等场景。

3.3 admin

01.rabbitmqadmin
    a.介绍
        我们自己平时做练习,一般都会开启 RabbitMQ 的 Web 管理页面,然而在生产环境下,经常是没有 Web 管理页面的,只能通过 CLI 命令去管理 MQ。
        其实呀,Web 管理页面虽然友好,但是很多时候没有 CLI 快捷,而且通过 CLI 命令行的操作,我们可以做更多的定制,例如将关键信息查出来后提供给集中的监控系统以触发报警。
        直接操作 CLI 命令行有点麻烦,RabbitMQ 提供了 CLI 管理工具 rabbitmqadmin ,其实就是基于 RabbitMQ 的 HTTP API,用 Python 写的一个脚本。因为 REST API 手动写请求还是挺麻烦的,这些脚本刚好替我们简化了这个操作,让这个事情变得更加简单了。
    b.安装
        如果我们创建 RabbitMQ 容器的时候使用的是 rabbitmq:3-management 镜像,那么默认情况下,rabbitmqadmin 就是安装好的。
        首先确认你的设备上安装了 Python,这是最基本的,因为 rabbitmqadmin 这个工具就是 Python 脚本。
        然后开启 RabbitMQ 的 Web 管理页面,然后输入如下地址(我的管理页面度那口映射为 25672):
        http://localhost:25672/cli/index.html
        -----------------------------------------------------------------------------------------------------
        在打开的页面中就可以看到 rabbitmqadmin 的下载链接。将 rabbitmqadmin 下载下来后,然后赋予其可执行权限即可:
        chmod +x rabbitmqadmin
        -----------------------------------------------------------------------------------------------------
        下载后的 rabbitmqadmin 我们可以直接用记事本打开,里边其实就是一堆 Python 脚本。
    c.总结
        这套流程操作下来还是挺麻烦的,所以,我建议大家直接使用 rabbitmq:3-management 镜像,一步到位。

02.功能
    列出 exchanges, queues, bindings, vhosts, users, permissions, connections and channels。
    创建和删除 exchanges, queues, bindings, vhosts, users and permissions。
    发布和获取消息,以及消息详情。
    关闭连接和清空队列。
    导入导出配置。

03.列出各种信息
    a.查看所有交换机
        python rabbitmqadmin list exchanges
    b.查看所有队列
        python rabbitmqadmin list queues
    c.查看所有 Binding
        python rabbitmqadmin list bindings
    d.查看所有虚拟主机
        python rabbitmqadmin list vhosts
    e.查看所有用户信息
        python rabbitmqadmin list users
    f.查看所有权限信息
        python rabbitmqadmin list permissions
    g.查看所有连接信息
        python rabbitmqadmin list connections
    h.查看所有通道信息
        python rabbitmqadmin list channels

04.一个完整的例子
    a.步骤1
        a.创建一个名为 javaboy-exchange 的交换机
            python rabbitmqadmin declare exchange name=javaboy-exchange durable=true auto_delete=false type=direct
            -------------------------------------------------------------------------------------------------
            declare exchange:这是创建一个交换机的命令。
            name=javaboy-exchange:指定交换机的名称为 javaboy-exchange。
            durable=true:表示这个交换机会在 RabbitMQ 服务器重启后仍然存在(持久化)。
            auto_delete=false:设置为 false 表示当没有队列绑定到该交换机时,交换机不会自动删除。
            type=direct:指定交换机的类型为 direct,即交换机会根据消息的 routing_key 精确匹配到绑定的队列。
        b.查看交换机
            python rabbitmqadmin list exchanges
    b.步骤2
        a.创建一个名为 javaboy-queue 的队列
            python rabbitmqadmin declare queue name=javaboy-queue durable=true auto_delete=false
            -------------------------------------------------------------------------------------------------
            declare queue:这是创建一个队列的命令。
            name=javaboy-queue:指定队列的名称为 javaboy-queue。
            durable=true:表示该队列会在 RabbitMQ 服务器重启后仍然存在(持久化队列)。
            auto_delete=false:设置为 false,表示当队列不再被使用时,不会自动删除
        b.查看队列
            python rabbitmqadmin list queues
    c.步骤3
        a.创建一个 Binding,将交换机和消息队列绑定起来
            python rabbitmqadmin declare binding source=javaboy-exchange destination=javaboy-queue routing_key=javaboy-routing
            -------------------------------------------------------------------------------------------------
            declare binding:这是为交换机和队列创建绑定关系的命令。
            source=javaboy-exchange:指定绑定的源头是交换机 javaboy-exchange。
            destination=javaboy-queue:指定绑定的目的地是队列 javaboy-queue。
            routing_key=javaboy-routing:指定绑定时的路由键为 javaboy-routing,表示只有携带这个路由键的消息会被交换机发送到这个队列中。
        b.查看绑定关系
            python rabbitmqadmin list bindings
    d.步骤4
        a.发布一条消息
            python rabbitmqadmin publish routing_key=javaboy-queue payload="hello javaboy"
            -------------------------------------------------------------------------------------------------
            publish:这是向队列或交换机发送消息的命令。
            routing_key=javaboy-queue:指定发送消息的路由键为 javaboy-queue,这意味着消息会根据绑定关系发送到与此路由键匹配的队列中。
            payload="hello javaboy":指定消息的内容为 "hello javaboy"。
        b.查看消息,不消费
            python rabbitmqadmin get queue=javaboy-queue
            -------------------------------------------------------------------------------------------------
            get:这是从队列中获取消息的命令。
            queue=javaboy-queue:指定从队列 javaboy-queue 中查看消息。
            默认情况下,get 命令不会消费消息(即不会从队列中移除消息),只会返回消息的内容以供查看。
        c.查看消息,并消费
            python rabbitmqadmin get queue=javaboy-queue requeue=false
        d.删除队列中的所有消息
            python rabbitmqadmin purge queue name=javaboy-queue
            -------------------------------------------------------------------------------------------------
            purge queue:这是清空队列中所有消息的命令。
            name=javaboy-queue:指定要清空的队列是 javaboy-queue。
    e.附说明
        a.查看 交换机、队列、绑定
            打开 RabbitMQ 管理界面,地址为 http://localhost:15672。
            登录后,点击左侧导航中的 Queues 或 Exchanges。
            在 Queues 页面,选择你感兴趣的队列,进入详情页后,可以看到与该队列绑定的所有交换机和路由键。
            在 Exchanges 页面,选择对应的交换机,同样可以查看其绑定的队列和路由键信息。
        b.查看 消费消息
            打开 RabbitMQ 管理界面 http://localhost:15672。
            登录后,点击左侧导航栏中的 Queues。
            找到 javaboy-queue 队列并点击它。
            在队列的详情页中,找到 Get messages 部分。
            选择你要消费的消息数量,点击 Get Message(s) 按钮。可以选择是否移除消息(即是否 ack 消息),如果选择 Auto-ack,消息将会被移除。

05.命令汇总
    python rabbitmqadmin list users                                                                             | 查看所有用户 User
    python rabbitmqadmin list users name                                                                        | 查看所有用户名 Username
    python rabbitmqadmin list users tags                                                                        | 查看所有用户角色
    python rabbitmqadmin list vhosts                                                                            | 查看所有虚拟主机
    python rabbitmqadmin list connections                                                                       | 查看所有连接
    python rabbitmqadmin list exchanges                                                                         | 查看所有路由 Exchange
    python rabbitmqadmin list bindings                                                                          | 查看所有路由与队列的关系绑定 Binding
    python rabbitmqadmin list permissions                                                                       | 查看所有角色的权限 Permission
    python rabbitmqadmin list channels                                                                          | 查看所有通道 Channel
    python rabbitmqadmin list consumers                                                                         | 查看所有消费者 Consumer
    python rabbitmqadmin list queues                                                                            | 查看所有消息队列 Queue
    python rabbitmqadmin list nodes                                                                             | 查看所有节点 Node
    python rabbitmqadmin show overview                                                                          | 概览 Overview
    python rabbitmqadmin list bindings source destination_type destination properties_key                       | 查看所有路由与队列的关系绑定的详细信息 Binding
    python rabbitmqadmin declare queue name=test durable=true                                                   | 定义一个队列queue,durable=true代表持久化打开。
    python rabbitmqadmin declare exchange name=my.fanout type=fanout                                            | 定义一个Fanout路由
    python rabbitmqadmin declare exchange name=my.direct type=direct                                            | 定义一个Direct路由
    python rabbitmqadmin declare exchange name=my.topic type=topic                                              | 定义一个Topic路由
    python rabbitmqadmin declare binding source=my.fanout destination=test routing_key=first                    | 定义 binding
    python rabbitmqadmin publish routing_key=test payload="hello world"                                         | 发布一条消息
    python rabbitmqadmin publish routing_key=my.test exchange=my.topic payload="hello world"                    | 使用路由转发消息
    python rabbitmqadmin get queue=test requeue=true                                                            | 查看消息,不消费
    python rabbitmqadmin get queue=test requeue=false                                                           | 查看消息,并消费
    python rabbitmqadmin purge queue name=test                                                                  | 删除队列中的所有消息
    python rabbitmqadmin delete queue name=hello                                                                | 删除消息队列 Queue
    python rabbitmqadmin delete user name=test                                                                  | 删除用户 User
    python rabbitmqadmin delete exchange name=test                                                              | 删除路由器 Exchange
    python rabbitmqadmin delete binding source='kk' destination_type=queue destination=test properties_key=test | 删除路由器与消息队列的关系绑定 Binding
    python rabbitmqadmin -f raw_json list users                                                                 | raw_json 格式化输出
    python rabbitmqadmin -f long list users                                                                     | 格式化输出
    python rabbitmqadmin -f pretty_json list users                                                              | pretty_json 格式化输出
    python rabbitmqadmin -f kvp list users                                                                      | 格式化输出
    python rabbitmqadmin -f tsv list users                                                                      | 格式化输出
    python rabbitmqadmin -f table list users                                                                    | table 格式化输出
    python rabbitmqadmin -f bash list users                                                                     | bash 格式化输出

3.4 restapi

01.开启REST API
    a.概念
        REST(Representational State Transfer)是一种 Web 软件架构风格,它是一种风格,而不是标准,匹配或兼容这种架构风格的的网络服务称为 REST 服务。
        REST 服务简洁并且有层次,它通常基于 HTTP、URI、XML 以及 HTML 这些现有的广泛流行的协议和标准。在 REST 中,资源是由 URI 来指定,对资源的增删改查操作可以通过 HTTP 协议提供的 GET、POST、PUT、DELETE 等方法实现。
    b.开启 Web 管理页面
        a.方式1:安装 RabbitMQ 的时候,直接选择 rabbitmq:3-management 镜像,安装命令如下:
            docker run -d --rm --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
            这样安装好的 RabbitMQ 就可以直接使用 Web 管理页面了。
        b.方式2:安装的时候就选择正常的普通镜像 rabbitmq:3,安装命令如下:
            docker run -d --hostname my-rabbit --name some-rabbit2 -p 5673:5672 -p 25672:15672 rabbitmq:3
            这个安装好之后,需要我们进入到容器中,然后手动开启 Web 管理插件,命令如下:
            ---------------------------------------------------------------------------------------------
            docker exec -it some-rabbit2 /bin/bash
            rabbitmq-plugins enable rabbitmq_management
            -------------------------------------------------------------------------------------------------
            第一条命令是进入到容器中,第二条命令开启 Web 管理插件

02.操作
    a.查看虚拟主机 myvh 下 hello-queue 队列的数据统计
        curl -i -u guest:guest http://localhost:15672/api/queues/myvh/hello-queue
    b.在 /myvh 虚拟主机下创建一个名为 javaboy-queue 的队列
        curl -i -u guest:guest -XPUT -H "Content-Type:application/json" -d '{"auto_delete":false,"durable":true}' http://localhost:15672/api/queues/myvh/javaboy-queue
    c.查看当前连接信息
        curl -i -u guest:guest http://localhost:15672/api/connections
    d.查看当前用户信息
        curl -i -u guest:guest http://localhost:15672/api/users
    e.建一个名为 zhangsan,密码是 123 ,角色是 administrator 的用户
        curl -i -u guest:guest -H "{Content-Type:application/json}" -d '{"password":"123","tags":"administrator"}' -XPUT http://localhost:15672/api/users/zhangsan
    f.将名为 zhangsan 的用户设置到名为 myvh 的 vhost 下
        curl -i -u guest:guest -H "{Content-Type:application/json}" -d '{"configure":".*","write":".*","read":".*"}' -XPUT http://localhost:15672/api/permissions/myvh/zhangsan

3.5 消费模式

00.消费模式
    一个队列中的每条消息只会被一个消费者消费。
    但同一个队列可以有多个消费者,消息会在这些消费者之间轮询分发。
    a.Simple简单队列:最简单的收发模式
        一个生产者,一个消费者,一个队列(一个默认的交换机)
        -------------------------------------------------------------------------------------------------
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,
        自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,
        这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
    b.Work工作队列:资源的竞争
        方式1:轮询模式
        方式2:公平分发(能者多劳)
        -----------------------------------------------------------------------------------------------------
        一个生产者,两个消费者,一个队列(一个默认的交换机)
        -----------------------------------------------------------------------------------------------------
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。
        C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息
        (隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,
        可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
    c.publish/subscribe发布订阅:共享资源
        a.背景
            不管是简单队列模型还是工作队列模式只用到了 RabbitMQ 中的 Queue 队列,
            消息直接发送到队列中,消费者直接从队列中获取消息。
            -------------------------------------------------------------------------------------------------
            现在我们学习另一种消息的处理方式,消息生产者将消息发送到 Exchange 交换机中,
            再由交换机将消息投递到 Queue队列中,交换机和队列的关系是多对多的关系,
            表示一个交换机可以通过一定的规矩将消息投递到多个队列中,同样一个队列也可以接收多个交换机投递的消息
            -------------------------------------------------------------------------------------------------
            一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,
            每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
            需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,
            这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,
            -------------------------------------------------------------------------------------------------
            每个消费者监听自己的队列;
            生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
            -------------------------------------------------------------------------------------------------
            Direct 消息流转过程:
            1.生产者向Exchange发送消息。
            2.队列使用路由密钥绑定到Exchange。
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
            5.订阅队列的使用者接收消息并进行处理。
        b.direct路由模式,直连交换机,依赖routingkey
            消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中
            基于完全匹配、单播的模式
            -------------------------------------------------------------------------------------------------
            DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
            当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
            例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
            -------------------------------------------------------------------------------------------------
            处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
            如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,
            也不会转发dog.123,只会转发test。
            -------------------------------------------------------------------------------------------------
            Direct 消息流转过程:
            1.生产者向Exchange发送消息。
            2.队列使用路由密钥绑定到Exchange。
            3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
            4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
            5.订阅队列的使用者接收消息并进行处理。
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,两个队列,两个消费者
            特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
        c.fanout发布订阅,扇形交换机,不依赖routingkey
            把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
            -------------------------------------------------------------------------------------------------
            FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,
            在这种策略中,routingkey 将不起任何作用
            -------------------------------------------------------------------------------------------------
            不处理路由键。你只需要简单的将队列绑定到交换机上。
            一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
            很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,三个队列,三个消费者
            特点:发布与订阅模式,是一种广播机制,它是没有路由key的模式。
        d.topic主题模式,主题交换机,依赖routingkey
            通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
            -------------------------------------------------------------------------------------------------
            星号井号代表通配符
            星号代表多个单词,井号代表一个单词
            路由功能添加模糊匹配
            消息产生者产生消息,把消息交给交换机
            交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费 
            -------------------------------------------------------------------------------------------------
            消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
            根据业务功能定义路由字符串
            从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
            业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
            -------------------------------------------------------------------------------------------------
            将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。
            因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
            -------------------------------------------------------------------------------------------------
            Topic 中的路由键设置规则:
            1.Topic Exchange中的路由关键字必须包含零个或多个由 . 点分隔的单词,例如health.education。
            2.Topic Exchange中的路由键通常称为路由模式。
            3.路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
            4.星号(*****)表示正好允许一个字。
            5.同样,井号(#)表示允许的单词数为零或更多。
            6.点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
            7.如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用。
            -------------------------------------------------------------------------------------------------
            Topic 中的消息流转过程:
            1.一个Queue队列通过路由键(P)绑定到 Exchange。
            2.Producer 将带有P路由键(K)的消息发送到 Topic Exchange。
            3.如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述。
            4.订阅队列的使用者将收到消息。
            -------------------------------------------------------------------------------------------------
            一个生产者,一个交换机,两个队列,两个消费者
            特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
            生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
            简单的说:* 表示一个,# 表示 0个 或者 多个
        e.headers x-mactch模式,头交换机,不依赖routingkey
            不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
            -------------------------------------------------------------------------------------------------
            Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。
            Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,
            接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。
            匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。
            all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
            fanout,direct,topic exchange的routingKey都需要要字符串形式的,
            而headers exchange则没有这个要求,因为键值对的值可以是任何类型
            -------------------------------------------------------------------------------------------------
            Headers 中消息流转过程:
            1.一个或多个队列使用标头属性(H)绑定(链接)到标头交换。
            2.生产者将带有标头属性(MH)的消息发送到此Exchange。
            3.如果MH与H匹配,则消息将转发到队列。
            4.监听队列的使用者接收消息并对其进行处理。

01.Simple简单队列
    a.概念
        如果把使用 RabbitMQ 进行消息发送的过程比喻成邮寄邮件。
        那么简单队列的场景是,只有一个邮箱、一个邮局、一个投递员,消息通过 RabbitMQ 进行一对一发送,发送过程最简单。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个消费者,一个队列(一个默认的交换机)
        -------------------------------------------------------------------------------------------------
        消息产生消息,将消息放入队列
        消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,
        自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,
        这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
    b.一个生产者,一个消费者,一个队列(一个默认的交换机)
        a.生产者
            package com.ruoyi.rabbitmq1.demo01;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("simple-queue1", false, false, false, null);
                    // 5: 准备发送消息的内容
                    String message = "你好,消息队列!!!";
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
                    channel.basicPublish("", "simple-queue1", null, message.getBytes());
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo01;
            
            import com.rabbitmq.client.*;
            import java.io.IOException;
            
            public class Consumer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    /*
                     * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                     * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                     */
                    try (Connection connection = connectionFactory.newConnection("消费者");
                         Channel channel = connection.createChannel()){
            
                        // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                        //channel.queueDeclare("queue1", false, false, false, null);
            
                        // 接收消息,监听对应的队列名即可
                        /*
                         *  @params1: queue 队列的名称
                         *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                         *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                         *  @params4: cancelCallback 消费失败回调
                         * */
                        channel.basicConsume("simple-queue1", true, new DeliverCallback() {
                            @Override
                            public void handle(String consumerTag, Delivery delivery) throws IOException {
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                            }
                        }, new CancelCallback() {
                            @Override
                            public void handle(String s)  {
                                System.out.println("接受失败了...");
                            }
                        });
            
                        // 让程序停止,好接收消费
                        System.out.println("开始接受消息");
                        System.in.read();
                    }
            
                }
            }
        c.运行
            先运行消费者,然后运行生产者。最后消费者控制台输出:
            开始接受消息
            收到消息是你好,消息队列!!!

02.Work工作队列
    a.概念
        在前面我们学了Simple Queue 模型,Simple Queue 模型消息生产者和消费者是一 一对应的,
        消息由生产者发送到队列并由消费者消费。假设有一种消息,消息生产者一次性发送了10条消息,
        消费者消费一条消息执行耗时1秒,如果使用简单队列模式总共耗时10秒;
        如果使用工作队列模式有3个消费者共同消费这些消息理想情况下只需要3秒就可以处理完这些消息。
        -----------------------------------------------------------------------------------------------------
        方式1:轮询模式
        方式2:公平分发(能者多劳)
        -----------------------------------------------------------------------------------------------------
        一个生产者,两个消费者,一个队列(一个默认的交换机)
        -----------------------------------------------------------------------------------------------------
        消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。
        C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息
        (隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,
        可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
    b.轮询模式
        a.生产者
            package com.ruoyi.rabbitmq1.demo02.demo01;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("work-queue1", false, false, false, null);
            
                    // 5: 准备发送消息的内容
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    for (int i = 1; i <= 20; i++) {
                        channel.basicPublish("", "work-queue1", null, ("work-轮询模式:"+ i).getBytes());
                    }
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            a.消费者1
                package com.ruoyi.rabbitmq1.demo02.demo01;
                
                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;
                
                public class Consumer1 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");
                
                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者1");
                        Channel channel = connection.createChannel();
                
                        // 接收消息,监听对应的队列名即可
                        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });
                 
                    }
                }
            b.消费者2
                package com.ruoyi.rabbitmq1.demo02.demo01;
                
                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;
                
                public class Consumer2 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");
                
                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者2");
                        Channel channel = connection.createChannel();
                
                        // 接收消息,监听对应的队列名即可
                        channel.basicConsume("work-queue1", true, (consumerTag, delivery) ->
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });
                
                    }
                }
        c.运行
            先运行消费者,然后运行生产者。最后消费者控制台输出:
            消费者1:
            收到消息是work-轮询模式:1
            收到消息是work-轮询模式:3
            收到消息是work-轮询模式:5
            收到消息是work-轮询模式:7
            收到消息是work-轮询模式:9
            收到消息是work-轮询模式:11
            收到消息是work-轮询模式:13
            收到消息是work-轮询模式:15
            收到消息是work-轮询模式:17
            收到消息是work-轮询模式:19
            消费者2:
            收到消息是work-轮询模式:2
            收到消息是work-轮询模式:4
            收到消息是work-轮询模式:6
            收到消息是work-轮询模式:8
            收到消息是work-轮询模式:10
            收到消息是work-轮询模式:12
            收到消息是work-轮询模式:14
            收到消息是work-轮询模式:16
            收到消息是work-轮询模式:18
            收到消息是work-轮询模式:20
    c.公平分发(能者多劳)
        a.生产者
            package com.ruoyi.rabbitmq1.demo02.demo02;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    channel.queueDeclare("work-queue2", false, false, false, null);
            
                    // 5: 准备发送消息的内容
                    // 6: 发送消息给队列queue1
                    /*
                     * @params1: 交换机exchange
                     * @params2: 队列名称、路由key(routing)
                     * @params3: 属性配置
                     * @params4: 发送消息的内容
                     **/
                    for (int i = 1; i <= 20; i++) {
                        channel.basicPublish("", "work-queue2", null, ("work-公平分发:"+ i).getBytes());
                    }
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            a.消费者1
                package com.ruoyi.rabbitmq1.demo02.demo02;
                
                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;
                
                import com.rabbitmq.client.*;
                import java.util.concurrent.TimeUnit;
                
                public class Consumer1 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");
                
                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者1");
                        Channel channel = connection.createChannel();
                
                        // 同一时刻服务器只会发送一条消息给消费者
                        channel.basicQos(1);
                
                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            try {
                                // 加一个睡眠,模拟消费者1消费慢
                                TimeUnit.MILLISECONDS.sleep(1000);
                                System.out.println("Work1-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        };
                
                        // 接收消息,开启手动确认消费消息机制,设置autoAck为false,防止消息一下子都进入消费者
                        channel.basicConsume("work-queue2", false, deliverCallback,consumerTag  -> {
                        });
                
                    }
                }
            b.消费者2
                package com.ruoyi.rabbitmq1.demo02.demo02;
                
                import com.rabbitmq.client.Channel;
                import com.rabbitmq.client.Connection;
                import com.rabbitmq.client.ConnectionFactory;
                import com.rabbitmq.client.*;
                import java.util.concurrent.TimeUnit;
                
                public class Consumer2 {
                    public static void main(String[] args) throws Exception {
                        // 1: 创建连接工厂,设置连接属性
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("127.0.0.1");
                        connectionFactory.setPort(5672);
                        connectionFactory.setVirtualHost("/");
                        connectionFactory.setUsername("guest");
                        connectionFactory.setPassword("guest");
                
                        // 2: 创建连接,获取通道
                        Connection connection = connectionFactory.newConnection("消费者2");
                        Channel channel = connection.createChannel();
                
                        // 同一时刻服务器只会发送一条消息给消费者
                        channel.basicQos(1);
                
                        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                            try {
                                // 加一个睡眠,模拟消费者1消费慢
                                TimeUnit.MILLISECONDS.sleep(100);
                                System.out.println("Work2-收到消息是" + new String(delivery.getBody(), "UTF-8"));
                                // 确认消息消费,参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
                                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        };
                
                        // 接收消息,开启确认机制,设置autoAck为false
                        channel.basicConsume("work-queue2", false, deliverCallback, consumerTag  -> {
                        });
                
                    }
                }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            消费者1:
            Work1-收到消息是work-公平分发:1
            Work1-收到消息是work-公平分发:2
            Work1-收到消息是work-公平分发:3
            Work1-收到消息是work-公平分发:4
            Work1-收到消息是work-公平分发:5
            Work1-收到消息是work-公平分发:6
            Work1-收到消息是work-公平分发:7
            Work1-收到消息是work-公平分发:8
            Work1-收到消息是work-公平分发:9
            Work1-收到消息是work-公平分发:10
            Work1-收到消息是work-公平分发:14
            消费者2:
            Work2-收到消息是work-公平分发:11
            Work2-收到消息是work-公平分发:12
            Work2-收到消息是work-公平分发:13
            Work2-收到消息是work-公平分发:15
            Work2-收到消息是work-公平分发:16
            Work2-收到消息是work-公平分发:17
            Work2-收到消息是work-公平分发:18
            Work2-收到消息是work-公平分发:19
            Work2-收到消息是work-公平分发:20

03.direct路由模式,直连交换机,依赖routingkey
    a.概念
        消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中
        基于完全匹配、单播的模式
        -------------------------------------------------------------------------------------------------
        DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,
        当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,
        例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
        -------------------------------------------------------------------------------------------------
        处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
        如果一个队列绑定到该交换机上要求路由键 test,则只有被标记为test的消息才被转发,不会转发test.aaa,
        也不会转发dog.123,只会转发test。
        -------------------------------------------------------------------------------------------------
        Direct 消息流转过程:
        1.生产者向Exchange发送消息。
        2.队列使用路由密钥绑定到Exchange。
        3.通常,使用相同/不同的路由密钥有多个队列绑定到Exchange。
        4.发送到Exchange的消息包含路由密钥。根据路由密钥,消息将转发到一个或多个队列。
        5.订阅队列的使用者接收消息并进行处理。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,两个队列,两个消费者
        特点:Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo03;
            
            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();
            
                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "direct-exchange";
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
                    // 4-2: 声明队列
                    channel.queueDeclare("direct-info", false, false, false, null);
                    channel.queueDeclare("direct-error", false, false, false, null);
                    // 4-3: 绑定交换机和队列
                    channel.queueBind("direct-info", exchangeName, "info");
                    channel.queueBind("direct-error", exchangeName, "error");
            
                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    channel.basicPublish(exchangeName, "info", null, "你好,消息队列!".getBytes("UTF-8"));
                    channel.basicPublish(exchangeName, "error", null, "你好,消息队列!".getBytes("UTF-8"));
            
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo03;
            
            import com.rabbitmq.client.CancelCallback;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DeliverCallback;
            import com.rabbitmq.client.Delivery;
            import java.io.IOException;
            
            public class Consumer {
            
                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
            
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();
            
                         // 接收消息,监听对应的队列名即可
                        /*
                         *  @params1: queue 队列的名称
                         *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                         *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                         *  @params4: cancelCallback 消费失败回调
                         * */
                        // 3:定义接受消息的回调
                        channel.basicConsume(queueName, true, new DeliverCallback() {
                            @Override
                            public void handle(String consumerTag, Delivery delivery) throws IOException {
                                System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                            }
                        }, new CancelCallback() {
                            @Override
                            public void handle(String s)  {
                                System.out.println("接受失败了...");
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };
            
                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "direct-info").start();
                    new Thread(runnable, "direct-error").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            direct-error:收到消息是:你好,消息队列!
            direct-info:收到消息是:你好,消息队列!

04.fanout发布订阅,扇形交换机,不依赖routingkey
    a.概念
        把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
        -------------------------------------------------------------------------------------------------
        FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,
        在这种策略中,routingkey 将不起任何作用
        -------------------------------------------------------------------------------------------------
        不处理路由键。你只需要简单的将队列绑定到交换机上。
        一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
        很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,三个队列,三个消费者
        特点:发布与订阅模式,是一种广播机制,它是没有路由key的模式。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo04;
            
            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 从连接工厂中获取
                    Connection connection = connectionFactory.newConnection("生产者");
                    // 3: 从连接中打开通道channel
                    Channel channel = connection.createChannel();
            
                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "fanout-exchange";
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机(也可通过web页面创建)
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
                    // 4-2: 声明队列
                    channel.queueDeclare("fanout-queue1", false, false, false, null);
                    channel.queueDeclare("fanout-queue2", false, false, false, null);
                    channel.queueDeclare("fanout-queue3", false, false, false, null);
                    // 4-3: 绑定交换机和队列
                    channel.queueBind("fanout-queue1", exchangeName, "");
                    channel.queueBind("fanout-queue2", exchangeName, "");
                    channel.queueBind("fanout-queue3", exchangeName, "");
            
                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    channel.basicPublish(exchangeName, "", null, "你好,消息队列!".getBytes("UTF-8"));
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo04;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DeliverCallback;
            
            public class Consumer {
            
                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();
            
                        // 6: 定义接受消息的回调
                        DeliverCallback deliverCallback = (consumerTag, delivery) ->
                                System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
            
                        channel.basicConsume(queueName, true, deliverCallback, consumerTag  -> {
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };
            
                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "fanout-queue1").start();
                    new Thread(runnable, "fanout-queue2").start();
                    new Thread(runnable, "fanout-queue3").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            fanout-queue3:收到消息是:你好,消息队列!
            fanout-queue1:收到消息是:你好,消息队列!
            fanout-queue2:收到消息是:你好,消息队列!

05.topic主题模式,主题交换机,依赖routingkey
    a.概念
        通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
        -------------------------------------------------------------------------------------------------
        星号井号代表通配符
        星号代表多个单词,井号代表一个单词
        路由功能添加模糊匹配
        消息产生者产生消息,把消息交给交换机
        交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费 
        -------------------------------------------------------------------------------------------------
        消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
        根据业务功能定义路由字符串
        从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
        业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
        -------------------------------------------------------------------------------------------------
        将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号#匹配一个或多个词,符号* 匹配不多不少一个词。
        因此audit.#能够匹配到audit.irs.corporate,但是audit.* 只会匹配到audit.irs
        -------------------------------------------------------------------------------------------------
        Topic 中的路由键设置规则:
        1.Topic Exchange中的路由关键字必须包含零个或多个由 . 点分隔的单词,例如health.education。
        2.Topic Exchange中的路由键通常称为路由模式。
        3.路由键允许只包含 星号(*)和 井号 (#)的正则表达式组成
        4.星号(*****)表示正好允许一个字。
        5.同样,井号(#)表示允许的单词数为零或更多。
        6.点号(.)表示–单词定界符。多个关键术语用点定界符分隔。
        7.如果路由模式为**health.\***,则意味着以第一个单词为首的路由键运行状况发送的任何消息都将到达队列。例如,health.education将到达此队列,但sports.health将不起作用。
        -------------------------------------------------------------------------------------------------
        Topic 中的消息流转过程:
        1.一个Queue队列通过路由键(P)绑定到 Exchange。
        2.Producer 将带有P路由键(K)的消息发送到 Topic Exchange。
        3.如果P与K匹配,则消息被传递到队列。路由密钥匹配的确定如下所述。
        4.订阅队列的使用者将收到消息。
        -------------------------------------------------------------------------------------------------
        一个生产者,一个交换机,两个队列,两个消费者
        特点:Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。
        生产者创建Topic的exchange并且绑定到队列中,绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx 去别写。* -》代表xxx,# -》代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底时什么
        简单的说:* 表示一个,# 表示 0个 或者 多个
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo05;
            
            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            
            public class Producer {
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 创建连接,获取通道
                    Connection connection = connectionFactory.newConnection("生产者");
                    Channel channel = connection.createChannel();
            
                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key(fanout模式时路由key为空字符串)
                    String exchangeName = "topic-exchange";
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机(也可通过web页面创建)
                    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
            
                    // 4-2: 声明队列
                    channel.queueDeclare("topic-queue1", false, false, false, null);
                    channel.queueDeclare("topic-queue2", false, false, false, null);
                    channel.queueDeclare("topic-queue3", false, false, false, null);
            
                    // 4-3: 绑定交换机和队列
                    channel.queueBind("topic-queue1", exchangeName, "#.order");
                    channel.queueBind("topic-queue2", exchangeName, "*.user");
            
                    // 5: @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                    // topic-queue1、topic-queue2、topic-queue3
                    channel.basicPublish(exchangeName, "com.course.order", null, "routingKey:#.order".getBytes("UTF-8"));
                    // topic-queue3
                    channel.basicPublish(exchangeName, "com.order.user", null, "routingKey:#.user".getBytes("UTF-8"));
            
                    System.out.println("消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo05;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Consumer {
            
                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();
            
                        // 6: 定义接受消息的回调
                        channel.basicConsume(queueName, true,
                                (consumerTag, delivery) -> System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")),
                                consumerTag  -> {
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.err.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };
            
                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "topic-queue1").start();
                    new Thread(runnable, "topic-queue2").start();
                    new Thread(runnable, "topic-queue3").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            topic-queue1:收到消息是:routingKey:#.order

06.headers x-mactch模式,头交换机,不依赖routingkey
    a.概念
        不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
        -------------------------------------------------------------------------------------------------
        Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。
        Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,
        接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。
        匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。
        all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
        fanout,direct,topic exchange的routingKey都需要要字符串形式的,
        而headers exchange则没有这个要求,因为键值对的值可以是任何类型
        -------------------------------------------------------------------------------------------------
        Headers 中消息流转过程:
        1.一个或多个队列使用标头属性(H)绑定(链接)到标头交换。
        2.生产者将带有标头属性(MH)的消息发送到此Exchange。
        3.如果MH与H匹配,则消息将转发到队列。
        4.监听队列的使用者接收消息并对其进行处理。
    b.代码
        a.生产者
            package com.ruoyi.rabbitmq1.demo06;
            
            import com.rabbitmq.client.*;
            
            import java.util.HashMap;
            import java.util.Map;
            
            public class Producer {
                public static void main(String[] args) throws Exception {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    // 2: 创建连接,获取通道
                    Connection connection = connectionFactory.newConnection("生产者");
                    Channel channel = connection.createChannel();
            
                    // 3: 定义队列名称-随机生成队列名、交换机名、路由key
            
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                    /*
                     *  申明队列:如果队列不存在会自动创建。
                     *  注意:
                     *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                     *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                     *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                     *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                     *
                     *  @params1: queue 队列的名称
                     *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                     *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                     *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                     *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                     * */
                    // 4-1: 声明交换机
                    channel.exchangeDeclare("header-exchange", BuiltinExchangeType.HEADERS);
                    // 4-2: 声明队列
                    channel.queueDeclare("header-queue-noe", false, false, false, null);
                    channel.queueDeclare("header-queue-two", false, false, false, null);
                    // 4-3: 绑定交换机和队列并加上配置header匹配规则
                    Map<String, Object> oneArgs = new HashMap<>();
                    // 匹配其中一个key/value 就能匹配成功
                    oneArgs.put("x-match", "any");
                    oneArgs.put("h1", "Header1");
                    oneArgs.put("h2", "Header2");
                    channel.queueBind("header-queue-noe", "header-exchange", "", oneArgs);
                    Map<String, Object> twoArgs = new HashMap<>();
                    // 必须匹配所有key/value 才能匹配成功
                    twoArgs.put("x-match", "all");
                    twoArgs.put("h1", "Header1");
                    twoArgs.put("h2", "Header2");
                    channel.queueBind("header-queue-two", "header-exchange", "", twoArgs);
            
                    /**
                     * 定义需要发送的消息和属性
                     */
                    Map<String, Object> headerMap = new HashMap<>();
                    headerMap.put("h1", "Header1");
                    headerMap.put("h3", "Header3");
            
                    AMQP.BasicProperties basicPropertiesOne = new AMQP.BasicProperties()
                            .builder().headers(headerMap).build();
                    channel.basicPublish("header-exchange", "", basicPropertiesOne, "Header Exchange example 1".getBytes("UTF-8"));
                    System.out.println("h1,h3:消息发送成功!");
            
                    headerMap.put("h2", "Header2");
                    AMQP.BasicProperties basicPropertiesTwo = new AMQP.BasicProperties()
                            .builder().headers(headerMap).build();
                    channel.basicPublish("header-exchange", "", basicPropertiesTwo, "Header Exchange example 2".getBytes("UTF-8"));
                    System.out.println("h1,h2,h3:消息发送成功!");
            
                    // 最后关闭通关和连接
                    channel.close();
                    connection.close();
                }
            }
        b.消费者
            package com.ruoyi.rabbitmq1.demo06;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            public class Consumer {
            
                private static Runnable runnable = () -> {
                    // 1: 创建连接工厂,设置连接属性
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        // 2: 创建连接,获取通道(消费者一般不增加自动关闭)
                        connection = connectionFactory.newConnection("消费者");
                        channel = connection.createChannel();
                        // 获取队列的名称
                        final String queueName = Thread.currentThread().getName();
            
                        // 6: 定义接受消息的回调
                        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
                            System.out.println(queueName + ":收到消息是: " + new String(message.getBody()));
                        }), consumerTag -> {
                            System.out.println(consumerTag);
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.out.println("发送消息出现异常...");
                    } finally {
                        // 此处应该是需要关闭通道的,不过为了测试咱们不关闭了
                    }
                };
            
                public static void main(String[] args) {
                    // 启动三个线程去执行
                    new Thread(runnable, "header-queue-noe").start();
                    new Thread(runnable, "header-queue-two").start();
                }
            }
        c.运行
            先运行生产者,然后运行消费者。最后消费者控制台输出:
            header-queue-noe:收到消息是: Header Exchange example 1
            header-queue-two:收到消息是: Header Exchange example 2
            header-queue-noe:收到消息是: Header Exchange example 2

3.6 应用场景

00.应用场景
    01.第1种:单条消息过期
        体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());
    02.第2种:队列消息过期
        体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
    03.第3种:特殊情况
        还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
        这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
        是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)
    04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
        消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
        消息过期
        队列达到最大长度
        -----------------------------------------------------------------------------------------------------
        步骤如下:
        1: 创建连接工厂,设置连接属性
        2: 从连接工厂中获取
        3: 从连接中打开通道channel
        4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
        4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
        5: 通过channel发送消息
        -----------------------------------------------------------------------------------------------------
        体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机                       重
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key        重
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定
    05.第5种:rabbitmq_delayed_message_exchange 插件 实现延迟队列
        // 4: 声明延迟交换机
        channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);      重
        channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
        channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!! 5000毫秒";
        // 设置延迟时间(例如,5000毫秒)
        int delay = 5000;
        // 6: 发送消息给延迟交换机
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                .build();
        channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
        System.out.println("消息发送成功!");
    06.第6种:发送可靠性?确认机制、事务 / 消费可靠性?确认机制、幂等性
        a.方式1:通过AMQP提供的事务机制实现
            -- 开启事务
            channel.txSelect();
            -- 提交事务
            channel.txCommit();
            -- 回滚事务
            channel.txRollback();
        b.方式2:消息的发送者确认模式
            a.说明
                消息的发送者确认模式的使用和事务类似,也是通过channel进行发送确认的,
                但该模式和事务有着本质的区别就是发送消息丢失的时候不会像事务一样停止发送消息,
                而是补发消息直到消息发送到对方确认为止。
            b.三种实现方式
                a.方法1
                    channel.waitForConfirms()普通发送消息确认模式
                    该模式是不断的等待接收方来返回消息,如果有返回消息则说明消息发送成功
                    -----------------------------------------------------------------------------------------
                    boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。
                    可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。
                    如果抛出异常表示服务器出了问题,需要补发消息。
                    无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。
                    补发消息可以将消息缓存到Redis中稍后使用定时任务来补发,或者使用递归的方法来补发
                b.方法2
                    channel.waitForConfirmsOrDie()函数批量确认模式
                    -----------------------------------------------------------------------------------------
                    channel.waitForConfirmsOrDie();
                    该函数会同时向服务中确认之前当前通道中发送消息是否已经全部写入成功,该函数没有返回值,
                    如果服务器中没有一条消息能够发送成功或者向服务器发送确认时服务不可访问都被认定为消息发送失败。
                    可能消息发送成功,也可能消息没发送成功
                    -----------------------------------------------------------------------------------------
                    channel.waitForConfirmsOrDie();
                    也可以指定一个毫秒值来用于等带服务器的确认时间,如果超过这个时间就抛出异常,表示确认失败需要补发
                    -----------------------------------------------------------------------------------------
                    注意:
                    批量确认消息比普通确认要快,但是如果一但出现了消息补发的情况,就不能确定是哪条消息需要补发,
                    所以就会将本次发送的所有消息进行补发。
                c.方法3
                    channel.addConfirmListener()异步监听发送确认模式。需要new ConfirmListener()来实现里面的回调函数
                    public void handleAck(long l, boolean b) throws IOException
                    public void handleNack(long l, boolean b) throws IOException 
    07.第7种:Stream流
        a.说明
            a.概念
                abbitMQ 从 v3.9 版本开始引入了一个名为 RabbitMQ Streams 的新功能
                它允许消息以高吞吐量和低延迟的方式通过流的形式进行传递
            b.特点
                RabbitMQ Streams 专为处理大量数据和实时数据流设计,它与传统的 RabbitMQ 消息队列模型有所不同,主要特点包括:
                1.高吞吐量:RabbitMQ Streams 可以处理数百万条消息的高并发流式数据传输,适用于实时分析和大数据处理场景。
                2.持久化流:消息可以以流的形式持久化,这意味着即使消费者处理较慢,数据也不会丢失。
                3.多消费者读取:同一条消息可以被多个消费者读取,而不会影响消息的传递和持久化。
                4.按消息索引读取:消费者可以根据消息的索引选择从特定位置开始读取消息,而不像传统的队列那样必须按顺序处理。
                5.消费分区:类似于 Kafka,RabbitMQ Streams 也支持分区机制,帮助提高吞吐量和并行性。
        b.代码实现
            a.准备
                a.安装RabbitMQ并启用stream插件
                    rabbitmq-plugins enable rabbitmq_stream
                b.依赖
                    <dependency>
                        <groupId>com.rabbitmq</groupId>
                        <artifactId>stream-client</artifactId>
                        <version>0.5.1</version>
                    </dependency>
                c.在RabbitMQ中创建Stream队列
                    rabbitmqadmin declare queue name=my-stream durable=true type=stream

01.第1种:单条消息过期
    a.生产者
        package com.ruoyi.rabbitmq2.demo01;
        
        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        
        public class Producer {
        
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();
        
                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                channel.queueDeclare("simple-queue1", true, false, false, null);
                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!!";
                // 6: 发送消息给队列queue1
                /*
                 * @params1: 交换机exchange
                 * @params2: 队列名称、路由key(routing)
                 * @params3: 属性配置
                 * @params4: 发送消息的内容
                 **/
                // 设置消息的过期时间为10000毫秒(10秒)
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .expiration("10000") // 设置过期时间
                    .build();
                channel.basicPublish("", "simple-queue1", props, message.getBytes());
                System.out.println("消息发送成功!");
        
                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo01;
        
        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;
        
        public class Consumer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                     Channel channel = connection.createChannel()){
        
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                    //channel.queueDeclare("queue1", false, false, false, null);
        
                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("simple-queue1", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s)  {
                            System.out.println("接受失败了...");
                        }
                    });
        
                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }
        
            }
        }
    c.体现(生产者)
        // 设置消息的过期时间为10000毫秒(10秒)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .expiration("10000") // 设置过期时间
            .build();
        channel.basicPublish("", "simple-queue1", props, message.getBytes());

02.第2种:队列消息过期
    a.生产者
        package com.ruoyi.rabbitmq2.demo02;
        
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;
        
        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();
        
                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                // 设置消息过期时间为 10000 毫秒(10秒)
                channel.queueDeclare("simple-queue2", true, false, false,
                    new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});
                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!!";
                // 6: 发送消息给队列queue1
                /*
                 * @params1: 交换机exchange
                 * @params2: 队列名称、路由key(routing)
                 * @params3: 属性配置
                 * @params4: 发送消息的内容
                 **/
                // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
                channel.basicPublish("", "simple-queue2", null, message.getBytes());
                System.out.println("消息发送成功!");
        
                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo02;
        
        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;
        
        public class Consumer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                     Channel channel = connection.createChannel()){
        
                    // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息(声明队列可以在生产者或者消费者端)
                    //channel.queueDeclare("queue1", false, false, false, null);
        
                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("simple-queue2", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s)  {
                            System.out.println("接受失败了...");
                        }
                    });
        
                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }
        
            }
        }
    c.体现(生产者)
        // 设置消息过期时间为 10000 毫秒(10秒)
        channel.queueDeclare("simple-queue1", false, false, false, new HashMap<String, Object>(){{put("x-message-ttl", 5000);}});

03.第3种:特殊情况
    还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,
    这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,
    是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)

04.第4种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    a.生产者
        package com.ruoyi.rabbitmq2.demo03;
        
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;
        import java.util.Map;
        
        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();
        
                // 4: 通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
                /*
                 *  申明队列:如果队列不存在会自动创建。
                 *  注意:
                 *  1.Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *  2.队列声明可以放在生产者、消费者或web页面上创建。但是在消费者启动监听之前队列一定要创建好。
                 *    如果要先启动消费者,建议把声明队列放在消费者端。否在消费者监听队列不存在会报异常。
                 *    如果要先启动生产者,建议在生产者端声明队列。虽然发送消息时队列不存不会报错,但第一次发送时队列不存在相当于白发送了。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化(即存盘),false = 非持久化 true = 持久化,非持久化会存盘吗?会存盘,但是会随从重启服务会丢失
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
                 * */
                // 4.1
                // 声明死信交换机
                channel.exchangeDeclare("dlx-exchange", "direct");
                // 声明死信队列
                channel.queueDeclare("dlx-queue", true, false, false, null);
                // 绑定死信队列和死信交换机
                channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");
        
                // 4.2
                // 声明普通交换机
                // 使用默认 (AMQP Default)
                // 声明普通队列,设置TTL和DLX
                Map<String, Object> map = new HashMap<>();
                map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
                map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
                map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
                channel.queueDeclare("simple-queue3", true, false, false, map);
                // 绑定普通队列和普通交换机
                // 无需绑定
                
                // 5.通过channel发送消息
                // @params1: 交换机名  @params2 队列/路由key @params 属性配置  @params4 消息内容
                channel.basicPublish("", "simple-queue3", null, "你好,消息队列 dlx!!!".getBytes());
                System.out.println("消息发送成功!");
        
                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    b.消费者
        package com.ruoyi.rabbitmq2.demo03;
        
        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;
        
        public class Consumer {
        
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                    Channel channel = connection.createChannel()) {
                    // 接收死信队列的消息
                    channel.basicConsume("dlx-queue", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到死信队列消息: " + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s) {
                            System.out.println("接受失败了...");
                        }
                    });
        
                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }
        
            }
        }
    c.体现(生产者)
        // 4.1
        // 声明死信交换机
        channel.exchangeDeclare("dlx-exchange", "direct");
        // 声明死信队列
        channel.queueDeclare("dlx-queue", true, false, false, null);
        // 绑定死信队列和死信交换机
        channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

        // 4.2
        // 声明普通交换机
        // 使用默认 (AMQP Default)
        // 声明普通队列,设置TTL和DLX
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
        map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机
        map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key
        channel.queueDeclare("simple-queue3", true, false, false, map);
        // 绑定普通队列和普通交换机
        // 无需绑定

05.第5种:rabbitmq_delayed_message_exchange 插件 实现延迟队列
    a.安装
        下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
        放入文件夹 C:\software\RabbitMQ\rabbitmq_server-3.8.9\plugins
        启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        查看插件 rabbitmq-plugins list
    b.生产者
        package com.ruoyi.rabbitmq2.demo04;
        
        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import java.util.HashMap;
        
        public class Producer {
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                // 2: 从连接工厂中获取
                Connection connection = connectionFactory.newConnection("生产者");
                // 3: 从连接中打开通道channel
                Channel channel = connection.createChannel();
        
                // 4: 声明延迟交换机
                channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);
                channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
                channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机
        
                // 5: 准备发送消息的内容
                String message = "你好,消息队列!!! 5000毫秒";
                // 设置延迟时间(例如,5000毫秒)
                int delay = 5000;
                // 6: 发送消息给延迟交换机
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                        .build();
                channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
                System.out.println("消息发送成功!");
        
                // 5: 准备发送消息的内容
                String message2 = "你好,消息队列!!! 10000毫秒";
                // 设置延迟时间(例如,10000毫秒)
                int delay2 = 10000;
                // 6: 发送消息给延迟交换机
                AMQP.BasicProperties props2 = new AMQP.BasicProperties.Builder()
                        .headers(new HashMap<String, Object>(){{put("x-delay", delay2);}})
                        .build();
                channel.basicPublish("delayed_exchange", "", props2, message2.getBytes()); // 修改为使用空的路由键
                System.out.println("消息发送成功!");
        
                // 最后关闭通关和连接
                channel.close();
                connection.close();
            }
        }
    c.消费者
        package com.ruoyi.rabbitmq2.demo04;
        
        import com.rabbitmq.client.CancelCallback;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.DeliverCallback;
        import com.rabbitmq.client.Delivery;
        import java.io.IOException;
        
        public class Consumer {
        
            public static void main(String[] args) throws Exception {
                // 1: 创建连接工厂,设置连接属性
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("guest");
                connectionFactory.setPassword("guest");
        
                /*
                 * 2: 从连接工厂中获取/创建连接(断点到此步可以发现web界面Connection下会出现此连接信息)
                 * 3: 从连接中获取通道channel(断点到此步可以发现web界面Channel下会出现此连接信息)
                 */
                try (Connection connection = connectionFactory.newConnection("消费者");
                    Channel channel = connection.createChannel()) {
                    // 接收消息,监听对应的队列名即可
                    /*
                     *  @params1: queue 队列的名称
                     *  @params2: autoAck 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
                     *  @params3: deliverCallback 指定消费回调,开启监听队列queue1
                     *  @params4: cancelCallback 消费失败回调
                     * */
                    channel.basicConsume("delayed_queue", true, new DeliverCallback() {
                        @Override
                        public void handle(String consumerTag, Delivery delivery) throws IOException {
                            System.out.println("收到消息是" + new String(delivery.getBody(), "UTF-8"));
                        }
                    }, new CancelCallback() {
                        @Override
                        public void handle(String s) {
                            System.out.println("接受失败了...");
                        }
                    });
        
                    // 让程序停止,好接收消费
                    System.out.println("开始接受消息");
                    System.in.read();
                }
        
            }
        }
    d.体现(生产者)
        // 4: 声明延迟交换机
        channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);
        channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
        channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

        // 5: 准备发送消息的内容
        String message = "你好,消息队列!!! 5000毫秒";
        // 设置延迟时间(例如,5000毫秒)
        int delay = 5000;
        // 6: 发送消息给延迟交换机
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
                .build();
        channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
        System.out.println("消息发送成功!");

06.第6种:发送可靠性?确认机制、事务 / 消费可靠性?确认机制、幂等性
    a.方式1:通过AMQP提供的事务机制实现
        a.说明
            启动事务,启动事务以后所有写入队列中的消息,必须显示调用事务的txCommit()函数来提交事务,获取txRollback()回滚事务
            注意:如果调用channel.txSelect()函数来开启事务,那么必须显示的调用事务的提交函数,否则消息不会进入到消息队列中
            注意:回滚事务必须在channel关闭之前
            -- 开启事务
            channel.txSelect();
            -- 提交事务
            channel.txCommit();
            -- 回滚事务
            channel.txRollback();
        b.生产者
            package com.ruoyi.rabbitmq2.demo05;
            
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import java.io.IOException;
            
            public class Producer {
            
                public static void main(String[] args) {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        connection = connectionFactory.newConnection();
                        channel = connection.createChannel();
            
                        // 创建队列
                        channel.queueDeclare("transactionQueue", true, false, false, null);
                        // 创建交换机
                        channel.exchangeDeclare("directTransactionExchange", "direct", true);
                        // 创建绑定关系
                        channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");
            
                        // 开启事务
                        channel.txSelect();
            
                        // 发送消息
                        String message = "事务测试消息";
                        channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message.getBytes("utf-8"));
            
                        // 提交事务
                        channel.txCommit();
                        System.out.println("消息发送成功");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (channel != null) {
                            try {
                                // 回滚事务
                                channel.txRollback();
                                channel.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        c.消费者
            package com.ruoyi.rabbitmq2.demo05;
            
            import com.rabbitmq.client.AMQP;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import com.rabbitmq.client.DefaultConsumer;
            import com.rabbitmq.client.Envelope;
            import java.io.IOException;
            
            public class Consumer {
            
                public static void main(String[] args) {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost("127.0.0.1");
                    connectionFactory.setPort(5672);
                    connectionFactory.setVirtualHost("/");
                    connectionFactory.setUsername("guest");
                    connectionFactory.setPassword("guest");
            
                    Connection connection = null;
                    Channel channel = null;
                    try {
                        connection = connectionFactory.newConnection();
                        channel = connection.createChannel();
            
                        // 创建队列
                        channel.queueDeclare("transactionQueue", true, false, false, null);
                        // 创建交换机
                        channel.exchangeDeclare("directTransactionExchange", "direct", true);
                        // 创建绑定关系
                        channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");
            
                        // 开启事务
                        channel.txSelect();
            
                        // 获取消息
                        channel.basicConsume("transactionQueue", true, "", new DefaultConsumer(channel) {
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                String message = new String(body);
                                System.out.println(message);
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
    b.方式2:消息的发送者确认模式
        a.说明
            消息的发送者确认模式的使用和事务类似,也是通过channel进行发送确认的,
            但该模式和事务有着本质的区别就是发送消息丢失的时候不会像事务一样停止发送消息,
            而是补发消息直到消息发送到对方确认为止。
        b.三种实现方式
            a.方法1
                channel.waitForConfirms()普通发送消息确认模式
                该模式是不断的等待接收方来返回消息,如果有返回消息则说明消息发送成功
                ---------------------------------------------------------------------------------------------
                boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。
                可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。
                如果抛出异常表示服务器出了问题,需要补发消息。
                无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。
                补发消息可以将消息缓存到Redis中稍后使用定时任务来补发,或者使用递归的方法来补发
            b.方法2
                channel.waitForConfirmsOrDie()函数批量确认模式
                ---------------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                该函数会同时向服务中确认之前当前通道中发送消息是否已经全部写入成功,该函数没有返回值,
                如果服务器中没有一条消息能够发送成功或者向服务器发送确认时服务不可访问都被认定为消息发送失败。
                可能消息发送成功,也可能消息没发送成功
                ---------------------------------------------------------------------------------------------
                channel.waitForConfirmsOrDie();
                也可以指定一个毫秒值来用于等带服务器的确认时间,如果超过这个时间就抛出异常,表示确认失败需要补发
                ---------------------------------------------------------------------------------------------
                注意:
                批量确认消息比普通确认要快,但是如果一但出现了消息补发的情况,就不能确定是哪条消息需要补发,
                所以就会将本次发送的所有消息进行补发。
            c.方法3
                channel.addConfirmListener()异步监听发送确认模式。需要new ConfirmListener()来实现里面的回调函数
                public void handleAck(long l, boolean b) throws IOException
                public void handleNack(long l, boolean b) throws IOException 

07.第7种:Stream流
    a.说明
        a.概念
            abbitMQ 从 v3.9 版本开始引入了一个名为 RabbitMQ Streams 的新功能
            它允许消息以高吞吐量和低延迟的方式通过流的形式进行传递
        b.特点
            RabbitMQ Streams 专为处理大量数据和实时数据流设计,它与传统的 RabbitMQ 消息队列模型有所不同,主要特点包括:
            1.高吞吐量:RabbitMQ Streams 可以处理数百万条消息的高并发流式数据传输,适用于实时分析和大数据处理场景。
            2.持久化流:消息可以以流的形式持久化,这意味着即使消费者处理较慢,数据也不会丢失。
            3.多消费者读取:同一条消息可以被多个消费者读取,而不会影响消息的传递和持久化。
            4.按消息索引读取:消费者可以根据消息的索引选择从特定位置开始读取消息,而不像传统的队列那样必须按顺序处理。
            5.消费分区:类似于 Kafka,RabbitMQ Streams 也支持分区机制,帮助提高吞吐量和并行性。
    b.代码实现
        a.准备
            a.安装RabbitMQ并启用stream插件
                rabbitmq-plugins enable rabbitmq_stream
            b.依赖
                <dependency>
                    <groupId>com.rabbitmq</groupId>
                    <artifactId>stream-client</artifactId>
                    <version>0.5.1</version>
                </dependency>
            c.在RabbitMQ中创建Stream队列
                rabbitmqadmin declare queue name=my-stream durable=true type=stream
        b.生产者
            import com.rabbitmq.stream.Environment;
            import com.rabbitmq.stream.Producer;
            import com.rabbitmq.stream.ProducerBuilder;
            import com.rabbitmq.stream.Message;
            import com.rabbitmq.stream.MessageBuilder;
            
            public class StreamProducer {
                public static void main(String[] args) {
                    // 创建一个环境(连接到RabbitMQ的Stream服务)
                    Environment environment = Environment.builder()
                            .host("localhost")  // RabbitMQ 服务器地址
                            .port(5552)         // Stream 插件的默认端口
                            .build();
            
                    // 创建生产者并指定Stream队列
                    Producer producer = environment.producerBuilder()
                            .stream("my-stream") // 指定队列名称
                            .build();
            
                    // 构建消息
                    Message message = MessageBuilder
                            .withBody("Hello, RabbitMQ Stream!".getBytes()) // 消息内容
                            .build();
            
                    // 发送消息
                    producer.send(message, confirmationStatus -> {
                        if (confirmationStatus.isConfirmed()) {
                            System.out.println("消息已确认发送成功!");
                        } else {
                            System.out.println("消息发送失败!");
                        }
                    });
            
                    // 关闭生产者
                    producer.close();
                    environment.close();
                }
            }
        c.消费者
            import com.rabbitmq.stream.Environment;
            import com.rabbitmq.stream.Consumer;
            import com.rabbitmq.stream.ConsumerBuilder;
            
            public class StreamConsumer {
                public static void main(String[] args) {
                    // 创建一个环境(连接到RabbitMQ的Stream服务)
                    Environment environment = Environment.builder()
                            .host("localhost")  // RabbitMQ 服务器地址
                            .port(5552)         // Stream 插件的默认端口
                            .build();
            
                    // 创建消费者并指定Stream队列
                    Consumer consumer = environment.consumerBuilder()
                            .stream("my-stream") // 指定队列名称
                            .messageHandler((context, message) -> {
                                String body = new String(message.getBodyAsBinary());
                                System.out.println("收到消息: " + body);
                            })
                            .build();
            
                    // 消费者会持续监听队列消息
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        consumer.close();
                        environment.close();
                    }));
                }
            }

4 RabbitMQ场景

4.1 如何实现延迟队列?

01.第1种:DLX 实现延迟队列,DLX(死信交换机)+TTL(消息超时时间)
    消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
    消息过期
    队列达到最大长度
    -----------------------------------------------------------------------------------------------------
    步骤如下:
    1: 创建连接工厂,设置连接属性
    2: 从连接工厂中获取
    3: 从连接中打开通道channel
    4.1: 声明死信交换机、死信队列、绑定死信队列和死信交换机
    4.2: 声明普通交换机、声明普通队列,设置TTL和DLX、绑定普通队列和普通交换机
    5: 通过channel发送消息
    -----------------------------------------------------------------------------------------------------
    体现(生产者)
    // 4.1
    // 声明死信交换机
    channel.exchangeDeclare("dlx-exchange", "direct");
    // 声明死信队列
    channel.queueDeclare("dlx-queue", true, false, false, null);
    // 绑定死信队列和死信交换机
    channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

    // 4.2
    // 声明普通交换机
    // 使用默认 (AMQP Default)
    // 声明普通队列,设置TTL和DLX
    Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 5000); // 消息超时时间为5000毫秒
    map.put("x-dead-letter-exchange", "dlx-exchange"); // 设置死信交换机                       重
    map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 设置死信交换机的路由key        重
    channel.queueDeclare("simple-queue3", true, false, false, map);
    // 绑定普通队列和普通交换机
    // 无需绑定

02.第2种:rabbitmq_delayed_message_exchange 插件 实现延迟队列
    // 4: 声明延迟交换机
    channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, null);      重
    channel.queueDeclare("delayed_queue", true, false, false, null); // 修改为延迟队列名称
    channel.queueBind("delayed_queue", "delayed_exchange", ""); // 绑定到延迟交换机

    // 5: 准备发送消息的内容
    String message = "你好,消息队列!!! 5000毫秒";
    // 设置延迟时间(例如,5000毫秒)
    int delay = 5000;
    // 6: 发送消息给延迟交换机
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .headers(new HashMap<String, Object>(){{put("x-delay", delay);}})
            .build();
    channel.basicPublish("delayed_exchange", "", props, message.getBytes()); // 修改为使用空的路由键
    System.out.println("消息发送成功!");

4.2 消费端怎么进行限流?

00.背景
    当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器奔溃,这种情况下需要对消费端限流

02.限流
    a.介绍
        Spring RabbitMQ 提供参数 prefetch 可以设置单个请求处理的消息个数
        如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息
    b.开启消费端限流
        ##在单个请求中处理的消息个数,unack的最大数量
        spring.rabbitmq.listener.simple.prefetch=2
    c.原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ没有这两个参数
        //单条消息大小限制,0代表不限制
        //global:限制限流功能是channel级别的还是consumer级别。当设置为false,consumer级别,限流功能生效,设置为true没有了限流功能,因为channel级别尚未实现。
        void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

4.3 消息重复消费怎么处理?

01.原因
    1.生产时消息重复
    2.消费时消息重复

02.场景
    生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
    消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。
    这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。

03.解决方法
    发送消息时让每个消息携带一个全局的唯一ID,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性
    具体消费过程为:
    1.消费者获取到消息后先根据id去查询redis/db是否存在该消息
    2.如果不存在,则正常消费,消费完毕后写入redis/db
    3.如果存在,则证明消息被消费过,直接丢弃

4.4 用户订单15分钟内支付,否则取消订单?延迟队列/死信队列

00.使用Redis延迟队列实现(zset)
    添加任务:在用户操作(如发起订单、发布文章)时,计算任务的执行时间,并将任务添加到有序集合中。
    轮询定时处理:后台线程定期检查有序集合中到期的任务,并处理这些任务。
    任务处理:根据任务ID执行具体的业务逻辑,例如发布文章或审核文章。

00.采用RabbitMQ的延时队列而不采用Redis的延时队列
    采用RabbitMQ的延时队列,可以利用其可靠的消息持久化、原生的延时消息支持、高并发处理能力和丰富的路由功能,
    更适合发布文章、审核文章和取消订单等需要可靠延时处理的场景。
    而Redis虽然可以通过定制实现延时队列,但在可靠性和功能性上有所不足。

01.使用RabbitMQ延迟队列实现
    a.安装插件
        启用rabbitmq_delayed_message_exchange插件。
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    b.创建延迟交换机和队列
        创建一个x-delayed-message类型的交换机。
        创建一个普通队列,并将其绑定到延迟交换机。
    c.发送延迟消息
        在用户发起订单时,向延迟交换机发送一条消息,设置消息的延迟时间为15分钟(900000毫秒)。
    d.消费延迟消息
        消费者监听队列,当延迟时间到达时,接收消息并检查订单支付状态。
        如果订单未支付,则执行取消订单操作。

02.使用RabbitMQ死信队列实现
    a.创建普通队列和死信交换机
        创建一个普通队列,并设置其消息TTL(Time-To-Live)为15分钟。
        创建一个死信交换机和死信队列。
    b.配置死信队列
        配置普通队列的死信参数,将消息过期后转发到死信交换机。
        将死信交换机绑定到死信队列。
    c.发送订单消息
        在用户发起订单时,向普通队列发送一条消息。
    d.消费死信队列消息
        消费者监听死信队列,当消息从普通队列转发到死信队列时,接收消息并检查订单支付状态。
        如果订单未支付,则执行取消订单操作。

03.对比总结
    延迟队列:消息在指定时间后才会被处理。适用于需要精确定时的任务。
    死信队列:消息在过期或被拒绝后转发到另一个队列处理。适用于处理失败或需要额外处理的消息。

4.5 死信队列实现延迟队列效果?用于发布文章、审核文章场景

01.死信队列
    a.定义
        消费失败的消息存放的队列。
    b.消息消费失败的原因:
        消息被拒绝并且消息没有重新入队(requeue=false)
        消息超时未消费
        达到最大队列长度
    c.工作原理
        当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。
        可以监听死信队列中的消息做相应的处理。

02.实现方法
    a.创建主队列
        首先,你需要创建一个主队列,用于发布文章。文章将被发送到这个主队列,但不会立即处理。
    b.创建死信队列
        接下来,你需要创建一个死信队列。这个队列将用于存储那些需要延迟发布的文章。
    c.定义延迟规则
        在发布文章时,将文章消息发布到主队列,但是设置一个延迟时间。你可以使用 RabbitMQ 的消息属性来指定延迟时间
    d.设置消费者(审核文章)
        创建一个消费者,用于从死信队列中获取延迟发布的文章消息,并将其发布到主队列中以实现实际发布。
    e.启动消费者(审核文章)
        启动上述消费者程序,它会监听死信队列,并在延迟时间结束后将文章发布到主队列,实现延迟发布效果。