1 基础概念

01.MQ的优点
    异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
    应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
    流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
    日志处理 - 解决大量日志传输。
    消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

02.解耦、异步、削峰是什么?
    解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
    就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
    异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
    削峰:减少高峰时期对服务器压力。

03.消息队列有什么缺点
    a.系统可用性降低:本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
    b.系统复杂度提高:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
    c.一致性问题:A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

2 集群搭建

2.1 master主节点

01.RocketMQ
	a.下载
		http://rocketmq.apache.org/release_notes/release-notes-4.4.0/ 
  	b.将安装包移动到/usr后,打开目录
  		cd /usr
  	c.解压
		tar -zxvf rocketmq-all-4.4.0-bin-release.zip
 	d.重命名
		mv rocketmq-all-4.4.0-bin-release rocketmq
	
02.域名映射 vi /etc/hosts
	192.168.2.128 bigdata01
	192.168.2.128 mqnameserver1
	192.168.2.128 mqmaster1
	192.168.2.129 mqnameserver2
	192.168.2.129 mqmaster1slaver1

03.创建存储路径
	a.打开目录
		cd /usr/rocketmq/
	b.新建目录
		mkdir mqstore
	c.创建存储路径目录
		mkdir mqstore/commitlog			日志
		mkdir mqstore/consumequeue		消费队列
		mkdir mqstore/index				索引

04.主配置文件 /usr/rocketmq/conf/2m-2s-async/broker-a.properties
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    listenPort=10911
    storePathRootDir=/usr/rocketmq/mqstore
    storePathCommitLog=/usr/rocketmq/mqstore/commitlog
    storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue
    storePathIndex=/usr/rocketmq/mqstore/index
    maxMessageSize=65536

05.从配置文件	/usr/rocketmq/conf/2m-2s-async/broker-a-s.properties
	brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=1
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    listenPort=10911
    deleteWhen=04
    fileReservedTime=48
    storePathRootDir=/usr/rocketmq/mqstore
    storePathCommitLog=/usr/rocketmq/mqstore/commitlog
    storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue
    storePathIndex=/usr/rocketmq/mqstore/index
    maxMessageSize=65536

06.配置日志
	a.新建日志目录
		mkdir /usr/rocketmq/logs
	b.打开conf文件进行配置日志
		cd /usr/rocketmq/conf
	c.统一设置user.home
		sed -i 's#${user.home}#/usr/rocketmq#g'  *.xml
	
07.降低启动参数
	a.打开bin目录
		cd /usr/rocketmq/bin
	b.vi runbroker.sh
		JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g
	c.vi runserver.sh
		JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

08.启动mater(192.168.2.128)
	a.防火墙
		systemctl stop firewalld
		systemctl disable firewalld
	b.后台启动Namesrv
		nohup sh mqnamesrv &
	c.后台启动BrokerServer 主配置文件
		nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
	d.查看进程
		jps
	e.关闭
		./mqshutdown broker
		./mqshutdown namesrv

2.2 slave从节点

01.将主节点上的rocketmq 远程复制到 从节点
	a.在主节点上,打开目录
		cd /usr
	b.从主节点拷贝到从节点
		scp -r rocketmq/ root@192.168.2.129:/usr/

02.启动slave(192.168.2.129)
	a.防火墙
		systemctl stop firewalld
		systemctl disable firewalld
	b.后台启动Namesrv
		nohup sh mqnamesrv &
	c.后台启动BrokerServer,从配置文件
		nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
	d.查看进程
		jps
	e.关闭
		./mqshutdown broker
		./mqshutdown namesrv

2.3 搭建控制台

01.在终端打包console
	mvn clean package -Dmaven.test.skip=true

02.执行jar包
	java -jar rocketmq-console-ng-1.0.1.jar

2.4 启动主从同步

01.启动mater(192.168.2.128)
	a.防火墙
		systemctl stop firewalld
		systemctl disable firewalld
	b.后台启动Namesrv
		nohup sh mqnamesrv &
	c.后台启动BrokerServer 主配置文件
		nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
	d.查看进程
		jps
	e.关闭
		./mqshutdown broker
		./mqshutdown namesrv

02.启动slave(192.168.2.129)
	a.防火墙
		systemctl stop firewalld
		systemctl disable firewalld
	b.后台启动Namesrv
		nohup sh mqnamesrv &
	c.后台启动BrokerServer,从配置文件
		nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a-s.properties &
	d.查看进程
		jps
	e.关闭
		./mqshutdown broker
		./mqshutdown namesrv

2.5 测试主从同步

3 汇总

01.案例一、二
    a.生产者
    	a.消息延迟
    	b.消息类型:同步消息、异步消息、单向消息
    b.消费者
    	a.Push消费者
    	b.标签订阅
    	C.消费者的消息模型:集群模式(默认)、广播模式
    	d.监听器:并发消费
    	e.消费类型:迭代消费、一次性消费
    	f.其他常见API 

02.案例三
    a.消息局部顺序消费问题
        a.生产者(MyProducerOrder,算法保证有序)
        b.消费者(MyConsumerOrder,MessageListenerOrderly保证有序)
    b.批量发送消息
        a.生产者(MyProducerBatch,将数据封装到List集合中,一次性发送多条数据)

03.事务
    a.事务补偿机制
        生产者再向MQServer提交事务时,有两次机会,女如果第一次失败,贝则进行回查,回查后还会进行第二次提交/回滚。
        注意:延迟消息、批量消息不支持事务机制
    b.在MQserver中消息有三种状态
        ①提交状态:可以被消费者消费
        ②回滚状态:不能被消费者消费
        ③未知状态(中间状态):需要借助于本地事务,进行一次回查,事务补偿机制

4 案例一

01.生产者
	a.消息延迟
	b.消息类型:同步消息、异步消息、单向消息

02.消费者
	a.Push消费者
	b.标签订阅
	C.消费者的消息模型:集群模式(默认)、广播模式
	d.监听器:并发消费
	e.消费类型:迭代消费、一次性消费
	f.其他常见API 

5 案例二

01.消费者
	a.Pull消费者(DefaultMQPullConsumer) + 偏移量
	b.Pull消费者(MQPullConsumerScheduleService,时间调度:每隔3秒主动拉去一次数据) + 偏移量

6 案例三

01.消息局部顺序消费问题
	a.生产者(MyProducerOrder,算法保证有序)
    b.消费者(MyConsumerOrder,MessageListenerOrderly保证有序)

02.批量发送消息
	a.生产者(MyProducerBatch,将数据封装到List集合中,一次性发送多条数据)

7 消息事务机制