01.UserMapper
/**
 * MyBatis mapper for the DB_USER table.
 * The statement placeholders (#{uid}, #{count}) are spelled like the method
 * parameter names, so renaming a parameter requires changing the SQL as well.
 */
@Mapper
public interface UserMapper {
/** Selects every column of the DB_USER row whose uid equals the argument. */
@Select("select * from DB_USER where uid = #{uid}")
User getUserById(int uid);
/**
 * Reads the book_count column for the given uid.
 * NOTE(review): the return type is a primitive int; behaviour when no row matches is not visible here - confirm.
 */
@Select("select book_count from DB_USER where uid = #{uid}")
int getUserBookRemain(int uid);
/**
 * Overwrites book_count with {@code count} for the given uid.
 * The int result is compared with 0 by UserServiceImpl (presumably the affected-row count).
 */
@Update("update DB_USER set book_count = #{count} where uid = #{uid}")
int updateBookCount(int uid, int count);
}
02.UserServiceImpl
/**
 * {@link UserService} implementation that forwards every call to {@link UserMapper}.
 */
@Service
public class UserServiceImpl implements UserService {

    @Resource
    UserMapper mapper;

    /** Fetches the user stored under {@code uid} via the mapper. */
    @Override
    public User getUserById(int uid) {
        User found = mapper.getUserById(uid);
        return found;
    }

    /** Returns the book count the mapper reads for {@code uid}. */
    @Override
    public int getRemain(int uid) {
        int remaining = mapper.getUserBookRemain(uid);
        return remaining;
    }

    /**
     * Stores {@code count} as the new book count of {@code uid}.
     *
     * @return {@code true} when the mapper reports a positive result for the update
     */
    @Override
    public boolean setRemain(int uid, int count) {
        int updated = mapper.updateBookCount(uid, count);
        return updated >= 1;
    }
}
03.UserController
/**
 * REST endpoints of the user service: user lookup, remaining borrow quota,
 * and consuming one unit of that quota.
 */
@RestController
public class UserController {

    @Resource
    UserService service;

    /** Returns the user identified by the {@code uid} path variable. */
    @RequestMapping("/user/{uid}")
    public User findUserById(@PathVariable("uid") int uid) {
        return service.getUserById(uid);
    }

    /** Returns the remaining book count stored for the user. */
    @RequestMapping("/user/remain/{uid}")
    public int userRemain(@PathVariable("uid") int uid) {
        return service.getRemain(uid);
    }

    /**
     * Decrements the user's remaining book count by one.
     *
     * @param uid id of the borrowing user
     * @return {@code true} when the new count was stored; {@code false} when the
     *         user has nothing left to borrow or the update was not applied
     */
    @RequestMapping("/user/borrow/{uid}")
    public boolean userBorrow(@PathVariable("uid") int uid) {
        int remain = service.getRemain(uid);
        // Refuse rather than persist a negative count when this endpoint is
        // called directly without a prior /user/remain check.
        if (remain < 1) {
            return false;
        }
        // NOTE(review): read-then-write is not atomic; two concurrent requests
        // may both read the same value. Confirm whether callers serialize access.
        return service.setRemain(uid, remain - 1);
    }
}
8.3.2 图书服务
01.BookMapper
/**
 * MyBatis mapper for the DB_BOOK table.
 * The statement placeholders (#{bid}, #{count}) are spelled like the method
 * parameter names, so renaming a parameter requires changing the SQL as well.
 */
@Mapper
public interface BookMapper {
/** Selects every column of the DB_BOOK row whose bid equals the argument. */
@Select("select * from DB_BOOK where bid = #{bid}")
Book getBookById(int bid);
/**
 * Reads the count column for the given bid.
 * NOTE(review): the return type is a primitive int; behaviour when no row matches is not visible here - confirm.
 */
@Select("select count from DB_BOOK where bid = #{bid}")
int getRemain(int bid);
/**
 * Overwrites the count column with {@code count} for the given bid.
 * The int result is compared with 0 by BookServiceImpl (presumably the affected-row count).
 */
@Update("update DB_BOOK set count = #{count} where bid = #{bid}")
int setRemain(int bid, int count);
}
02.BookServiceImpl
/**
 * {@link BookService} implementation that forwards every call to {@link BookMapper}.
 */
@Service
public class BookServiceImpl implements BookService {

    @Resource
    BookMapper mapper;

    /** Fetches the book stored under {@code bid} via the mapper. */
    @Override
    public Book getBookById(int bid) {
        Book found = mapper.getBookById(bid);
        return found;
    }

    /**
     * Stores {@code count} as the new stock of book {@code bid}.
     *
     * @return {@code true} when the mapper reports a positive result for the update
     */
    @Override
    public boolean setRemain(int bid, int count) {
        int updated = mapper.setRemain(bid, count);
        return updated >= 1;
    }

    /** Returns the count the mapper reads for {@code bid}. */
    @Override
    public int getRemain(int bid) {
        int remaining = mapper.getRemain(bid);
        return remaining;
    }
}
03.BookController
/**
 * REST endpoints of the book service: book lookup, remaining stock,
 * and taking one copy out of stock.
 */
@RestController
public class BookController {

    @Resource
    BookService service;

    /** Returns the book identified by the {@code bid} path variable. */
    @RequestMapping("/book/{bid}")
    public Book findBookById(@PathVariable("bid") int bid) {
        return service.getBookById(bid);
    }

    /** Returns the remaining count stored for the book. */
    @RequestMapping("/book/remain/{bid}")
    public int bookRemain(@PathVariable("bid") int bid) {
        return service.getRemain(bid);
    }

    /**
     * Decrements the book's remaining count by one.
     *
     * @param bid id of the book being borrowed
     * @return {@code true} when the new count was stored; {@code false} when no
     *         copy is left or the update was not applied
     */
    @RequestMapping("/book/borrow/{bid}")
    public boolean bookBorrow(@PathVariable("bid") int bid) {
        int remain = service.getRemain(bid);
        // Refuse rather than persist a negative stock when this endpoint is
        // called directly without a prior /book/remain check.
        if (remain < 1) {
            return false;
        }
        // NOTE(review): read-then-write is not atomic; two concurrent requests
        // may both read the same value. Confirm whether callers serialize access.
        return service.setRemain(bid, remain - 1);
    }
}
8.3.3 借阅服务
01.UserClient
/**
 * Feign client addressing the service named "userservice".
 * The three paths are the same ones UserController maps in these notes.
 */
@FeignClient(value = "userservice")
public interface UserClient {
/** Requests /user/{uid} and returns the response as a User. */
@RequestMapping("/user/{uid}")
User getUserById(@PathVariable("uid") int uid);
/** Requests /user/borrow/{uid}; the boolean is the remote handler's result. */
@RequestMapping("/user/borrow/{uid}")
boolean userBorrow(@PathVariable("uid") int uid);
/** Requests /user/remain/{uid} and returns the remaining count reported by the user service. */
@RequestMapping("/user/remain/{uid}")
int userRemain(@PathVariable("uid") int uid);
}
02.BookClient
/**
 * Feign client addressing the service named "bookservice".
 * The three paths are the same ones BookController maps in these notes.
 */
@FeignClient("bookservice")
public interface BookClient {
/** Requests /book/{bid} and returns the response as a Book. */
@RequestMapping("/book/{bid}")
Book getBookById(@PathVariable("bid") int bid);
/** Requests /book/borrow/{bid}; the boolean is the remote handler's result. */
@RequestMapping("/book/borrow/{bid}")
boolean bookBorrow(@PathVariable("bid") int bid);
/** Requests /book/remain/{bid} and returns the remaining count reported by the book service. */
@RequestMapping("/book/remain/{bid}")
int bookRemain(@PathVariable("bid") int bid);
}
04.BorrowController
/**
 * REST endpoints of the borrow service: list a user's borrowed books and
 * borrow a book for a user.
 */
@RestController
public class BorrowController {

    @Resource
    BorrowService service;

    /** Returns the borrow detail the service assembles for the given user. */
    @RequestMapping("/borrow/{uid}")
    public UserBorrowDetail findUserBorrows(@PathVariable("uid") int uid) {
        return service.getUserBorrowDetailByUid(uid);
    }

    /**
     * Borrows book {@code bid} for user {@code uid}.
     * An exception thrown by {@code doBorrow} propagates, so the JSON body
     * below is only built after the call returned normally.
     *
     * @return JSON object with {@code code}, {@code success} and {@code message}
     */
    @RequestMapping("/borrow/take/{uid}/{bid}")
    public JSONObject borrow(@PathVariable("uid") int uid,
                             @PathVariable("bid") int bid) {
        service.doBorrow(uid, bid);
        JSONObject object = new JSONObject();
        object.put("code", "200");
        // Was false, contradicting code 200 and the success message below.
        object.put("success", true);
        object.put("message", "借阅成功!");
        return object;
    }
}
05.BorrowServiceImpl
/**
 * {@link BorrowService} implementation combining the local borrow table with
 * remote calls to the user and book services.
 */
@Service
public class BorrowServiceImpl implements BorrowService {

    @Resource
    BorrowMapper mapper;

    @Resource
    UserClient userClient;

    @Resource
    BookClient bookClient;

    /**
     * Collects the user plus every book referenced by the user's borrow rows.
     * One remote book lookup is issued per borrow row.
     */
    @Override
    public UserBorrowDetail getUserBorrowDetailByUid(int uid) {
        List<Borrow> borrowRows = mapper.getBorrowsByUid(uid);
        User borrower = userClient.getUserById(uid);
        List<Book> borrowedBooks = borrowRows.stream()
                .map(row -> bookClient.getBookById(row.getBid()))
                .collect(Collectors.toList());
        return new UserBorrowDetail(borrower, borrowedBooks);
    }

    /**
     * Runs the borrow steps in a fixed order; any failed step throws a
     * {@link RuntimeException}.
     *
     * @return {@code true} once every step has succeeded
     */
    @Override
    public boolean doBorrow(int uid, int bid) {
        // 1. Both the book and the user must still allow borrowing.
        int booksLeft = bookClient.bookRemain(bid);
        if (booksLeft <= 0) {
            throw new RuntimeException("图书数量不足");
        }
        int quotaLeft = userClient.userRemain(uid);
        if (quotaLeft <= 0) {
            throw new RuntimeException("用户借阅量不足");
        }

        // 2. Take one copy of the book first.
        boolean bookTaken = bookClient.bookBorrow(bid);
        if (!bookTaken) {
            throw new RuntimeException("在借阅图书时出现错误!");
        }

        // 3. Record the borrow, rejecting a duplicate for the same user and book.
        if (null != mapper.getBorrow(uid, bid)) {
            throw new RuntimeException("此书籍已经被此用户借阅了!");
        }
        if (mapper.addBorrow(uid, bid) < 1) {
            throw new RuntimeException("在录入借阅信息时出现错误!");
        }

        // 4. Consume one unit of the user's quota.
        boolean quotaConsumed = userClient.userBorrow(uid);
        if (!quotaConsumed) {
            throw new RuntimeException("在借阅时出现错误!");
        }

        // Done.
        return true;
    }
}
01.将我们的各个服务作为Seata的客户端,只需要导入依赖即可
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
02.添加配置
seata:
  service:
    vgroup-mapping:
      # The transaction group has to be mapped to a cluster. The default group
      # name is <application name>-seata-service-group; map it to "default".
      # This must be exactly right, otherwise the Seata server cannot be found.
      # NOTE(review): confirm the prefix equals this service's spring.application.name.
      book-service-seata-service-group: default
    grouplist:
      default: localhost:8091
seata:
  service:
    vgroup-mapping:
      # The transaction group has to be mapped to a cluster. The default group
      # name is <application name>-seata-service-group; map it to "default".
      # This must be exactly right, otherwise the Seata server cannot be found.
      # NOTE(review): confirm the prefix equals this service's spring.application.name.
      user-service-seata-service-group: default
    grouplist:
      default: localhost:8091
seata:
  service:
    vgroup-mapping:
      # The transaction group has to be mapped to a cluster. The default group
      # name is <application name>-seata-service-group; map it to "default".
      # This must be exactly right, otherwise the Seata server cannot be found.
      # NOTE(review): confirm the prefix equals this service's spring.application.name.
      borrow-service-seata-service-group: default
    grouplist:
      default: localhost:8091
03.配置开启分布式事务,首先在启动类添加注解,此注解会添加一个后置处理器将数据源封装为支持分布式事务的代理数据源(虽然官方表示配置文件中已经默认开启了自动代理,但是UP主实测1.4.2版本下只能打注解的方式才能生效)
/**
 * Boot entry point of the book service.
 * {@code @EnableAutoDataSourceProxy} is declared explicitly; according to the
 * accompanying notes it is what makes the data source proxy take effect on Seata 1.4.2.
 */
@SpringBootApplication
@EnableAutoDataSourceProxy
public class BookApplication {

    /** Starts the Spring application context with the command-line arguments. */
    public static void main(final String[] args) {
        SpringApplication.run(BookApplication.class, args);
    }
}
/**
 * Boot entry point of the user service.
 * {@code @EnableAutoDataSourceProxy} is declared explicitly; according to the
 * accompanying notes it is what makes the data source proxy take effect on Seata 1.4.2.
 */
@EnableAutoDataSourceProxy
@SpringBootApplication
public class UserApplication {

    /** Starts the Spring application context with the command-line arguments. */
    public static void main(final String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}
/**
 * Boot entry point of the borrow service.
 * Feign clients are enabled here, and {@code @EnableAutoDataSourceProxy} is
 * declared explicitly; according to the accompanying notes it is what makes
 * the data source proxy take effect on Seata 1.4.2.
 */
@SpringBootApplication
@EnableFeignClients
@EnableAutoDataSourceProxy
public class BorrowApplication {

    /** Starts the Spring application context with the command-line arguments. */
    public static void main(final String[] args) {
        SpringApplication.run(BorrowApplication.class, args);
    }
}
04.在开启分布式事务的方法上添加`@GlobalTransactional`注解
/**
 * Borrows book {@code bid} for user {@code uid} inside a Seata global transaction.
 * The steps run in a fixed order and any failed step throws a {@link RuntimeException}.
 *
 * @return {@code true} once every step has succeeded
 */
@GlobalTransactional
@Override
public boolean doBorrow(int uid, int bid) {
    // Print the XID; the other services can print it too. Seeing the same
    // XID everywhere means they all take part in the same transaction.
    String xid = RootContext.getXID();
    System.out.println(xid);

    int booksLeft = bookClient.bookRemain(bid);
    if (booksLeft <= 0) {
        throw new RuntimeException("图书数量不足");
    }
    int quotaLeft = userClient.userRemain(uid);
    if (quotaLeft <= 0) {
        throw new RuntimeException("用户借阅量不足");
    }

    boolean bookTaken = bookClient.bookBorrow(bid);
    if (!bookTaken) {
        throw new RuntimeException("在借阅图书时出现错误!");
    }

    if (null != mapper.getBorrow(uid, bid)) {
        throw new RuntimeException("此书籍已经被此用户借阅了!");
    }
    if (mapper.addBorrow(uid, bid) < 1) {
        throw new RuntimeException("在录入借阅信息时出现错误!");
    }

    boolean quotaConsumed = userClient.userBorrow(uid);
    if (!quotaConsumed) {
        throw new RuntimeException("在借阅时出现错误!");
    }
    return true;
}
05.Seata会分析修改数据的sql,同时生成对应的反向回滚SQL,这个回滚记录会存放在undo_log 表中。所以要求每一个Client 都有一个对应的undo_log表(也就是说每个服务连接的数据库都需要创建这样一个表,这里由于我们三个服务都用的同一个数据库,所以说就只用在这个数据库中创建undo_log表即可),表SQL定义如下:
-- Seata client-side undo log table. According to the notes, Seata stores the
-- generated rollback records here, so every database a client service connects
-- to needs this table.
-- NOTE(review): xid is VARCHAR(100) here but VARCHAR(128) in the Seata server
-- tables (global_table / branch_table / lock_table) - confirm XIDs always fit.
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
-- at most one undo record per (global transaction, branch) pair
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
06.说明
创建完成之后,我们现在就可以启动三个服务了,我们来测试一下当出现异常的时候是不是会正常回滚
首先第一次肯定是正常完成借阅操作的,接着我们再次进行请求,肯定会出现异常
如果能在栈追踪信息中看到seata相关的包,那么说明分布式事务已经开始工作了,通过日志我们可以看到,出现了回滚操作
并且数据库中确实是回滚了扣除操作
8.5 使用nacos模式部署
8.5.1 服务端
01.registry.conf
# Service registry the Seata server registers itself with.
registry {
# supported types: file, nacos, eureka, redis, zk, consul, etcd3, sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "localhost:8848"
group = "SEATA_GROUP"
namespace = "61fbe599-6501-4faa-a139-993bc5bfa83a"
cluster = "default"
# Nacos username and password
# NOTE(review): these look like the stock Nacos credentials - replace outside local testing
username = "nacos"
password = "nacos"
}
}
# Configuration center the Seata server reads its settings from.
config {
# supported types: file, nacos, apollo, zk, consul, etcd3
type = "nacos"
nacos {
# same Nacos instance as the registry above, written as 127.0.0.1 instead of localhost
serverAddr = "127.0.0.1:8848"
namespace = "61fbe599-6501-4faa-a139-993bc5bfa83a"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
02.命令行
C:\Users\mysla\Desktop\TODO5\seata-1.4.2\script\config-center\nacos>sh nacos-config.sh -h 127.0.0.1 -p 8848 -t 61fbe599-6501-4faa-a139-993bc5bfa83a -g SEATA_GROUP
set nacosAddr=127.0.0.1:8848
set group=SEATA_GROUP
Set transport.type=TCP successfully
Set transport.server=NIO successfully
Set transport.heartbeat=true successfully
Set transport.enableClientBatchSendRequest=false successfully
Set transport.threadFactory.bossThreadPrefix=NettyBoss successfully
Set transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker successfully
Set transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler successfully
Set transport.threadFactory.shareBossWorker=false successfully
Set transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector successfully
Set transport.threadFactory.clientSelectorThreadSize=1 successfully
Set transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread successfully
Set transport.threadFactory.bossThreadSize=1 successfully
Set transport.threadFactory.workerThreadSize=default successfully
Set transport.shutdown.wait=3 successfully
Set service.vgroupMapping.my_test_tx_group=default successfully
Set service.default.grouplist=127.0.0.1:8091 successfully
Set service.enableDegrade=false successfully
Set service.disableGlobalTransaction=false successfully
Set client.rm.asyncCommitBufferLimit=10000 successfully
Set client.rm.lock.retryInterval=10 successfully
Set client.rm.lock.retryTimes=30 successfully
Set client.rm.lock.retryPolicyBranchRollbackOnConflict=true successfully
Set client.rm.reportRetryCount=5 successfully
Set client.rm.tableMetaCheckEnable=false successfully
Set client.rm.tableMetaCheckerInterval=60000 successfully
Set client.rm.sqlParserType=druid successfully
Set client.rm.reportSuccessEnable=false successfully
Set client.rm.sagaBranchRegisterEnable=false successfully
Set client.tm.commitRetryCount=5 successfully
Set client.tm.rollbackRetryCount=5 successfully
Set client.tm.defaultGlobalTransactionTimeout=60000 successfully
Set client.tm.degradeCheck=false successfully
Set client.tm.degradeCheckAllowTimes=10 successfully
Set client.tm.degradeCheckPeriod=2000 successfully
Set store.mode=db successfully
Set store.publicKey= failure
Set store.file.dir=file_store/data successfully
Set store.file.maxBranchSessionSize=16384 successfully
Set store.file.maxGlobalSessionSize=512 successfully
Set store.file.fileWriteBufferCacheSize=16384 successfully
Set store.file.flushDiskMode=async successfully
Set store.file.sessionReloadReadSize=100 successfully
Set store.db.datasource=druid successfully
Set store.db.dbType=mysql successfully
Set store.db.driverClassName=com.mysql.jdbc.Driver successfully
Set store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true successfully
Set store.db.user=root successfully
Set store.db.password=123456 successfully
Set store.db.minConn=5 successfully
Set store.db.maxConn=30 successfully
Set store.db.globalTable=global_table successfully
Set store.db.branchTable=branch_table successfully
Set store.db.queryLimit=100 successfully
Set store.db.lockTable=lock_table successfully
Set store.db.maxWait=5000 successfully
Set store.redis.mode=single successfully
Set store.redis.single.host=127.0.0.1 successfully
Set store.redis.single.port=6379 successfully
Set store.redis.sentinel.masterName= failure
Set store.redis.sentinel.sentinelHosts= failure
Set store.redis.maxConn=10 successfully
Set store.redis.minConn=1 successfully
Set store.redis.maxTotal=100 successfully
Set store.redis.database=0 successfully
Set store.redis.password= failure
Set store.redis.queryLimit=100 successfully
Set server.recovery.committingRetryPeriod=1000 successfully
Set server.recovery.asynCommittingRetryPeriod=1000 successfully
Set server.recovery.rollbackingRetryPeriod=1000 successfully
Set server.recovery.timeoutRetryPeriod=1000 successfully
Set server.maxCommitRetryTimeout=-1 successfully
Set server.maxRollbackRetryTimeout=-1 successfully
Set server.rollbackRetryTimeoutUnlockEnable=false successfully
Set client.undo.dataValidation=true successfully
Set client.undo.logSerialization=jackson successfully
Set client.undo.onlyCareUpdateColumns=true successfully
Set server.undo.logSaveDays=7 successfully
Set server.undo.logDeletePeriod=86400000 successfully
Set client.undo.logTable=undo_log successfully
Set client.undo.compress.enable=true successfully
Set client.undo.compress.type=zip successfully
Set client.undo.compress.threshold=64k successfully
Set log.exceptionRate=100 successfully
Set transport.serialization=seata successfully
Set transport.compressor=none successfully
Set metrics.enabled=false successfully
Set metrics.registryType=compact successfully
Set metrics.exporterList=prometheus successfully
Set metrics.exporterPrometheusPort=9898 successfully
=========================================================================
Complete initialization parameters, total-count:89 , failure-count:4
=========================================================================
init nacos config fail.
03.将对应的事务组映射配置也添加上,Data ID格式为【service.vgroupMapping.事务组名称】
service.vgroupMapping.book-service-seata-service-group : SEATA_GROUP : default
service.vgroupMapping.user-service-seata-service-group : SEATA_GROUP : default
service.vgroupMapping.borrow-service-seata-service-group : SEATA_GROUP : default
8.5.2 客户端
01.将我们的各个服务作为Seata的客户端,只需要导入依赖即可:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
02.配置
seata:
  # Service registry
  registry:
    # Use Nacos
    type: nacos
    nacos:
      # Use the namespace Seata lives in so the Seata server can be located.
      # The group is SEATA_GROUP, which is the default, so it is not set here.
      namespace: 61fbe599-6501-4faa-a139-993bc5bfa83a
      username: nacos
      password: nacos
  # Configuration center
  config:
    type: nacos
    nacos:
      namespace: 61fbe599-6501-4faa-a139-993bc5bfa83a
      username: nacos
      password: nacos
03.配置开启分布式事务,首先在启动类添加注解,此注解会添加一个后置处理器将数据源封装为支持分布式事务的代理数据源(虽然官方表示配置文件中已经默认开启了自动代理,但是UP主实测1.4.2版本下只能打注解的方式才能生效)
/**
 * Boot entry point of the book service.
 * {@code @EnableAutoDataSourceProxy} is declared explicitly; according to the
 * accompanying notes it is what makes the data source proxy take effect on Seata 1.4.2.
 */
@SpringBootApplication
@EnableAutoDataSourceProxy
public class BookApplication {

    /** Starts the Spring application context with the command-line arguments. */
    public static void main(final String[] args) {
        SpringApplication.run(BookApplication.class, args);
    }
}
04.在开启分布式事务的方法上添加`@GlobalTransactional`注解
/**
 * Borrows book {@code bid} for user {@code uid} inside a Seata global transaction.
 * The steps run in a fixed order and any failed step throws a {@link RuntimeException}.
 *
 * @return {@code true} once every step has succeeded
 */
@GlobalTransactional
@Override
public boolean doBorrow(int uid, int bid) {
    // Print the XID; the other services can print it too. Seeing the same
    // XID everywhere means they all take part in the same transaction.
    String xid = RootContext.getXID();
    System.out.println(xid);

    int booksLeft = bookClient.bookRemain(bid);
    if (booksLeft <= 0) {
        throw new RuntimeException("图书数量不足");
    }
    int quotaLeft = userClient.userRemain(uid);
    if (quotaLeft <= 0) {
        throw new RuntimeException("用户借阅量不足");
    }

    boolean bookTaken = bookClient.bookBorrow(bid);
    if (!bookTaken) {
        throw new RuntimeException("在借阅图书时出现错误!");
    }

    if (null != mapper.getBorrow(uid, bid)) {
        throw new RuntimeException("此书籍已经被此用户借阅了!");
    }
    if (mapper.addBorrow(uid, bid) < 1) {
        throw new RuntimeException("在录入借阅信息时出现错误!");
    }

    boolean quotaConsumed = userClient.userBorrow(uid);
    if (!quotaConsumed) {
        throw new RuntimeException("在借阅时出现错误!");
    }
    return true;
}
8.5.3 数据库
01.配置
store.mode : db
store.session.mode : db
02.表说明
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
-- One row per global transaction, keyed by xid.
-- NOTE(review): transaction_service_group is VARCHAR(32), but the group name
-- borrow-service-seata-service-group used in these notes is 34 characters long;
-- confirm the column is wide enough (or the SQL mode tolerates truncation).
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
-- One row per branch, keyed by branch_id; idx_xid indexes rows by the
-- xid of the global transaction they carry.
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
-- unlike global_table, timestamps here keep fractional seconds (6 digits)
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
-- One row per lock, keyed by row_key; status distinguishes locked (0)
-- from rollbacking (1), as stated in the column comment below.
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- Table of named locks: lock_key is the lock name, lock_value and expire
-- hold its current value and expiry.
-- NOTE(review): the exact meaning of lock_value/expire is defined by the Seata server, not by this script - confirm.
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- Seed the 'HandleAllSession' lock row with a blank value and expire = 0.
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);