事务型消息
异步消息发送时机问题
目前扣减库存的事务ItemService.decreaseStock是封装在OrderService
.createOrder事务里面的。在扣减Redis 库存、发送异步消息之后,还有订
单入库、增加销量的操作。如果这些操作失败,那么createOrder 事务会回
滚,decreaseStock事务也回滚,但是Redis的扣减操作却不能回滚,会导
致数据不一致。
解决的方法就是在订单入库、增加销量成功之后再发送异步消息,ItemService
.decreaseStock只负责扣减Redis库存,不发送异步消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean decreaseStock(Integer itemId, Integer amount) { long affectedRow=redisTemplate.opsForValue(). increment("promo_item_stock_"+itemId,amount.intValue()*-1); if(affectedRow>=0){ return true; } else { increaseStock(itemId, amount) return false; } }
public boolean increaseStock(Integer itemId, Integer amount) { redisTemplate.opsForValue().increment("promo_item_stock_"+ itemId,amount.intValue()); return true; }
|
将发送异步消息的逻辑抽取出来
1 2 3 4
| public boolean asyncDecreaseStock(Integer itemId, Integer amount) { return mqProducer.asyncReduceStock(itemId, amount); }
|
再在OrderService.createOrder里面调用
1 2 3 4 5 6 7 8 9 10 11 12
| ···
orderDOMapper.insertSelective(orderDO);
itemService.increaseSales(itemId,amount);
boolean mqResult=itemService.asyncDecreaseStock(itemId,amount); if(!mqResult){ itemService.increaseStock(itemId,amount); throw new BizException(EmBizError.MQ_SEND_FAIL); }
|
这样就算订单入库失败、销量增加失败、消息发送失败,都能保证缓存和数
据库的一致性
事务提交问题
但是这么做,依然有问题。Spring 的@Transactional 标签,会在事务方法
返回后才提交,如果提交的过程中,发生了异常,则数据库回滚,但是Redis
库存已扣,还是无法保证一致性。我们需要在事务提交成功后再发送异步消息
解决方法
TransactionSynchronizationManager.registerSynchronization方法
,这个方法的传入一个TransactionSynchronizationAdapter的匿名类,
通过afterCommit方法,在事务提交成功后,执行发送消息操作
1 2 3 4 5 6 7 8 9 10
| TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { boolean mqResult=itemService.asyncDecreaseStock(itemId,amount); if(!mqResult){ itemService.increaseStock(itemId,amount); throw new BizException(EmBizError.MQ_SEND_FAIL); } }
|
事务型消息
上面的做法,依然不能保证万无一失。假设现在事务提交成功了,等着执行
afterCommit方法,这个时候突然宕机了,那么订单已然入库,销量已然
增加,但是去数据库扣减库存的这条消息却“丢失”了。这里就需要引入
RocketMQ的事务型消息。
所谓事务型消息,也会被发送到消息队列里面,这条消息处于prepared状
态,broker会接受到这条消息,但是不会把这条消息给消费者消费。该状
态的消息,会执行TransactionListener的executeLocalTransaction
方法,根据执行结果,改变事务型消息的状态,让消费端消费或是不消费
在mq.MqProducer类里面新注入一个TransactionMQProducer类,与
DefaultMQProducer类似,也需要设置服务器地址、命名空间等。
新建一个transactionAsyncReduceStock的方法,该方法使用事务型消
息进行异步扣减库存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId, Integer amount, String stockLogId) { Map<String, Object> bodyMap = new HashMap<>(); bodyMap.put("itemId", itemId); bodyMap.put("amount", amount); Map<String, Object> argsMap = new HashMap<>(); argsMap.put("itemId", itemId); argsMap.put("amount", amount); argsMap.put("userId", userId); argsMap.put("promoId", promoId);
Message message = new Message(topicName, "increase", JSON.toJSON(bodyMap).toString().getBytes( Charset.forName("UTF-8"))); try { transactionMQProducer.sendMessageInTransaction(message, argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } return true; }
|
这样,就会发送一个事务型消息到broke,而处于prepared状态的事务型
消息,会执行TransactionListener的executeLocalTransaction方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object args) { Integer itemId = (Integer) ((Map) args).get("itemId"); Integer promoId = (Integer) ((Map) args).get("promoId"); Integer userId = (Integer) ((Map) args).get("userId"); Integer amount = (Integer) ((Map) args).get("amount"); try { orderService.createOrder(userId, itemId, promoId, amount); } catch (BizException e) { e.printStackTrace(); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; }
|
这样,在事务型消息中去执行下单操作,下单失败,则消息回滚,不会去数
据库扣减库存。下单成功,则消息被消费,扣减数据库库存
库存流水
不要以为这样就万事大吉了上述流程还有一个漏洞,就是当执行orderService
.createOrder后,突然又宕机了,根本没有返回,这个时候事务型消息就会进
入UNKNOWN状态,我们需要处理这个状态。
在匿名类TransactionListener里面,还需要覆写checkLocalTransaction方
法,这个方法就是用来处理UNKNOWN状态的。应该怎么处理?这就需要引入库存
流水。
数据库新建一张stock_log的表,用来记录库存流水
1 2 3 4 5
| create table stock_log( stock_log_id varchar(64) primary key, item_id int not null default 0, amount int not null default 0, status int not null default 0);
|
添加一个ItemService.initStockLog方法
1 2 3 4 5 6 7 8 9 10
| public String initStockLog(Integer itemId, Integer amount) { StockLogDO stockLogDO = new StockLogDO(); stockLogDO.setItemId(itemId); stockLogDO.setAmount(amount); stockLogDO.setStockLogId(UUID.randomUUID().toString().replace("-", "")); stockLogDO.setStatus(1); stockLogDOMapper.insertSelective(stockLogDO); return stockLogDO.getStockLogId(); }
|
事务型消息会调用OrderService.createOrder方法,执行Redis扣减库存
、订单入库、销量增加的操作,当这些操作都完成后,就说明下单完成了,
等着异步更新数据库了。那么需要修改订单流水的状态
1 2 3 4 5 6 7 8 9 10 11
|
orderDOMapper.insertSelective(orderDO);
itemService.increaseSales(itemId, amount); StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if (stockLogDO == null) throw new BizException(EmBizError.UNKNOWN_ERROR);
stockLogDO.setStatus(2); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
|
异步更新数据库,需要事务型消息从prepare状态变成commit状态。假如此时
orderService.createOrder本身发生了异常,那么就回滚事务型消息,并
且返回LocalTransactionState.ROLLBACK_MESSAGE,这个下单操作就会
被取消。
如果本身没有发生异常,那么就返回LocalTransactionState.COMMIT_MESSAGE
,此时事务型消息会从prepare状态变为commit状态,接着被消费端消费,异
步扣减库存。
1 2 3 4 5 6 7 8 9 10 11 12 13
| try { orderService.createOrder(userId, itemId, promoId, amount, stockLogId); } catch (BizException e) { e.printStackTrace(); StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); stockLogDO.setStatus(3); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE;
|
UNKNOWN状态处理
如上节结尾所述,如果在执行createOrder的时候,突然宕机了,此时事务
型消息的状态是UNKNOWN,需要在checkLocalTransaction方法中进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public LocalTransactionState checkLocalTransaction(MessageExt message) { String jsonString = new String(message.getBody()); Map<String, Object> map = JSON.parseObject(jsonString, Map.class); String stockLogId = (String) map.get("stockLogId"); StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if (stockLogDO == null) return LocalTransactionState.UNKNOW; if (stockLogDO.getStatus() == 2) { return LocalTransactionState.COMMIT_MESSAGE; } else if (stockLogDO.getStatus() == 1) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.ROLLBACK_MESSAGE; }
|
库存售罄处理
现在是用户请求一次OrderController.createOrder就初始化一次流水,
但是如果10000个用户抢10个商品,就会初始化10000次库存流水,这显然
是不行的。
解决的方法是在ItemService.decreaseStock中,如果库存没有了,就打
上“售罄标志”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public boolean decreaseStock(Integer itemId, Integer amount) { long affectedRow = redisTemplate.opsForValue(). increment("promo_item_stock_" + itemId, amount.intValue() * -1); if (affectedRow > 0) { return true; } else if (affectedRow == 0) { redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true"); return true; } else { increaseStock(itemId, amount); return false; } }
|
在OrderController.createOrder初始化流水之前,先判断一下是否售
罄,售罄了就直接抛出异常
1 2 3 4
| if (redisTemplate.hasKey("promo_item_stock_invalid_"+itemId)) throw new BizException(EmBizError.STOCK_NOT_ENOUGH); String stockLogId = itemService.initStockLog(itemId, amount);
|
流量削峰
秒杀秒杀,就是在活动开始的一瞬间,有大量流量涌入,优化不当,会导致
服务器停滞,甚至宕机。所以引入流量削峰技术十分有必要
业务解耦—秒杀令牌
之前的验证逻辑和下单逻辑都耦合在OrderService.createOrder里面,现
在利用秒杀令牌,使校验逻辑和下单逻辑分离。
PromoService新开一个generateSecondKillToken,将活动、商品、用户
信息校验逻辑封装在里面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public String generateSecondKillToken(Integer promoId,Integer itemId, Integer userId) { if(redisTemplate.hasKey("promo_item_stock_invalid_"+itemId)) return null; PromoDO promoDO=promoDOMapper.selectByPrimaryKey(promoId); PromoModel promoModel=convertFromDataObj(promoDO); if(promoModel==null) return null; if(promoModel.getStartDate().isAfterNow()) { promoModel.setStatus(1); }else if(promoModel.getEndDate().isBeforeNow()){ promoModel.setStatus(3); }else{ promoModel.setStatus(2); } if(promoModel.getStatus()!=2) return null; ItemModel itemModel=itemService.getItemByIdInCache(itemId); if(itemModel==null) return null; UserModel userModel=userService.getUserByIdInCache(userId); if(userModel==null) return null; String token= UUID.randomUUID().toString().replace("-",""); redisTemplate.opsForValue().set("promo_token_"+promoId+"_userid_"+ userId+"_itemid_"+itemId,token); redisTemplate.expire("promo_token_"+promoId+"_userid_"+userId+ "_itemid_"+itemId, 5,TimeUnit.MINUTES); return token; }
|
OrderController新开一个generateToken接口,以便前端请求,返回令牌
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RequestMapping(value = "/generatetoken",···) @ResponseBody public CommonReturnType generateToken(···) throws BizException { ··· String promoToken = promoService.generateSecondKillToken(promoId, itemId, userModel.getId()); if (promoToken == null) throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR, "生成令牌失败"); return CommonReturnType.create(promoToken); }
|
前端在点击“下单”后,首先会请求generateToken接口,返回秒杀令牌。然后
将秒杀令牌promoToken作为参数,再去请求后端createOrder接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RequestMapping(value = "/createorder",···) @ResponseBody public CommonReturnType createOrder(··· @RequestParam(name = "promoToken", required = false) String promoToken) throws BizException { ··· if (promoId != null) { String inRedisPromoToken = (String) redisTemplate.opsForValue(). get("promo_token_" + promoId + "_userid_" + userModel.getId() + "_itemid_" + itemId); if (inRedisPromoToken == null) throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR, "令牌校验失败"); if (!StringUtils.equals(promoToken, inRedisPromoToken)) throw new BizException(EmBizError.PARAMETER_VALIDATION_ERROR, "令牌校验失败"); }
|
这样就彻底完成了校验逻辑和下单逻辑的分离。现在的问题是,假设有1E个
用户请求下单,那么就会生成1E的令牌,这是十分消耗性能的,所以接下
来会引入秒杀大闸进行限流
限流—令牌大闸
大闸的意思就是令牌的数量是有限的,当令牌用完时,就不再发放令牌了,
那么下单将无法进行。之前我们通过PromoService.publishPromo将库存
发布到了Redis上,现在我们将令牌总量也发布到Redis上,这里我们设定
令牌总量是库存的5倍
1 2 3 4 5 6 7 8 9
| public void publishPromo(Integer promoId) { ··· redisTemplate.opsForValue().set("promo_item_stock_" + itemModel.getId(), itemModel.getStock()); redisTemplate.opsForValue().set("promo_door_count_" + promoId, itemModel. getStock().intValue() * 5); }
|
接下来,在PromoService.generateSecondKillToken方法中,在生成令
牌之前,首先将Redis里的令牌总量减1,然后再判断是否剩余,如果<0,
直接返回null
1 2 3 4 5 6
| long result = redisTemplate.opsForValue(). increment("promo_door_count_" + promoId, -1); if (result < 0) return null;
|
限流-对列泄洪
队列泄洪,就是让多余的请求排队等待。排队有时候比多线程并发效率更
高,多线程毕竟有锁的竞争、上下文的切换,很消耗性能。而排队是无锁
的,单线程的,某些情况下效率更高。
比如Redis就是单线程模型,多个用户同时执行set操作,只能一一等待。
比如MySQL的insert和update语句,会维护一个行锁。阿里SQL就不会,
而是让多个SQL语句排队,然后依次执行。
像支付宝就使用了队列泄洪,双11的时候,支付宝作为网络科技公司,可
以承受很高的TPS,但是下游的各个银行,无法承受这么高的TPS。支付宝
维护了一个“拥塞窗口”,慢慢地向下游银行发送流量,保护下游。
那对于我们的项目,什么时候引入“队列泄洪”呢?在OrderController里
面,之前拿到秒杀令牌后,就要开始执行下单的业务了。现在,我们把下
单业务封装到一个固定大小的线程池中,一次只处理固定大小的请求。
在OrderController里面引入j.u.c.ExcutorService,创建一个init方
法,初始化线程池
1 2 3 4 5
| @PostConstruct public void init() { executorService = Executors.newFixedThreadPool(20); }
|
在拿到秒杀令牌后,使用线程池来处理下单请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Future<Object> future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { String stockLogId = itemService.initStockLog(itemId, amount); if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount, stockLogId)) { throw new BizException(EmBizError.UNKNOWN_ERROR, "下单失败"); } return null; } }); try { future.get(); } catch (InterruptedException e) { ··· }
|
防刷限流
验证码
之前的流程是,用户点击下单后,会直接拿到令牌然后执行下单流程。现在
用户点击下单后,前端会弹出一个“验证码”,用户输入之后,才能请求下
单接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RequestMapping(value = "/generateverifycode",···) @ResponseBody public void generateVerifyCode(HttpServletResponse response) throws BizException, IOException { ···验证 Map<String, Object> map = CodeUtil.generateCodeAndPic(); redisTemplate.opsForValue().set("verify_code_" + userModel.getId(), map.get("code")); redisTemplate.expire("verify_code_" + userModel.getId(), 10, TimeUnit.MINUTES); ImageIO.write((RenderedImage) map.get("codePic"), "jpeg", response.getOutputStream()); }
|
限并发
限制并发量意思就是同一时间只有一定数量的线程去处理请求,实现也
比较简单,维护一个全局计数器,当请求进入接口时,计数器-1,并且
判断计数器是否>0,大于0则处理请求,小于0则拒绝等待。
但是一般衡量并发性,是用TPS或者QPS,而该方案由于限制了线程数,
自然不能用TPS或者QPS衡量
限流方案—令牌桶/漏桶
客户端请求接口,必须先从令牌桶中获取令牌,令牌是由一个“定时器”
定期填充的。在一个时间内,令牌的数量是有限的。令牌桶的大小为100
,那么TPS就为100。
RateLimiter限流实现
google.guava.RateLimiter就是令牌桶算法的一个实现类,OrderController
引入这个类,在init方法里面,初始令牌数量为200
1 2 3 4 5 6 7
| @PostConstruct public void init() { executorService = Executors.newFixedThreadPool(20); orderCreateRateLimiter = RateLimiter.create(200); }
|
请求createOrder接口之前,会调用RateLimiter.tryAcquire方法,看
当前令牌是否足够,不够直接抛出异常
1 2
| if (!orderCreateRateLimiter.tryAcquire()) throw new BizException(EmBizError.RATELIMIT);
|
登录态管理
mysql性能
mysql分布式架构扩展
application.properties
1 2 3 4 5 6 7 8 9 10 11 12 13
| server.port=8081 server.tomcat.accept-count=1000 server.tomcat.max-threads=800 server.tomcat.min-spare-threads=100 server.tomcat.max-connections=10000 spring.datasource.url=jdbc:mysql://172.16.227.230:3306/soquick?serverTimezone=UTC server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.directory=/var/www/soquick/tomcat
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
spring.redis.host=172.16.227.230
|
deploy.sh
1
| nohup java -Xms400m -Xmx400m -XX:NewSize=200m -XX:MaxNewSize=200m -jar soquick.jar --spring.config.additional-location=/var/www/soquick/application.properties
|