1 基础概念
01.MQ的优点
异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
日志处理 - 解决大量日志传输。
消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。
02.解耦、异步、削峰是什么?
解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
削峰:减少高峰时期对服务器压力。
03.消息队列有什么缺点
a.系统可用性降低:本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;
b.系统复杂度提高:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
c.一致性问题:A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。



2 集群搭建
2.1 master主节点
01.RocketMQ
a.下载
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
b.将安装包移动到/usr后,打开目录
cd /usr
c.解压
tar -zxvf rocketmq-all-4.4.0-bin-release.zip
d.重命名
mv rocketmq-all-4.4.0-bin-release rocketmq
02.域名映射 vi /etc/hosts
192.168.2.128 bigdata01
192.168.2.128 mqnameserver1
192.168.2.128 mqmaster1
192.168.2.129 mqnameserver2
192.168.2.129 mqmaster1slaver1
03.创建存储路径
a.打开目录
cd /usr/rocketmq/
b.新建目录
mkdir mqstore
c.创建存储路径目录
mkdir mqstore/commitlog 日志
mkdir mqstore/consumequeue 消费队列
mkdir mqstore/index 索引
04.主配置文件 /usr/rocketmq/conf/2m-2s-async/broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
listenPort=10911
storePathRootDir=/usr/rocketmq/mqstore
storePathCommitLog=/usr/rocketmq/mqstore/commitlog
storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue
storePathIndex=/usr/rocketmq/mqstore/index
maxMessageSize=65536
05.从配置文件 /usr/rocketmq/conf/2m-2s-async/broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
storePathRootDir=/usr/rocketmq/mqstore
storePathCommitLog=/usr/rocketmq/mqstore/commitlog
storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue
storePathIndex=/usr/rocketmq/mqstore/index
maxMessageSize=65536
06.配置日志
a.新建日志目录
mkdir /usr/rocketmq/logs
b.打开conf文件进行配置日志
cd /usr/rocketmq/conf
c.统一设置user.home
sed -i 's#${user.home}#/usr/rocketmq#g' *.xml
07.降低启动参数
a.打开bin目录
cd /usr/rocketmq/bin
b.vi runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g
c.vi runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g
08.启动mater(192.168.2.128)
a.防火墙
systemctl stop firewalld
systemctl disable firewalld
b.后台启动Namesrv
nohup sh mqnamesrv &
c.后台启动BrokerServer 主配置文件
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
d.查看进程
jps
e.关闭
./mqshutdown broker
./mqshutdown namesrv

2.2 slave从节点
01.将主节点上的rocketmq 远程复制到 从节点
a.在主节点上,打开目录
cd /usr
b.从主节点拷贝到从节点
scp -r rocketmq/ root@192.168.2.129:/usr/
02.启动slave(192.168.2.129)
a.防火墙
systemctl stop firewalld
systemctl disable firewalld
b.后台启动Namesrv
nohup sh mqnamesrv &
c.后台启动BrokerServer,从配置文件
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
d.查看进程
jps
e.关闭
./mqshutdown broker
./mqshutdown namesrv

2.3 搭建控制台
01.在终端打包console
mvn clean package -Dmaven.test.skip=true
02.执行jar包
java -jar rocketmq-console-ng-1.0.1.jar

2.4 启动主从同步
01.启动mater(192.168.2.128)
a.防火墙
systemctl stop firewalld
systemctl disable firewalld
b.后台启动Namesrv
nohup sh mqnamesrv &
c.后台启动BrokerServer 主配置文件
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties &
d.查看进程
jps
e.关闭
./mqshutdown broker
./mqshutdown namesrv
02.启动slave(192.168.2.129)
a.防火墙
systemctl stop firewalld
systemctl disable firewalld
b.后台启动Namesrv
nohup sh mqnamesrv &
c.后台启动BrokerServer,从配置文件
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a-s.properties &
d.查看进程
jps
e.关闭
./mqshutdown broker
./mqshutdown namesrv

2.5 测试主从同步

3 汇总
01.案例一、二
a.生产者
a.消息延迟
b.消息类型:同步消息、异步消息、单向消息
b.消费者
a.Push消费者
b.标签订阅
C.消费者的消息模型:集群模式(默认)、广播模式
d.监听器:并发消费
e.消费类型:迭代消费、一次性消费
f.其他常见API
02.案例三
a.消息局部顺序消费问题
a.生产者(MyProducerOrder,算法保证有序)
b.消费者(MyConsumerOrder,MessageListenerOrderly保证有序)
b.批量发送消息
a.生产者(MyProducerBatch,将数据封装到List集合中,一次性发送多条数据)
03.事务
a.事务补偿机制
生产者再向MQServer提交事务时,有两次机会,女如果第一次失败,贝则进行回查,回查后还会进行第二次提交/回滚。
注意:延迟消息、批量消息不支持事务机制
b.在MQserver中消息有三种状态
①提交状态:可以被消费者消费
②回滚状态:不能被消费者消费
③未知状态(中间状态):需要借助于本地事务,进行一次回查,事务补偿机制
4 案例一
01.生产者
a.消息延迟
b.消息类型:同步消息、异步消息、单向消息
02.消费者
a.Push消费者
b.标签订阅
C.消费者的消息模型:集群模式(默认)、广播模式
d.监听器:并发消费
e.消费类型:迭代消费、一次性消费
f.其他常见API

5 案例二
01.消费者
a.Pull消费者(DefaultMQPullConsumer) + 偏移量
b.Pull消费者(MQPullConsumerScheduleService,时间调度:每隔3秒主动拉去一次数据) + 偏移量

6 案例三
01.消息局部顺序消费问题
a.生产者(MyProducerOrder,算法保证有序)
b.消费者(MyConsumerOrder,MessageListenerOrderly保证有序)
02.批量发送消息
a.生产者(MyProducerBatch,将数据封装到List集合中,一次性发送多条数据)

7 消息事务机制
