高并发选课项目3

事务型消息

异步消息发送时机问题

目前扣减库存的事务ItemService.decreaseStock是封装在OrderService
.createOrder事务里面的。在扣减Redis 库存、发送异步消息之后,还有订
单入库、增加销量的操作。如果这些操作失败,那么createOrder 事务会回
滚,decreaseStock事务也回滚,但是Redis的扣减操作却不能回滚,会导
致数据不一致。
解决的方法就是在订单入库、增加销量成功之后再发送异步消息,ItemService
.decreaseStock只负责扣减Redis库存,不发送异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean decreaseStock(Integer itemId, Integer amount) {
long affectedRow=redisTemplate.opsForValue().
increment("promo_item_stock_"+itemId,amount.intValue()*-1);
//>0,表示Redis扣减成功
if(affectedRow>=0){
//抽离了发送异步消息的逻辑
return true;
} else {
//Redis扣减失败,回滚
increaseStock(itemId, amount)
return false;
}
}

public boolean increaseStock(Integer itemId, Integer amount) {
redisTemplate.opsForValue().increment("promo_item_stock_"+
itemId,amount.intValue());
return true;
}

将发送异步消息的逻辑抽取出来

1
2
3
4
//ItemService
public boolean asyncDecreaseStock(Integer itemId, Integer amount) {
return mqProducer.asyncReduceStock(itemId, amount);
}

再在OrderService.createOrder里面调用

1
2
3
4
5
6
7
8
9
10
11
12
···
//订单入库
orderDOMapper.insertSelective(orderDO);
//销量增加
itemService.increaseSales(itemId,amount);
//执行完最后一步才发送异步消息
boolean mqResult=itemService.asyncDecreaseStock(itemId,amount);
if(!mqResult){
//回滚redis库存
itemService.increaseStock(itemId,amount);
throw new BizException(EmBizError.MQ_SEND_FAIL);
}

这样就算订单入库失败、销量增加失败、消息发送失败,都能保证缓存和数
据库的一致性

事务提交问题

但是这么做,依然有问题。Spring 的@Transactional 标签,会在事务方法
返回后才提交,如果提交的过程中,发生了异常,则数据库回滚,但是Redis
库存已扣,还是无法保证一致性。我们需要在事务提交成功后再发送异步消息

解决方法

TransactionSynchronizationManager.registerSynchronization方法
,这个方法的传入一个TransactionSynchronizationAdapter的匿名类,
通过afterCommit方法,在事务提交成功后,执行发送消息操作

1
2
3
4
5
6
7
8
9
10
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
boolean mqResult=itemService.asyncDecreaseStock(itemId,amount);
if(!mqResult){
itemService.increaseStock(itemId,amount);
throw new BizException(EmBizError.MQ_SEND_FAIL);
}
}

事务型消息

上面的做法,依然不能保证万无一失。假设现在事务提交成功了,等着执行
afterCommit方法,这个时候突然宕机了,那么订单已然入库,销量已然
增加,但是去数据库扣减库存的这条消息却“丢失”了。这里就需要引入
RocketMQ的事务型消息。
所谓事务型消息,也会被发送到消息队列里面,这条消息处于prepared状
态,broker会接受到这条消息,但是不会把这条消息给消费者消费。该状
态的消息,会执行TransactionListener的executeLocalTransaction
方法,根据执行结果,改变事务型消息的状态,让消费端消费或是不消费
在mq.MqProducer类里面新注入一个TransactionMQProducer类,与
DefaultMQProducer类似,也需要设置服务器地址、命名空间等。
新建一个transactionAsyncReduceStock的方法,该方法使用事务型消
息进行异步扣减库存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 事务型消息同步库存扣减消息
public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId, Integer amount, String stockLogId) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("itemId", itemId);
bodyMap.put("amount", amount);
//用于执行orderService.createOrder的传参
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("itemId", itemId);
argsMap.put("amount", amount);
argsMap.put("userId", userId);
argsMap.put("promoId", promoId);

Message message = new Message(topicName, "increase",
JSON.toJSON(bodyMap).toString().getBytes(
Charset.forName("UTF-8")));
try {
//注意,发送的是sendMessageInTransaction
transactionMQProducer.sendMessageInTransaction(message, argsMap);
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
return true;
}

这样,就会发送一个事务型消息到broke,而处于prepared状态的事务型
消息,会执行TransactionListener的executeLocalTransaction方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message,
Object args) {
//在事务型消息中去进行下单
Integer itemId = (Integer) ((Map) args).get("itemId");
Integer promoId = (Integer) ((Map) args).get("promoId");
Integer userId = (Integer) ((Map) args).get("userId");
Integer amount = (Integer) ((Map) args).get("amount");
try {
//调用下单接口
orderService.createOrder(userId, itemId, promoId, amount);
} catch (BizException e) {
e.printStackTrace();
//发生异常就回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}

这样,在事务型消息中去执行下单操作,下单失败,则消息回滚,不会去数
据库扣减库存。下单成功,则消息被消费,扣减数据库库存

库存流水

不要以为这样就万事大吉了上述流程还有一个漏洞,就是当执行orderService
.createOrder后,突然又宕机了,根本没有返回,这个时候事务型消息就会进
入UNKNOWN状态,我们需要处理这个状态。
在匿名类TransactionListener里面,还需要覆写checkLocalTransaction方
法,这个方法就是用来处理UNKNOWN状态的。应该怎么处理?这就需要引入库存
流水。
数据库新建一张stock_log的表,用来记录库存流水

1
2
3
4
5
create table stock_log(
stock_log_id varchar(64) primary key,
item_id int not null default 0,
amount int not null default 0,
status int not null default 0);

添加一个ItemService.initStockLog方法

1
2
3
4
5
6
7
8
9
10
public String initStockLog(Integer itemId, Integer amount) {
StockLogDO stockLogDO = new StockLogDO();
stockLogDO.setItemId(itemId);
stockLogDO.setAmount(amount);
stockLogDO.setStockLogId(UUID.randomUUID().toString().replace("-", ""));
//1表示初始状态,2表示下单扣减库存成功,3表示下单回滚
stockLogDO.setStatus(1);
stockLogDOMapper.insertSelective(stockLogDO);
return stockLogDO.getStockLogId();
}

事务型消息会调用OrderService.createOrder方法,执行Redis扣减库存
、订单入库、销量增加的操作,当这些操作都完成后,就说明下单完成了,
等着异步更新数据库了。那么需要修改订单流水的状态

1
2
3
4
5
6
7
8
9
10
11
//OrderService.createOrder
//订单入库
orderDOMapper.insertSelective(orderDO);
//增加销量
itemService.increaseSales(itemId, amount);
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if (stockLogDO == null)
throw new BizException(EmBizError.UNKNOWN_ERROR);
//设置库存流水状态为成功
stockLogDO.setStatus(2);
stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);

异步更新数据库,需要事务型消息从prepare状态变成commit状态。假如此时
orderService.createOrder本身发生了异常,那么就回滚事务型消息,并
且返回LocalTransactionState.ROLLBACK_MESSAGE,这个下单操作就会
被取消。
如果本身没有发生异常,那么就返回LocalTransactionState.COMMIT_MESSAGE
,此时事务型消息会从prepare状态变为commit状态,接着被消费端消费,异
步扣减库存。

1
2
3
4
5
6
7
8
9
10
11
12
13
//MqProducer.TransactionListener().executeLocalTransaction()
try {
orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
} catch (BizException e) {
e.printStackTrace();
//如果发生异常,createOrder已经回滚,此时要回滚事务型消息。
//设置stockLog为回滚状态
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
stockLogDO.setStatus(3);
stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;

UNKNOWN状态处理

如上节结尾所述,如果在执行createOrder的时候,突然宕机了,此时事务
型消息的状态是UNKNOWN,需要在checkLocalTransaction方法中进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public LocalTransactionState checkLocalTransaction(MessageExt message) {
//根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是UNKNOWN
String jsonString = new String(message.getBody());
Map<String, Object> map = JSON.parseObject(jsonString, Map.class);
String stockLogId = (String) map.get("stockLogId");
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if (stockLogDO == null)
return LocalTransactionState.UNKNOW;
//订单操作已经完成,等着异步扣减库存,那么就提交事务型消息
if (stockLogDO.getStatus() == 2) {
return LocalTransactionState.COMMIT_MESSAGE;
//订单操作还未完成,需要执行下单操作,那么就维持为prepare状态
} else if (stockLogDO.getStatus() == 1) {
return LocalTransactionState.UNKNOW;
}
//否则就回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}

库存售罄处理

现在是用户请求一次OrderController.createOrder就初始化一次流水,
但是如果10000个用户抢10个商品,就会初始化10000次库存流水,这显然
是不行的。
解决的方法是在ItemService.decreaseStock中,如果库存没有了,就打
上“售罄标志”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean decreaseStock(Integer itemId, Integer amount) {
long affectedRow = redisTemplate.opsForValue().
increment("promo_item_stock_" + itemId, amount.intValue() * -1);
if (affectedRow > 0) {
return true;
} else if (affectedRow == 0) {
//打上售罄标识
redisTemplate.opsForValue().set("promo_item_stock_invalid_" +
itemId, "true");
return true;
} else {
increaseStock(itemId, amount);
return false;
}
}

在OrderController.createOrder初始化流水之前,先判断一下是否售
罄,售罄了就直接抛出异常

1
2
3
4
//是否售罄
if (redisTemplate.hasKey("promo_item_stock_invalid_"+itemId))
throw new BizException(EmBizError.STOCK_NOT_ENOUGH);
String stockLogId = itemService.initStockLog(itemId, amount);

流量削峰

秒杀秒杀,就是在活动开始的一瞬间,有大量流量涌入,优化不当,会导致
服务器停滞,甚至宕机。所以引入流量削峰技术十分有必要

业务解耦—秒杀令牌

之前的验证逻辑和下单逻辑都耦合在OrderService.createOrder里面,现
在利用秒杀令牌,使校验逻辑和下单逻辑分离。
PromoService新开一个generateSecondKillToken,将活动、商品、用户
信息校验逻辑封装在里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public String generateSecondKillToken(Integer promoId,Integer itemId,
Integer userId) {
//判断库存是否售罄,若Key存在,则直接返回下单失败
if(redisTemplate.hasKey("promo_item_stock_invalid_"+itemId))
return null;
PromoDO promoDO=promoDOMapper.selectByPrimaryKey(promoId);
PromoModel promoModel=convertFromDataObj(promoDO);
if(promoModel==null) return null;
if(promoModel.getStartDate().isAfterNow()) {
promoModel.setStatus(1);
}else if(promoModel.getEndDate().isBeforeNow()){
promoModel.setStatus(3);
}else{
promoModel.setStatus(2);
}
//判断活动是否正在进行
if(promoModel.getStatus()!=2) return null;
//判断item信息是否存在
ItemModel itemModel=itemService.getItemByIdInCache(itemId);
if(itemModel==null) return null;
//判断用户是否存在
UserModel userModel=userService.getUserByIdInCache(userId);
if(userModel==null) return null;
//生成Token,并且存入redis内,5分钟时限
String token= UUID.randomUUID().toString().replace("-","");
redisTemplate.opsForValue().set("promo_token_"+promoId+"_userid_"+
userId+"_itemid_"+itemId,token);
redisTemplate.expire("promo_token_"+promoId+"_userid_"+userId+
"_itemid_"+itemId, 5,TimeUnit.MINUTES);
return token;
}

OrderController新开一个generateToken接口,以便前端请求,返回令牌

1
2
3
4
5
6
7
8
9
10
11
12
13
@RequestMapping(value = "/generatetoken",···)
@ResponseBody
public CommonReturnType generateToken(···) throws BizException {
//用户登录状态校验
···
//获取秒杀访问令牌
String promoToken = promoService.generateSecondKillToken(promoId,
itemId, userModel.getId());
if (promoToken == null)
throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR,
"生成令牌失败");
return CommonReturnType.create(promoToken);
}

前端在点击“下单”后,首先会请求generateToken接口,返回秒杀令牌。然后
将秒杀令牌promoToken作为参数,再去请求后端createOrder接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RequestMapping(value = "/createorder",···)
@ResponseBody
public CommonReturnType createOrder(··· @RequestParam(name =
"promoToken", required = false) String promoToken) throws BizException {
···
//校验秒杀令牌是否正确
if (promoId != null) {
String inRedisPromoToken = (String) redisTemplate.opsForValue().
get("promo_token_" + promoId + "_userid_" +
userModel.getId() + "_itemid_" + itemId);
if (inRedisPromoToken == null)
throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR,
"令牌校验失败");
if (!StringUtils.equals(promoToken, inRedisPromoToken))
throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR,
"令牌校验失败");
}

这样就彻底完成了校验逻辑和下单逻辑的分离。现在的问题是,假设有1E个
用户请求下单,那么就会生成1E的令牌,这是十分消耗性能的,所以接下
来会引入秒杀大闸进行限流

限流—令牌大闸

大闸的意思就是令牌的数量是有限的,当令牌用完时,就不再发放令牌了,
那么下单将无法进行。之前我们通过PromoService.publishPromo将库存
发布到了Redis上,现在我们将令牌总量也发布到Redis上,这里我们设定
令牌总量是库存的5倍

1
2
3
4
5
6
7
8
9
public void publishPromo(Integer promoId) {
···
//库存同步到Redis
redisTemplate.opsForValue().set("promo_item_stock_" + itemModel.getId(),
itemModel.getStock());
//大闸限制数量设置到redis内
redisTemplate.opsForValue().set("promo_door_count_" + promoId, itemModel.
getStock().intValue() * 5);
}

接下来,在PromoService.generateSecondKillToken方法中,在生成令
牌之前,首先将Redis里的令牌总量减1,然后再判断是否剩余,如果<0,
直接返回null

1
2
3
4
5
6
//获取大闸数量
long result = redisTemplate.opsForValue().
increment("promo_door_count_" + promoId, -1);
if (result < 0)
return null;
//令牌生成

限流-对列泄洪

队列泄洪,就是让多余的请求排队等待。排队有时候比多线程并发效率更
高,多线程毕竟有锁的竞争、上下文的切换,很消耗性能。而排队是无锁
的,单线程的,某些情况下效率更高。
比如Redis就是单线程模型,多个用户同时执行set操作,只能一一等待。
比如MySQL的insert和update语句,会维护一个行锁。阿里SQL就不会,
而是让多个SQL语句排队,然后依次执行。
像支付宝就使用了队列泄洪,双11的时候,支付宝作为网络科技公司,可
以承受很高的TPS,但是下游的各个银行,无法承受这么高的TPS。支付宝
维护了一个“拥塞窗口”,慢慢地向下游银行发送流量,保护下游。
那对于我们的项目,什么时候引入“队列泄洪”呢?在OrderController里
面,之前拿到秒杀令牌后,就要开始执行下单的业务了。现在,我们把下
单业务封装到一个固定大小的线程池中,一次只处理固定大小的请求。
在OrderController里面引入j.u.c.ExcutorService,创建一个init方
法,初始化线程池

1
2
3
4
5
@PostConstruct
public void init() {
//20个线程的线程池
executorService = Executors.newFixedThreadPool(20);
}

在拿到秒杀令牌后,使用线程池来处理下单请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
String stockLogId = itemService.initStockLog(itemId, amount);
if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount, stockLogId)) {
throw new BizException(EmBizError.UNKNOWN_ERROR, "下单失败");
}
return null;
}
});
try {
future.get();
} catch (InterruptedException e) {
···
}

防刷限流

验证码

之前的流程是,用户点击下单后,会直接拿到令牌然后执行下单流程。现在
用户点击下单后,前端会弹出一个“验证码”,用户输入之后,才能请求下
单接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RequestMapping(value = "/generateverifycode",···)
@ResponseBody
public void generateVerifyCode(HttpServletResponse response) throws
BizException, IOException {
···验证
//验证用户信息
Map<String, Object> map = CodeUtil.generateCodeAndPic();
//生成的验证码存到Redis里,并设置过期时间
redisTemplate.opsForValue().set("verify_code_" + userModel.getId(),
map.get("code"));
redisTemplate.expire("verify_code_" + userModel.getId(), 10,
TimeUnit.MINUTES);
//生成的图片,响应到前端页面
ImageIO.write((RenderedImage) map.get("codePic"), "jpeg", response.getOutputStream());
}

限并发

限制并发量意思就是同一时间只有一定数量的线程去处理请求,实现也
比较简单,维护一个全局计数器,当请求进入接口时,计数器-1,并且
判断计数器是否>0,大于0则处理请求,小于0则拒绝等待。
但是一般衡量并发性,是用TPS或者QPS,而该方案由于限制了线程数,
自然不能用TPS或者QPS衡量

限流方案—令牌桶/漏桶

客户端请求接口,必须先从令牌桶中获取令牌,令牌是由一个“定时器”
定期填充的。在一个时间内,令牌的数量是有限的。令牌桶的大小为100
,那么TPS就为100。

RateLimiter限流实现

google.guava.RateLimiter就是令牌桶算法的一个实现类,OrderController
引入这个类,在init方法里面,初始令牌数量为200

1
2
3
4
5
6
7
@PostConstruct
public void init() {
//20个线程的线程池
executorService = Executors.newFixedThreadPool(20);
//200个令牌,即200TPS
orderCreateRateLimiter = RateLimiter.create(200);
}

请求createOrder接口之前,会调用RateLimiter.tryAcquire方法,看
当前令牌是否足够,不够直接抛出异常

1
2
if (!orderCreateRateLimiter.tryAcquire())
throw new BizException(EmBizError.RATELIMIT);

登录态管理

mysql性能

mysql分布式架构扩展

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
server.port=8081
server.tomcat.accept-count=1000
server.tomcat.max-threads=800
server.tomcat.min-spare-threads=100
server.tomcat.max-connections=10000
spring.datasource.url=jdbc:mysql://172.16.227.230:3306/soquick?serverTimezone=UTC
server.tomcat.accesslog.enabled=true

server.tomcat.accesslog.directory=/var/www/soquick/tomcat

server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D

spring.redis.host=172.16.227.230

deploy.sh

1
nohup java -Xms400m -Xmx400m -XX:NewSize=200m -XX:MaxNewSize=200m -jar soquick.jar --spring.config.additional-location=/var/www/soquick/application.properties
Author: 高明
Link: https://skysea-gaoming.github.io/2021/05/11/%E9%AB%98%E5%B9%B6%E5%8F%91%E9%80%89%E8%AF%BE%E9%A1%B9%E7%9B%AE3/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.