本文记录了使用CompletableFuture实现并行计算用户可用优惠卷的过程,文中代码仅供参考。
参考核销优惠卷业务的产品原型,我们的可用优惠卷列表是根据优惠金额从高到低排列的。
设计查询用户可用优惠卷列表响应实体:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Schema(description = "查询用户优惠券明细响应参数")
public class QueryCouponsDetailRespDTO {
/**
* 优惠券id
*/
@Schema(description = "优惠券id")
private String id;
/**
* 优惠对象 0:商品专属 1:全店通用
*/
@Schema(description = "优惠对象 0:商品专属 1:全店通用")
private Integer target;
/**
* 优惠商品编码
*/
@Schema(description = "优惠商品编码")
private String goods;
/**
* 优惠类型 0:立减券 1:满减券 2:折扣券
*/
@Schema(description = "优惠类型 0:立减券 1:满减券 2:折扣券")
private Integer type;
/**
* 消耗规则
*/
@Schema(description = "消耗规则")
private String consumeRule;
/**
* 优惠券金额
*/
@Schema(description = "优惠券金额")
private BigDecimal couponAmount;
}
先从缓存中获取用户的优惠卷列表:
Set<String> rangeUserCoupons = stringRedisTemplate.opsForZSet().range(
String.format(USER_COUPON_TEMPLATE_LIST_KEY, UserContext.getUserId()), 0, -1);
根据列表构建查询优惠卷模板消息的键:
// 构建 Redis Key 列表
List<String> couponTemplateIds = rangeUserCoupons.stream()
.map(each -> StrUtil.split(each, "_").get(0))
.map(each -> redisDistributedProperties.getPrefix() + String.format(COUPON_TEMPLATE_KEY, each))
.toList();
// 同步获取 Redis 数据并进行解析、转换和分区
List<Object> rawCouponDataList = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
couponTemplateIds.forEach(each -> connection.hashCommands().hGetAll(each.getBytes()));
return null;
});
利用 Redis 的 Pipeline 技术批量获取模板信息,减少网络请求:
List<Object> couponTemplateList = stringRedisTemplate.executePipelined((RedisCallback<String>) connection -> {
couponTemplateIds.forEach(each -> connection.hashCommands().hGetAll(each.getBytes()));
return null;
});
使用 CompletableFuture 实现并行计算
由于不是所有优惠卷都是针对特定商品的,我们接下来利用 CompletableFuture 执行并行计算。
使用 CF 时,同样推荐使用自定义线程池来对业务做控制,本业务中设计到 CPU 计算较多,所以我们选择将核心线程数与最大线程数均设置为 CPU 核心数:
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
9999,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
解析 Redis 数据,并按goods
字段进行分区处理:
Map<Boolean, List<CouponTemplateQueryRespDTO>> partitioned = JSON.parseArray(JSON.toJSONString(rawCouponDataList), CouponTemplateQueryRespDTO.class)
.stream()
.collect(Collectors.partitioningBy(coupon -> StrUtil.isEmpty(coupon.getGoods())));
// 拆分后的两个列表
List<CouponTemplateQueryRespDTO> goodsEmptyList = partitioned.get(true); // goods 为空的列表
List<CouponTemplateQueryRespDTO> goodsNotEmptyList = partitioned.get(false); // goods 不为空的列表
// 针对当前订单可用/不可用的优惠券列表
List<QueryCouponsDetailRespDTO> availableCouponList = Collections.synchronizedList(new ArrayList<>());
List<QueryCouponsDetailRespDTO> notAvailableCouponList = Collections.synchronizedList(new ArrayList<>());
至于这里为什么要使用synchronizedList
,是因为当前场景存在对集合的并发写操作,需要将 ArrayList 包装为线程安全版本。源码参考如下:
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
CompletableFuture.allOf
能够将多个 CompletableFuture
合并为一个新的 CompletableFuture<Void>
,并且只有当所有传入的 CompletableFuture
全部完成时(无论是成功还是失败),这个新的 CompletableFuture
才会被标记为完成。
工作原理:
CompletableFuture.allOf
返回的CompletableFuture<Void>
本身不包含每个单独任务的执行结果,它仅表示所有任务的状态:要么全部完成,要么至少有一个异常。- 如果任意一个传入的
CompletableFuture
异常完成,则allOfFuture
也会异常完成。 - 如果所有的
CompletableFuture
成功完成,则allOfFuture
也会成功完成。
CompletableFuture.allOf
通常用于以下几种场景:
- 等待所有异步任务完成后再执行操作: 当你希望所有异步任务都完成后,再触发某个后续操作(如汇总结果、更新数据库状态)时,使用
CompletableFuture.allOf
是一种理想的选择。 - 批量执行任务: 比如在微服务架构中,我们需要向多个服务发送异步请求,并在所有请求完成后进行后续处理。
- 并行处理: 当对一批数据进行并行处理时(如计算、过滤、转化等),可以利用
CompletableFuture.allOf
合并所有的异步任务,等所有任务完成后再继续执行。
对于 CompletableFuture.allOf
返回的 CompletableFuture<Void>
,通常会调用以下方法来等待其完成并进行后续操作:
**allOfFuture.join()**
** 或**allOfFuture.get()**
:** 阻塞等待所有任务完成。
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3);
allOfFuture.join(); // 阻塞等待所有任务完成
System.out.println("所有任务已完成");
**allOfFuture.thenRun(...)**
** 或**allOfFuture.thenAccept(...)**
:** 指定当所有任务完成后,要执行的回调操作。
CompletableFuture.allOf(future1, future2, future3)
.thenRun(() -> System.out.println("所有任务已完成")); // 当所有任务完成时执行
- **组合多个 *
**CompletableFuture**
* 的结果:** 使用CompletableFuture.allOf
时,返回的CompletableFuture<Void>
本身不包含各个任务的结果,所以如果要组合结果,可以使用以下方式:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
// 获取所有子任务的结果
allOfFuture.thenRun(() -> {
try {
String result1 = future1.get();
String result2 = future2.get();
System.out.println("所有任务结果:" + result1 + ", " + result2);
} catch (Exception e) {
e.printStackTrace();
}
}).join(); // 确保执行完成
像我们上面的场景,就是使用了方案三,使用 allOf
进行聚合,执行完所有任务后执行 thenRun
对可用优惠券进行排序,最终等待 thenRun
执行完成。
.thenRun(...)
本质上是一个非阻塞的操作,它会在前置 CompletableFuture
完成时立即提交回调任务,而不会等待它的执行完成。也就是说,.thenRun(...)
只安排了这个回调任务的执行,但它本身不会阻塞当前线程去等待回调任务的完成。因此,通常情况下如果希望确保 .thenRun(...)
中的逻辑执行完成,需要通过 join()
或 get()
来等待整个操作链的完成。
thenRun
的常见用途:
thenRun
主要用于 在前置任务完成后执行一个不依赖结果的任务,如简单地输出日志或做一些状态更新。- 由于
thenRun
不依赖前面的CompletableFuture
的执行结果,因此一般用于那些不需要获取结果的场景。如果需要处理前置任务的结果,可以使用thenApply
或thenCompose
。
了解完原理后,我们来实践一下。首先,处理 goods 为空的列表<font style="color:rgb(55, 65, 81);">goodsEmptyList</font>
:
CompletableFuture<Void> emptyGoodsTasks = CompletableFuture.allOf(
goodsEmptyList.stream()
.map(each -> CompletableFuture.runAsync(() -> {
QueryCouponsDetailRespDTO resultCouponDetail = BeanUtil.toBean(each, QueryCouponsDetailRespDTO.class);
JSONObject jsonObject = JSON.parseObject(each.getConsumeRule());
handleCouponLogic(resultCouponDetail, jsonObject, requestParam.getOrderAmount(), availableCouponList, notAvailableCouponList);
}, executorService))
.toArray(CompletableFuture[]::new)
);
先取出集合中的元素,以 stream 的 map 方法来处理每个集合中的元素,map 方法允许对元素做函数运算,再收集结果成为新的流对象,函数中,使用handleCouponLogic
方法来判断优惠卷归属集合。
同理,针对 goods 不为空的对象集合:
CompletableFuture<Void> notEmptyGoodsTasks = CompletableFuture.allOf(
goodsNotEmptyList.stream()
.map(each -> CompletableFuture.runAsync(() -> {
QueryCouponsDetailRespDTO resultCouponDetail = BeanUtil.toBean(each, QueryCouponsDetailRespDTO.class);
QueryCouponGoodsReqDTO couponGoods = goodsRequestMap.get(each.getGoods());
if (couponGoods == null) {
notAvailableCouponList.add(resultCouponDetail);
} else {
JSONObject jsonObject = JSON.parseObject(each.getConsumeRule());
handleCouponLogic(resultCouponDetail, jsonObject, couponGoods.getGoodsAmount(), availableCouponList, notAvailableCouponList);
}
}, executorService))
.toArray(CompletableFuture[]::new)
);
先检查用户结算商品是否存在于之前从缓存中查出来的商品不为空的列表中,如果存在,则解析消费规则并调用 handleCouponLogic
方法处理优惠券逻辑,否则添加至不可用优惠卷列表。
收集结果:
CompletableFuture.allOf(emptyGoodsTasks, notEmptyGoodsTasks)
.thenRun(() -> {
// 与业内标准一致,按最终优惠力度从大到小排序
availableCouponList.sort((c1, c2) -> c2.getCouponAmount().compareTo(c1.getCouponAmount()));
})
.join();
之前存储优惠卷消耗规则是这样存储的:
JSONObject consumeRule = new JSONObject();
consumeRule.put("termsOfUse", new BigDecimal("10")); // 使用条件 满 x 元可用
consumeRule.put("maximumDiscountAmount", new BigDecimal("3")); // 最大优惠金额
consumeRule.put("explanationOfUnmetC 3onditions", "xxx"); // 不满足使用条件说明
consumeRule.put("validityPeriod", 48); // 自领取优惠券后有效时间,单位小时
处理优惠卷优惠逻辑:
private void handleCouponLogic(QueryCouponsDetailRespDTO resultCouponDetail, JSONObject jsonObject, BigDecimal amount,
List<QueryCouponsDetailRespDTO> availableCouponList, List<QueryCouponsDetailRespDTO> notAvailableCouponList) {
BigDecimal termsOfUse = jsonObject.getBigDecimal("termsOfUse");
BigDecimal maximumDiscountAmount = jsonObject.getBigDecimal("maximumDiscountAmount");
switch (resultCouponDetail.getType()) {
case 0: // 立减券
resultCouponDetail.setCouponAmount(maximumDiscountAmount);
availableCouponList.add(resultCouponDetail);
break;
case 1: // 满减券
if (amount.compareTo(termsOfUse) >= 0) {
resultCouponDetail.setCouponAmount(maximumDiscountAmount);
availableCouponList.add(resultCouponDetail);
} else {
notAvailableCouponList.add(resultCouponDetail);
}
break;
case 2: // 折扣券
if (amount.compareTo(termsOfUse) >= 0) {
BigDecimal discountRate = jsonObject.getBigDecimal("discountRate");
BigDecimal multiply = amount.multiply(discountRate);
if (multiply.compareTo(maximumDiscountAmount) >= 0) {
resultCouponDetail.setCouponAmount(maximumDiscountAmount);
} else {
resultCouponDetail.setCouponAmount(multiply);
}
availableCouponList.add(resultCouponDetail);
} else {
notAvailableCouponList.add(resultCouponDetail);
}
break;
default:
throw new ClientException("无效的优惠券类型");
}
}
CASE 0 立减券 | 设置折扣金额为maximumDiscountAmount 添加至可用列表 |
---|---|
CASE 1 满减卷 | + 设置折扣金额为maximumDiscountAmount 若结算前金额大于规定金额 添加至可用列表+ 否则添加至不可用列表 |
CASE 2 折扣卷 | + 先校验是否达到使用门槛termsOfUse ,若达到进入下面判断- 计算使用优惠卷的优惠部分金额 与最大优惠金额比较 若大于则设置折扣金额为 maximumDiscountAmount - 否则设置折扣金额为 优惠部分金额 + 未达门槛-添加至不可用列表 |
上述表格中的maximumDiscountAmount
与termsOfUse
即是下面这张优惠卷示例中的最大优惠金额和需要满足条件字段:
优惠券样例
- 优惠类型:折扣券
- 优惠对象:店铺券
- 折扣:0.6 折
- 最大优惠金额:40 元
- 需要满足条件:金额满足 300 元
构建返回结果:
return QueryCouponsRespDTO.builder()
.availableCouponList(availableCouponList)
.notAvailableCouponList(notAvailableCouponList)
.build();
测试
首先开启单测断言
JVM参数:-ea
测试数据:
优惠券1
- 优惠类型:立减券
- 优惠对象:店铺券
- 优惠金额:立减 10 元
优惠券2
- 优惠类型:立减券
- 优惠对象:商品券 001
- 优惠金额:立减 3 元
优惠券3
- 优惠类型:满减券
- 优惠对象:店铺券
- 优惠金额:立减 10 元
- 需要满足条件:金额满足 100 元
优惠券4
- 优惠类型:折扣券
- 优惠对象:店铺券
- 折扣:0.6 折
- 最大优惠金额:20 元
- 需要满足条件:金额满足 100 元
优惠券5
- 优惠类型:折扣券
- 优惠对象:店铺券
- 折扣:0.6 折
- 最大优惠金额:40 元
- 需要满足条件:金额满足 300 元
Comments NOTHING