拼团交易系统
部署
本地环境:Maven3.8.4
SpringBoot: 2.7.12
jdk:1.8
目录结构:

docker-compose:
version: '3.8'
services:
# 1. 前端
group-buy-market-front:
image: nginx:alpine
container_name: group-buy-market-front
restart: unless-stopped
ports:
- '18091:80'
volumes:
- ./nginx/html:/usr/share/nginx/html
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf:ro
privileged: true
networks:
- group-buy-network
# 4. Java 后端
group-buying-sys:
build:
context: ../../.. # 从 docs/tag/v2.0 回到项目根
dockerfile: group-buying-sys-app/Dockerfile
image: smile/group-buying-sys:latest
container_name: group-buying-sys
restart: unless-stopped
depends_on:
mysql:
condition: service_healthy
redis:
condition: service_healthy
ports:
- '8091:8091'
environment:
- TZ=Asia/Shanghai
- SPRING_PROFILES_ACTIVE=prod
volumes:
- ./log:/data/log
logging:
driver: json-file
options:
max-size: '10m'
max-file: '3'
networks:
- group-buy-network
mysql:
image: mysql:8.0
container_name: group-buy-mysql
hostname: mysql
command: --default-authentication-plugin=mysql_native_password
restart: unless-stopped
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: 123456
ports:
- "13306:3306"
volumes:
- ./mysql/my.cnf:/etc/mysql/conf.d/mysql.cnf:ro
- ./mysql/sql:/docker-entrypoint-initdb.d
healthcheck:
test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
interval: 5s
timeout: 10s
retries: 10
start_period: 15s
networks:
- group-buy-network
# Redis
redis:
image: redis:6.2
restart: unless-stopped
container_name: group-buy-redis
hostname: redis
privileged: true
ports:
- 16379:6379
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
command: redis-server /usr/local/etc/redis/redis.conf
networks:
- group-buy-network
healthcheck:
test: [ "CMD", "redis-cli", "ping" ]
interval: 10s
timeout: 5s
retries: 3
# rabbitmq
# 账密 admin/admin
# rabbitmq-plugins enable rabbitmq_management
rabbitmq:
image: rabbitmq:3.8-management
container_name: group-buy-rabbitmq
hostname: rabbitmq
restart: unless-stopped
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
command: rabbitmq-server
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./rabbitmq/mq-data:/var/lib/rabbitmq
networks:
- group-buy-network
nacos:
image: nacos/nacos-server:v2.1.0
container_name: group-buy-nacos-server
hostname: nacos
restart: unless-stopped
env_file:
- ./nacos/custom.env
ports:
- "8848:8848"
- "9848:9848"
- "9849:9849"
depends_on:
- mysql
networks:
- group-buy-network
volumes:
- ./nacos/init.d:/docker-entrypoint-init.d
networks:
group-buy-network:
external: true
dockerfile:
# —— 第一阶段:Maven 构建 ——
FROM maven:3.8.7-eclipse-temurin-17-alpine AS builder
WORKDIR /workspace
# 把项目级 settings.xml 复制到容器里
COPY .mvn/settings.xml /root/.m2/settings.xml
# 1. 先只拷贝父 POM 及各模块的 pom.xml,加速依赖下载
COPY pom.xml ./pom.xml
COPY group-buying-sys-api/pom.xml ./group-buying-sys-api/pom.xml
COPY group-buying-sys-domain/pom.xml ./group-buying-sys-domain/pom.xml
COPY group-buying-sys-infrastructure/pom.xml ./group-buying-sys-infrastructure/pom.xml
COPY group-buying-sys-trigger/pom.xml ./group-buying-sys-trigger/pom.xml
COPY group-buying-sys-types/pom.xml ./group-buying-sys-types/pom.xml
COPY group-buying-sys-app/pom.xml ./group-buying-sys-app/pom.xml
# 离线下载所有依赖
RUN mvn dependency:go-offline -B
# 2. 拷贝所有源码
COPY . .
# 3. 只打包 main 应用模块(连带编译它依赖的模块),跳过测试,加速构建
RUN mvn \
-f pom.xml clean package \
-pl group-buying-sys-app -am \
-DskipTests -B
# —— 第二阶段:运行时镜像 ——
FROM openjdk:17-jdk-slim
LABEL maintainer="smile"
# 可选:设置时区
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# 把构建产物拷过来
COPY --from=builder \
/workspace/group-buying-sys-app/target/group-buying-sys-app.jar \
app.jar
# 暴露端口,按需改
EXPOSE 8091
ENTRYPOINT ["java", "-jar", "app.jar"]
修改项目后部署的影响:
前端服务 (group-buy-market-front
)
-
代码位置:通过卷挂载 (
./nginx/html:/usr/share/nginx/html
)。 -
修改影响
- 如果修改的是
./nginx/html
下的前端代码(如 HTML/JS/CSS),无需重建,Nginx 会直接读取更新后的文件。 - 如果修改的是 Nginx 配置 (
./nginx/conf/nginx.conf
),需重启容器生效:
docker compose restart group-buy-market-front
- 如果修改的是
Java 后端服务 (group-buying-sys
)
-
代码位置:通过镜像构建(
build
指定了 Dockerfile 路径)。 -
修改影响
- 如果修改了 Java 代码或依赖(如
pom.xml
),必须重建镜像:
docker compose up -d --build group-buying-sys
- 如果修改了 Java 代码或依赖(如
其他服务(MySQL/Redis/RabbitMQ/Nacos)
-
代码位置:均使用官方镜像,无业务代码。
-
修改影响
-
修改配置文件(如 ./redis/redis.conf)需重启容器:
docker compose restart redis
-
无需
--build
(除非你自定义了它们的镜像)。
-
压测
.服务器资源:2核心4GB
验证锁单接口:防超卖
Jmeter测试,一秒发1000次请求下:
可以发现,只有一开始的少部分的并发请求进入抢占库存,抢占失败会返回'拼团组队失败,缓存库存不足',后面'交易锁单拦截-xx'都是在第一层就被拦下了,即下单前的人数/库存校验。
如果仅要求“防超卖”,已经可以确保在资源有限时也不超卖。
但是在2核 4GB,服务只能稳定支撑 ≈240 QPS,平均响应 2 秒
测试查询拼团配置接口:
≈320 QPS
系统备忘录
本系统涉及微信和支付宝的回调。
1.微信扫码登录,*https://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index*平台上配置了扫描通知地址,如果是本地测试,需要打开frp内网穿透,然后填的地址是frp建立通道的服务器端的ip:端口
2.支付宝,用户付款成功回调,也是同理,本地测试就要开frp。注意frp中的通道,默认是本地端口=远程端口,但是如果在服务器上部署了一套,那么远程的端口就会与frp的端口冲突!!!导致本地测试的时候失效。
流程:
用户锁单-》支付宝付款-》成功后return_url设置了用户支付完毕后跳转回哪个地址是给前端用户看的; alipay_notify_url设置了支付成功后alipay调用你的后端哪个接口。
这里有小商城和拼团系统,notify_url指拼团系统中拼团达到指定人数后,通知小商城的HTTP地址,但是如果notify_type为MQ,则
notify_url为空,并且notify_mq非空,指明是拼团成功通知还是用户退单通知(topic.team_refund)。
如果不参与拼团,则支付回调会直接修改订单为deal_done,然后发一个'支付成功'消息,进入下一环节:发货。
若参与拼团,则RPC调用拼团系统中的'拼团交易结算'接口,增加拼团完成量、更新订单状态。
若拼团达到人数,发送拼团成功通知,小商场将订单中相应拼团的 status 都设置为deal_done,然后小商场内部也再发一个'支付成功'消息,主要用于通知这些
拼团对应的订单进入下一环节:发货(感觉'支付成功'取名不够直观)。
若为用户退单通知,小商场需处理退款业务。
临时记的备忘录,不一定完善,具体看后面总结。
踩坑
Lua脚本问题(后面也没采用这种方式)
long max = target + recovery;
String lua =
"local v = redis.call('INCR', KEYS[1])+1; " +
"if tonumber(v) > tonumber(ARGV[1]) then " +
" redis.call('DECR', KEYS[1]); " +
" return 0; " +
"end " +
"return v;";
Long occupy = redisService.eval(
lua,
RScript.ReturnType.INTEGER,
Collections.singletonList(teamOccupiedStockKey),
max);
报错:
org.redisson.client.RedisException: ERR Error running script (call to f_xxxx):
@user_script:1: user_script:1: attempt to compare nil with number.
问题:max的值根本传不到 ARGV[1]
原因:因为我配置了一个全局默认的序列号器:
config.setCodec(JsonJacksonCodec.INSTANCE); //用 Jackson 进行序列化
Lua脚本期望:原始字符串或数字参数
因此,需要对这块单独配置序列化器StringCodec()
:
redissonClient.getScript(new org.redisson.client.codec.StringCodec()).eval(xxx,xx....)
系统设计
在 DDD 中有一套共识的工程两阶段设计手段,包括;战略设计、战术设计。
战略设计:战略设计的核心是通过业务边界划分和上下文隔离,将复杂的业务系统拆分为多个高内聚、低耦合的限界上下文,并明确它们之间的交互方式(如通过领域事件、API、消息队列等)。
战术设计:战术设计关注如何在限界上下文内部,使用领域模型来表达业务逻辑,避免传统的贫血模型(Anemic Model)导致的复杂、难以维护的代码。
用例图
用例图(use case diagram)是用户与系统交互的最简表示形式,展现了用户和与他相关的用例之间的关系。它不仅反映了不同角色(如用户、运营)在系统中的职责边界和任务范围,还以可视化的方式呈现了系统提供的核心功能和服务,如同一份构建系统的战略蓝图。

四色建模图
MVC的困局:面向过程的“大通铺”
- 传统MVC开发是面向过程的,像一个大通铺,大家挤在一起。
- 为每个功能流程(A、B、C流程)编写代码,导致功能代码四处复制、杂乱交织,难以管理和复用。
DDD的解法:面向领域的“精装公寓”
- DDD通过领域建模,将系统划分为不同的领域(如活动域、人群标签配置域、交易域)。
- 这就像为每个家庭分配独立的公寓和房间,让代码各归其位,结构清晰,易于维护。
1.建模方法
建模的起点:从用户行为出发
- DDD建模始于用户,分析其行为命令如何触发系统动作。
- 用例图是完美的起点,它直观展示了用户与系统的所有交互,帮助我们识别出所有关键行为。
统一的语言:协作的基石
- 建模过程需要产品、研发、测试等所有角色基于统一语言(如“拼团”、“成团”)进行协作。
- 四色建模/风暴模型是DDD的标准方法,旨在让所有参与者能共同理解和构建业务模型。
此图为风暴事件指导图,通过寻找领域事件,发起事件命令,完成领域事件的过程,完成 DDD 工程建模。
- 蓝色 - 决策命令,是用户发起的行为动作,如;发起拼团Command、支付订单Command,是流程的起点。
- 黄色 - 领域事件,在领域内已经发生的、有业务意义的事实。如支付已成功Event、拼团已成功Event。,是流程的终点。
- 红色 - 业务流程,连接决策命令和领域事件的处理逻辑或业务规则。它接收命令,执行业务操作,并产生事件,如拼团成团策略(判断人数是否已满)、支付处理流程。
- 粉色 - 外部系统,流程中需要调用的第三方服务或系统,如支付宝支付、微信登录。
- 绿色 - 只读模型,做一些读取数据的动作,没有写库的操作,如拼团活动展示。
- 棕色 - 领域对象,承载业务数据和行为的核心对象,是命令操作的直接目标,包括实体、值对象、聚合根,如用户地址(值对象)。
综上,左下角的示意图。是一个用户,通过一个策略命令,使用领域对象,通过业务流程,完成2个领域事件,调用1次外部接口个过程。我们在整个 DDD 建模过程中,就是在寻找这些节点。
流程解析:
1.起点:用户意图(User),用户想要做一件事,比如“发起拼团”或“支付订单”。
2.动作:决策命令(Command),用户的意图被封装为一个具体的 Command
(命令),通常包含执行该命令所需的所有数据。
3.核心:领域对象,命令不会凭空执行,它必须作用于一个具体的领域对象(通常是聚合根 Aggregate)。这个对象是业务的核心载体,拥有数据和行为。
4.执行:业务流程,领域对象根据自身的业务规则来处理接收到的命令。这个过程会修改对象自身的状态(如减少库存),并封装了最核心的业务逻辑。
5.结果:领域事件,业务执行成功后,会产生一个或多个领域事件。
6.扩展:调用外部系统,产生的领域事件可能会触发后续动作,其中之一就是调用外部系统。这是系统与外界协作的方式。
7.展示:读模型,负责提供数据查询功能。它通常通过监听领域事件来更新自己的数据视图,确保用户能看到最新的状态
2.寻找领域事件
寻找领域事件的过程,就是寻找系统中流程节点的结果态。什么结束了、什么完成了、什么终止。这个过程就是一堆人头脑风暴的过程,避免错失流程节点。
比如:发起拼团完成、支付完成、参与拼团完成、拼团目标达成、回调通知完成...
3.划分领域
在确定了领域事件以后,接下来要做的就是通过决策命令串联领域事件,并填充上所需要的领域对象。
- 首先,通过用户的行为动作,也就是决策命令,串联到对应的领域事件上。并对复杂的流程提供出红色的业务流程。
- 之后,为决策命令添加领域对象,每一个领域在整个流程中都起到了至关重要的作用。
有了识别出来的领域角色的流程,就可以非常容易的划分出领域边界了。
观察串联好的流程和聚集的领域对象,功能紧密相关、数据频繁交互的一组对象和事件自然形成一个领域。
例如:
所有与成团逻辑相关的命令、事件、实体(如拼团锁单
、拼团结算
)、策略(如成团校验策略
)可以划归为 拼团域
。
所有与活动相关的,比如活动配置信息、商品试算优惠价格,都可以划分为活动域
。
4.简易流程图
- 首先,站在运营的角度,要为这次拼团配置对应的拼团活动。那么就会涉及到;给哪个渠道的什么商品ID配置拼团,这样用户在进入商品页就可以看到带有拼团商品的信息了。之后要考虑,这个拼团的商品所提供的规则信息,包括:折扣、起止时间、人数等。还要拿到折扣的一个试算金额。这个试算出来的金额,就是告诉用户,通过拼团可以拿到的最低价格。
- 那么,拼团活动表,为什么会把折扣拆分出来呢。因为这里的折扣可能有多种迭代到一个拼团上。比如,给一个商品添加了直减10元的优惠,又对符合的人群id的用户,额外打9折,这样就有了2个折扣迭代。所以拆分出来会更好维护。这是对常变的元素和稳定的元素进行设计的思考。
- 另外,为了支持拼团库表,需要先根据业务规则把符合条件的用户 ID 写入 Bitmap,并为这批用户打上可配置的人群标签。创建拼团活动时,只需关联对应标签,即可让活动自动面向这部分用户生效,实现精准运营和差异化折扣。
- 之后,站在用户的角度,是参与拼团。首次发起一个拼团或者参与已存在的拼团进行数据的记录,达成拼团约定拼团人数后,开始进行通知。这个通知的设计站在平台角度可以提供回调,那么任何的系统也就都可以接入了。
系统表设计
group_buy_activity(拼团活动)
字段名 | 说明 |
---|---|
id | 自增 |
activity_id | 活动ID |
activity_name | 活动名称 |
discount_id | 折扣ID |
group_type | 成团方式(0自动成团、1达成目标成团) |
take_limit_count | 拼团次数限制 |
target | 拼团目标 |
valid_time | 拼团时长(分钟) |
status | 活动状态(0创建、1生效、2过期、3废弃) |
start_time | 活动开始时间 |
end_time | 活动结束时间 |
tag_id | 人群标签规则标识 |
tag_scope | 人群标签规则范围(多选;1可见限制、2参与限制) |
create_time | 创建时间 |
update_time | 更新时间 |
group_buy_discount(折扣配置)
字段名 | 说明 |
---|---|
id | 自增ID |
discount_id | 折扣ID |
discount_name | 折扣标题 |
discount_desc | 折扣描述 |
discount_type | 折扣类型(0:base、1:tag) |
market_plan | 营销优惠计划(ZJ:直减、MJ:满减、ZK:折扣、N元购) |
market_expr | 营销优惠表达式 |
tag_id | 人群标签(特定优惠限定) |
create_time | 创建时间 |
update_time | 更新时间 |
group_buy_order(拼团订单表)
字段名 | 说明 |
---|---|
id | 自增ID |
team_id | 拼单组队ID |
activity_id | 活动ID |
source | 渠道 |
channel | 来源 |
original_price | 原始价格 |
deduction_price | 折扣金额 |
pay_price | 支付价格 |
target_count | 目标数量 |
complete_count | 完成数量 |
lock_count | 锁单数量 |
status | 状态(0拼单中、1完成、2失败、3完成-含退单) |
valid_start_time | 拼团开始时间 |
valid_end_time | 拼团结束时间 |
notify_type | 回调类型(HTTP、MQ) |
notify_url | 回调地址(HTTP 回调不可为空) |
create_time | 创建时间 |
update_time | 更新时间 |
group_buy_order_list(拼团订单明细表)
字段名 | 说明 |
---|---|
id | 自增ID |
user_id | 用户ID |
team_id | 拼单组队ID |
order_id | 订单ID |
activity_id | 活动ID |
start_time | 锁单时间 |
end_time | 最晚锁单时间 |
valid_end_time | 拼团结束时间 |
goods_id | 商品ID |
source | 渠道 |
channel | 来源 |
original_price | 原始价格 |
deduction_price | 折扣金额 |
pay_price | 支付金额 |
status | 状态(0初始锁定、1消费完成、2用户退单) |
out_trade_no | 外部交易单号(幂等) |
create_time | 创建时间 |
update_time | 更新时间 |
biz_id | 业务唯一ID |
out_trade_time | 外部交易时间 |
notify_task(回调任务)
字段名 | 说明 |
---|---|
id | 自增ID |
activity_id | 活动ID |
team_id | 拼单组队ID |
notify_category | 回调种类(trade_unpaid2refund) |
notify_type | 回调类型(HTTP、MQ) |
notify_mq | 回调消息 |
notify_url | 回调接口 |
notify_count | 回调次数 |
notify_status | 回调状态(0初始、1完成、2重试、3失败) |
parameter_json | 参数对象 |
uuid | 唯一标识 |
create_time | 创建时间 |
update_time | 更新时间 |
crowd_tags(人群标签)
字段名 | 说明 |
---|---|
id | 自增ID |
tag_id | 人群ID |
tag_name | 人群名称 |
tag_desc | 人群描述 |
statistics | 人群标签统计量 |
create_time | 创建时间 |
update_time | 更新时间 |
crowd_tags_detail(人群标签明细)
字段名 | 说明 |
---|---|
id | 自增ID |
tag_id | 人群ID |
user_id | 用户ID |
create_time | 创建时间 |
update_time | 更新时间 |
crowd_tags_job(人群标签任务)
字段名 | 说明 |
---|---|
id | 自增ID |
tag_id | 标签ID |
batch_id | 批次ID |
tag_type | 标签类型(参与量、消费金额) |
tag_rule | 标签规则(限定类型 N次) |
stat_start_time | 统计数据开始时间 |
stat_end_time | 统计数据结束时间 |
status | 状态(0初始、1计划、2重置、3完成) |
create_time | 创建时间 |
update_time | 更新时间 |
sc_sku_activity(渠道商品活动配置关联表)
字段名 | 说明 |
---|---|
id | 自增ID |
source | 渠道 |
channel | 来源 |
activity_id | 活动ID |
goods_id | 商品ID |
create_time | 创建时间 |
update_time | 更新时间 |
sku(商品信息)
字段名 | 说明 |
---|---|
id | 自增ID |
source | 渠道 |
channel | 来源 |
goods_id | 商品ID |
goods_name | 商品名称 |
original_price | 商品价格 |
create_time | 创建时间 |
update_time | 更新时间 |
DDD架构设计
MVC架构:
DDD架构:
价格试算与人群标签
活动是否允许用户参与,拼团的判断逻辑有两重,具体条件如下:
- 活动是否设置了
tag_scope
tag_scope
用于限制活动参与的范围。若活动未设置tag_scope
,则默认认为没有任何限制,所有用户均可参与拼团;若配置了tag_scope
,则需要根据该配置进一步判断用户是否符合参与条件。 - 用户是否在指定的人群标签
tag_id
范围内tag_id
指定了本次活动的参与人群,只有拥有该标签的人群才能参与活动(具体逻辑就是每个tagid的位图里存了很多userid,只有这些userid才能参与)。如果活动未配置tag_id
,则默认所有用户都可参与拼团。需要注意的是,在本项目的实现中,虽然活动配置了tag_id
,但由于位图(bitmap)未进行配置,实际上也是默认所有用户均可参与拼团。
ps:这里只校验用户是否有参与活动的资格!!!
后续还有锁单的校验,注意区分,锁单是基于这里的资格判断之后的,再去此时活动是否仍有效、用户参与拼团次数是否已达上限...
价格试算
@Service
@RequiredArgsConstructor
public class IndexGroupBuyMarketServiceImpl implements IIndexGroupBuyMarketService {
private final DefaultActivityStrategyFactory defaultActivityStrategyFactory;
@Override
public TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception {
StrategyHandler<MarketProductEntity, DefaultActivityStrategyFactory.DynamicContext, TrialBalanceEntity> strategyHandler = defaultActivityStrategyFactory.strategyHandler();
TrialBalanceEntity trialBalanceEntity = strategyHandler.apply(marketProductEntity, new DefaultActivityStrategyFactory.DynamicContext());
return trialBalanceEntity;
}
}
IndexGroupBuyMarketService
│
│ indexMarketTrial()
▼
DefaultActivityStrategyFactory
│ (return rootNode)
▼
RootNode.apply()
│ doApply() (执行)
│ router() (路由到下一node)
▼
SwitchNode.apply()
│ ...
▼
... (可能还有其他节点)
▼
EndNode.apply() → 组装结果并返回 TrialBalanceEntity
▲
└────────── 最终一路向上 return
IndexGroupBuyMarketService
是领域服务,整个价格试算的入口
DefaultActivityStrategyFactory
帮你拿到 根节点,真正的“工厂”工作(多线程预处理、分支路由)都在各 Node 里完成。
DynamicContext
是一次性创建的共享上下文:谁需要谁就往里放
目前项目中是单策略模式,即满减、直减这些优惠N选一;后期可扩展成组合策略,实现方式:
1.简单点就是每种组合单独定义一个组合策略,比如“满减 + 直减”:
@Service("MJ_COUPON")
public class MJCouponCalculateService extends AbstractDiscountCalculateService {
@Resource
private MJCalculateService mjCalculateService;
@Resource
private CouponCalculateService couponCalculateService;
@Override
public BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) {
// 先满减
BigDecimal afterMJ = mjCalculateService.doCalculate(originalPrice, groupBuyDiscount);
// 再优惠券
return couponCalculateService.doCalculate(afterMJ, groupBuyDiscount);
}
}
但是如果以后有更多组合(比如“满减+直减+优惠券”),会出现类爆炸。
2.动态策略配置,比如:["MJ", "COUPON"]
。
写一个通用的 CompositeDiscountService
,按照配置顺序,依次执行每个基础策略:
public class CompositeDiscountService extends AbstractDiscountCalculateService {
private final List<IDiscountCalculateService> strategies;
@Override
public BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount discount) {
BigDecimal price = originalPrice;
for (IDiscountCalculateService s : strategies) {
price = s.calculate("userId", price, discount);
}
return price;
}
}
如何配置多种策略???
(1)规则优先级配置(非动态计算)
运营预先定义策略顺序:例如强制规定 满减 → 直减 → 折扣
,避免计算所有排列组合。优点:性能高(O(n)),规则可控。适用场景:优惠策略较少或业务方明确要求顺序。
(2)动态计算最优解
有限枚举 + 剪枝优化,仅对允许叠加的策略枚举顺序,并通过规则提前排除无效组合(如互斥优惠)。
优点:灵活性高,用户获利最大化。
假设总价 300
元,可用优惠:
满 200
减 50
.
满 300
打 8
折.
直减 20
元
1. 满减 → 折扣 : (300-50)*0.8 = 200
2. 满减 → 直减 : (300-50)-20 = 230
3. 折扣 → 满减 : (300 * 0.8)-50 = 190 ← 最优
4. 折扣 → 直减 : (300 * 0.8)-20 = 220
5. 直减 → 满减 : (300-20) → 不满200,满减不生效 → 280
6. 直减 → 折扣 : (300-20)*0.8 = 224
(3)分层优惠
总价
│
├─ 第一阶段:全局优惠(如全场8折)
│
├─ 第二阶段:品类优惠(如家电满1000减100)
│
└─ 第三阶段:单品优惠(如A商品直降50)
人群标签
人群标签采集
步骤 | 目的 | 说明 |
---|---|---|
1. 记录日志 | 标明本次批次任务的开始 | 方便后续排查、链路追踪 |
2. 读取批次配置 | 拿到该批次统计范围、规则、时间窗等 | 若返回 null 通常代表批次号错误或已被清理 |
3. 采集候选用户 | 从业务数仓/模型结果里拉取符合条件的用户 ID 列表 | 真实场景中会:• 调 REST / RPC 拿画像• 或扫离线结果表• 或读 Kafka 流 |
4. 双写标签明细 | 将每个用户与标签的关系永久化 & 提供实时校验能力 | 方法内部两件事:• 插入 crowd_tags_detail 表• 在 Redis BitMap 中把该用户对应位设为 1(幂等处理冲突) |
5. 更新统计量 | 维护标签当前命中人数,用于运营看板 | 这里简单按“新增条数”累加,也可改为重新 count(*) 全量回填 |
6. 结束 | 方法返回 void | 如果过程抛异常,调度系统可重试/报警 |
一句话总结 这是一个被定时器或消息触发的离线批量打标签任务: 拉取任务规则 → (离线)筛出符合条件的用户 → 写库 + 写 Redis 位图 → 更新命中人数。 之后业务系统就能用位图做到毫秒级
isUserInTag(userId, tagId)
判断,实现精准运营投放。
Bitmap(位图)
概念
- Bitmap 又称 Bitset,是一种用位(bit)来表示状态的数据结构。
- 它把一个大的“布尔数组”压缩到最小空间:每个元素只占 1 位,要么 0(False)、要么 1(True)。
为什么用 Bitmap?
- 超高空间效率:1000 万个用户,只需要约 10 MB(1000 万 / 8)。
- 超快操作:检查某个索引位是否为 1、计数所有“1”的个数(BITCOUNT)、找出第一个“1”的位置(BITPOS)等,都是 O(1) 或者极快的位运算。
典型场景
- 用户标签 / 权限判断:把符合某个条件的用户的索引位置设置为 1,以后实时判断“用户 X 是否在标签 A 中?”就只需读一个 bit。
- 海量去重 / 布隆过滤器:在超大流量场景下判断“URL 是否已访问过”、“手机号是否已注册”等。
- 统计分析:快速统计某个条件下有多少个用户/对象符合(BITCOUNT)。
示例:
default int getIndexFromUserId(String userId, int bitmapSize) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] hashBytes = md.digest(userId.getBytes(StandardCharsets.UTF_8));
BigInteger bigInt = new BigInteger(1, hashBytes);
return bigInt.mod(BigInteger.valueOf(bitmapSize)).intValue();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}
int index = getIndexFromUserId(uuid, 10_000_000); // 1千万位的bitmap
输入:userId = "abcd-1234"
MD5(十六进制):E2FC714C4727EE9395F324CD2E7F331F
可以当做128位的无符号整数,假设是bigInt =302349823749823749823749823
bitmapSize=10000
那么index=bigInt % 10000 = 9823
Bitmap 的存储方式
bitmap.set(123)
的含义就是把 第 123 位 (bit) 标记为 1
。
在底层实现上,Bitmap 通常用一段连续的二进制数组(比如 int[]
或 byte[]
)来存储:
- 如果用 int 数组存储:每个
int
占 32 bit(在 Java/C++ 等语言里)。- 第
123
个 bit 属于第123 / 32 = 3
个整型元素(下标从 0 开始)。 - 在这个元素里具体是哪一位呢?就是
123 % 32 = 27
这位。 - 所以实际上是
array[3]
的第27
个 bit 被置 1。
- 第
- 如果用 byte 数组存储:每个
byte
占 8 bit。- 第
123
个 bit 属于第123 / 8 = 15
个字节。 - 在这个字节里具体位置是
123 % 8 = 3
。 - 所以是
array[15]
的第3
位被置 1。
- 第
布隆过滤器
核心思想:用 多个哈希函数 来降低冲突。
具体做法:
- 对
userId
用 k 个不同的 hash 函数(可以是 MD5 的不同切片、或 MurmurHash 等)。 - 得到 k 个位置,把这些位置的 bit 全部设为 1。
- 判断存在性时,只要这 k 个位置都为 1,就认为“可能存在”;只要有一个为 0,就一定不存在。
这样虽然依然有假阳性(可能存在其实不存在),但概率大大降低。
判断是否是否存在,如果任意一个bit=0 =》一定不存在;如果所有bit=1=》可能存在。
人群标签过滤
白名单。
无 tagId(没配标签)→ 不限人群,全部放行(visible=true, enable=true
)。
有 tagId 且位图存在 → 位图里的人可以参加(白名单)。
有 tagId 但位图不存在 → 现在的实现是默认全放行(把“未配置位图”当作“不限制”),因为真实场景中由外部系统统计用户行为=>将符合条件的用户放入位图中,这里暂时没有模拟。
动态降级与人群切量
downgradeSwitch
—— 降级开关
作用:在出现异常或高压场景时,主动关闭部分功能,保证核心流程可用。
- 值为
0
(默认):功能正常,系统按照全量逻辑执行。 - 值为
1
:开启降级,比如:- 关闭一些非核心功能(如推荐、统计、日志落库)。
- 使用兜底方案(如直接返回默认值、提示“稍后再试”)。
cutRange
—— 人群切量开关
作用:做灰度发布或分流测试,让不同用户群体验不同的功能版本。
-
默认值为
100
:表示 100% 用户都可用,即全量发布。 -
如果设置为
30
:就表示 只有 30% 的用户能进入新功能,其他 70% 用户还是老逻辑。 -
计算逻辑:对用户 ID 做哈希,取模 100,落在
[0, cutRange]
范围内的用户通过。public boolean isCutRange(String userId) { // 计算哈希码的绝对值 int hashCode = Math.abs(userId.hashCode()); // 获取最后两位 int lastTwoDigits = hashCode % 100; // 判断是否在切量范围内 // 在范围内,可以继续参加活动 if (lastTwoDigits <= Integer.parseInt(cutRange)) { return true; } return false; }
不要直接对用户 ID 取模,因为可能是String类型的。
拼团交易锁单
下单到支付中间有一个流程,即锁单,比如淘宝京东中,在这个环节(限定时间内)选择使用优惠券、京豆等,可以得到优惠价,再进行支付;拼团场景同理,先加入拼团,进行锁单,然后优惠试算,最后才付款。
锁单流程:
1.幂等查询,如果已有一模一样的锁单,直接返回该条记录;
2.拼团人数校验(前端显示有滞后性,在调用锁单接口的时候还要重新拉取一下)
3.优惠试算,查看拼团活动配置信息(优惠价、目标人群、活动有效期、最大参与次数...)。
4.人群限定,非目标人群不允许参与活动
5.锁单责任链
- 活动有效性
- 用户参与次数
- 剩余库存校验
拼团结算

结算规则过滤:SC渠道拦截、外部交易单号交易、结算时间校验(now小于拼团结束时间)
对接商城和拼团系统
下单总体流程
- 查询商品并初始化订单
- 查询商品信息,构建订单,填入
total_amount
,此时订单状态为 PAY_CREATE。
- 查询商品信息,构建订单,填入
- 判断订单类型
- 普通下单:直接进入预支付流程。
- 拼团下单/开团:远程调用拼团系统,执行锁单逻辑(活动校验、库存校验、优惠计算等)。
- 生成预支付订单
- 根据订单类型决定支付金额:
- 普通单:按商品原价。
- 拼团单:按优惠后价格。
- 创建支付单,填入
pay_amount
、pay_url
等信息,订单状态置为 PAY_WAIT。
- 根据订单类型决定支付金额:
- 等待支付回调
- 用户扫码/支付成功后,支付平台回调商城接口,更新订单状态。
- 超时未支付订单由调度任务关闭。
具体业务步骤
1. 用户下单
- 如果用户已存在未支付订单:
- 且有支付链接(
pay_url
) → 直接返回支付链接。 - 没有支付链接 → 进入支付单创建流程。
- 且有支付链接(
- 否则,进入新订单创建。
2. 创建订单
- 查询商品信息并保存新订单(状态 PAY_CREATE)。
- 若为拼团单(
marketType == GROUP_BUY_MARKET
),调用拼团系统执行 营销锁单- 校验活动有效性
- 校验用户参与次数
- 校验剩余库存
- 优惠试算
- 记录拼团锁单结果(订单号、折扣金额等)
- 普通订单跳过营销锁单。
3. 创建预支付单
- 拼团单:根据优惠结果生成预支付订单。
- 普通单:直接用原价生成预支付订单。
- 更新订单状态为 PAY_WAIT,返回支付链接。
支付完成与组队结算
1.支付回调
- 更新订单状态
- 触发“支付结算并发货”流程
2.组队结算判断
- 调用拼团营销系统组队结算接口,更新当前拼团完成人数
- 判断该拼团是否已完成:
- 是:
- 调用营销结算 HTTP 接口
- 结算完成 N 个用户组成的队伍
- 发送“组队完成回调通知”
- 否:直接结束流程
- 是:
3.后续发货
- 当收到拼团完成(complete_count==target_count)的回调消息时,小型商城执行后续交易结算及发货逻辑(目前是模拟触发的)。
注意
alipay_notify_url
- 作用:支付宝在用户支付成功后,向该地址发起服务器端回调(需公网可访问,或通过内网穿透映射到本地)。
- 调用流程:支付宝 →
pay-mall
- 用途:
pay-mall
接收到支付成功通知后,可以调用拼团组队结算接口。 - 与之相关的两个地址:
return_url
:用户付款后网页自动跳转的地址(通常是返回商城首页),属于前端页面跳转,与业务结算无关。gateway_url
:支付宝提供的商户收款页面地址(用户发起付款时访问)。
group-buy-market_notify_url
-
http://127.0.0.1:8092/api/v1/alipay/group_buy_notify 注意!HTTP调用下才使用,MQ这个字段失效!
-
作用:由
pay-mall
商城设置,作为拼团平台的回调地址。 -
调用流程:拼团平台(
group-buy-market
) →pay-mall
-
触发条件:某个
teamId
的拼团人数达到目标值,拼团成功。 -
用途:通知
pay-mall
对该teamId
下所有成员执行后续操作,例如发货。
本地对接
在 group-buying-sys
项目中,对 group-buying-api
模块执行 mvn clean install
(或在 IDE 中运行 install)。这会将该模块的 jar 安装到本地 Maven 仓库(~/.m2/repository
)。然后在 pay-mall
项目的 pom.xml
中添加依赖,使用相同的 groupId
、artifactId
和 version
即可引用该模块,如下所示:
<dependency>
<groupId>edu.whut</groupId>
<artifactId>group-buying-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
发包
仅适用于本地,共用一个本地Maven仓库,一旦换台电脑或者在云服务器上部署,无法就这样引入,因此可以进行发包。这里使用阿里云效发包https://packages.aliyun.com/
1)点击制品仓库->生产库
2)下载settings-aliyun.xml文件并保存至本地的Maven的conf文件夹中。
3) 配置项目的Maven仓库为阿里云提供的这个,而不是自己的本地仓库。
4)发包,打开Idea中的Maven,双击deploy
5)验证
6)使用
将公共镜像仓库的settings文件和阿里云效的settings文件合并,可以同时拉取公有依赖和私有包。
逆向工程:退单

逆向的流程,要分析用户是在哪个流程节点下进行退单行为。包括3个场景;
已锁单、未支付:redis恢复量+1,mysql中锁单量-1
已锁单、已支付,但拼团未成团:redis恢复量+1,mysql中锁单量、完成量-1,退款
已锁单、已支付,且拼团已成团:redis恢复量无需+1,因为成团之后不开放给别人;mysql中锁单量、完成量-1,退款,拼团设置为'已完成含退单'状态,但拼团中所有人都退单,更新为失败!
核心流程说明
阶段一:退单操作流程
-
客户主动提交退单请求
-
通过责任链模式处理:数据加载Node(查询订单) → 重复检查Node(防止重复退单) → 策略执行Node
-
策略选择
根据订单状态和拼团状态选择对应退单策略(三种之一)
-
执行退单
更新数据库操作(锁单量、完成量、拼团状态、订单状态...)
-
消息通知 + 任务补偿
发送MQ退单消息通知(未支付退单、已支付未成团...三种消息 notify_category)
将消息写入notify_task表,定时任务扫描未成功处理的消息,以做补偿兜底。
阶段二:库存恢复流程
-
消息监听
MQ监听器接收退单成功消息
-
服务调用
调用恢复库存服务
-
策略选择
根据退单类型选择对应策略(已成团的无需恢复了,反正新用户也无法再参与该拼团)
-
库存恢复
执行Redis库存恢复操作(带分布式锁保护)
设计模式应用
-
责任链模式
TradeRefundRuleFilterFactory
构建的过滤链:
DataNodeFilter
→UniqueRefundNodeFilter
→RefundOrderNodeFilter
-
策略模式
-
策略接口:
RefundOrderStrategy
-
实现策略:
Unpaid2RefundStrategy
(未付款退单的流程)
Paid2RefundStrategy
(已付款退单)
PaidTeam2RefundStrategy
(已成团退单)
-
-
工厂模式
TradeRefundRuleFilterFactory
负责组装责任链 -
模板方法模式
AbstractRefundOrderStrategy
提供:- 公共方法封装 (发送退单MQ消息、库存恢复redis)
- 依赖注入支持
退单触发入口
1)用户主动退单
2)定时任务,定时任务扫描锁单但未结算的订单,若支付时间超过设定值,对这笔订单执行退单操作。
注意:小型支付商城中的订单可能有些是普通订单,有些是拼团订单。
对于普通订单,无需调用拼团系统中的退单接口,自己本地退单,对于CREATE或PAY_WAIT状态的订单,直接修改订单状态为CLOSED;对于PAY_SUCCESS(个人支付完成)、DEAL_DONE,额外调用支付宝退款。
对于拼团订单,RPC调用拼团系统的退单接口,调用成功后设置订单为WAIT_REFUND,然后由MQ消息回调调用支付宝退款。
定时任务+MQ消息通知
定时任务
拼团营销系统:
1.MQ消息补偿,每天零点执行一次(暂定)
//每天零点执行一次
@Scheduled(cron = "0 0 0 * * ?")
public void exec() {
// 为什么加锁?分布式应用N台机器部署互备(一个应用实例挂了,还有另外可用的),任务调度会有N个同时执行,那么这里需要增加抢占机制,谁抢占到谁就执行。完毕后,下一轮继续抢占。
// 获取锁句柄,并未真正获取锁
RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec");
try {
//尝试获取锁 waitTime = 3:如果当前锁已经被别人持有,调用线程最多等待 3 秒去重试获取;
// leaseTime = 0:不设过期时间,看门狗机制
boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS);
if (!isLocked) return;
Map<String, Integer> result = tradeTaskService.execNotifyJob();
log.info("定时任务,回调通知完成 result:{}", JSON.toJSONString(result));
} catch (Exception e) {
log.error("定时任务,回调通知失败", e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
2.超时未支付订单扫描,每隔5分钟执行一次;主要就是营销锁单了,但是15分之内还没付款,自动调用退单逻辑,释放锁单量,然后发退单的MQ消息。
小型支付商城:
1.支付宝回调补偿,未支付订单扫描,每隔10秒调用一次支付宝的接口,查询某未支付的订单到底付了没有,如果付了,则更新订单状态。主要是为了防止用户付了钱,但是由于网络波动,导致支付宝调用系统的回调接口失败,做的一次补偿动作。
2.超时订单扫描,每3分钟执行一次,对于超过15分钟仍未付款的订单,将其关闭。
待优化:将定时轮询查询改为在每个用户下单时发一个延迟消息的事件触发方式。
特性 | 定时轮询查询 | 延迟消息(事件触发) | 优胜方 |
---|---|---|---|
实时性 | 差(取决于轮询间隔,如1分钟) | 高(理论上精确到秒) | 延迟消息 |
数据库压力 | 巨大(高频扫描全表) | 极小(只查单条记录) | 延迟消息 |
可靠性 | 高(逻辑简单,不易丢单) | 中(依赖MQ的可靠性) | 定时轮询 |
扩展性 | 差(订单量越大,性能越差) | 好(天然分布式,随订单量线性扩展) | 延迟消息 |
复杂度 | 低(实现简单) | 中(需引入和维护MQ) | 定时轮询 |
资源利用率 | 低(大量无效查询) | 高(按需触发,无浪费) | 延迟消息 |
MQ消息
有三种MQ消息:
1.退款成功通知
2.拼团组队成功通知
3.订单支付成功消息
退款成功消息:拼团系统发送,拼团订单。拼团系统发送,小型商城和拼团系统都接收,各自执行退单流程。
组队成功消息:拼团系统发送,拼团订单。拼团系统发送,小型商城和拼团系统都接收,小型商城更新订单状态;拼团系统仅仅是简单的打印一下'通知成功'消息。
订单支付成功消息:小型商城发送,普通订单则用户支付后的回调就发送;拼团订单则是先接到'组队成功消息'之后 再发MQ。小型商城接收,更新订单状态为模拟发货。 这里主要起到解耦的作用,将发货这个过程解耦。
不仅在相关接口完成的时候自动发送MQ消息,同时有兜底,将MQ消息持久化进Mysql,设置定时任务来扫描表,对暂未处理(处理失败)的MQ消息重新投递。
字段名 | 类型 | 允许为空 | 默认值 | 约束 / 备注 |
---|---|---|---|---|
id |
int UNSIGNED | NO | AUTO_INCREMENT | 自增ID,主键 |
activity_id |
bigint | NO | 活动ID | |
team_id |
varchar(8) | NO | 拼单组队ID | |
notify_category |
varchar(64) | YES | NULL | 回调种类 |
notify_type |
varchar(8) | NO | 'HTTP' |
回调类型(HTTP、MQ) |
notify_mq |
varchar(32) | YES | NULL | 回调消息 |
notify_url |
varchar(128) | YES | NULL | 回调接口 |
notify_count |
int | NO | 回调次数 | |
notify_status |
tinyint(1) | NO | 回调状态【0 初始、1 完成、2 重试、3 失败】 | |
parameter_json |
varchar(256) | NO | 参数对象(JSON 字符串) | |
uuid |
varchar(128) | NO | 唯一标识 | |
create_time |
datetime | NO | CURRENT_TIMESTAMP | 创建时间 |
update_time |
datetime | NO | CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP | 更新时间 |
如何确保MQ消息持久化成功?
// 4. 更新数据库,拼团交易结算,若达到拼团人数,返回notifyTaskEntity发送回调通知,否则返回null不做处理
NotifyTaskEntity notifyTaskEntity =repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate);
// 5. 组队回调处理 - 处理失败也会有定时任务补偿,通过这样的方式,可以减轻任务调度,提高时效性
if (null != notifyTaskEntity) {
threadPoolExecutor.execute(() -> {
Map<String, Integer> notifyResultMap = null;
try {
notifyResultMap = tradeTaskService.execNotifyJob(notifyTaskEntity);
log.info("回调通知拼团完结 result:{}", JSON.toJSONString(notifyResultMap));
} catch (Exception e) {
log.error("回调通知拼团完结失败 result:{}", JSON.toJSONString(notifyResultMap), e);
throw new AppException(e.getMessage());
}
});
}
在拼团支付成功后的结算过程中(repository.settlementMarketPayOrder
),所有数据库操作(更新状态、更新拼团人数、持久化拼团成功消息)都被包含在一个事务内。通过 @Transactional
注解保证了事务的一致性,确保了操作的原子性。
如果任何步骤失败,事务会回滚,数据保持一致。
repository.settlementMarketPayOrder
执行失败的话,后面也不会发送MQ消息了。
如果操作是幂等的,并且失败是由于 暂时性故障(如数据库连接失败、网络问题等),那么可以引入 重试机制 来增加系统的容错性。
如何衡量消息是否成功发送?
private Map<String, Integer> execNotifyJob(List<NotifyTaskEntity> notifyTaskEntityList) throws Exception {
//successCount:成功回调的任务数量
int successCount = 0, errorCount = 0, retryCount = 0;
for (NotifyTaskEntity notifyTask : notifyTaskEntityList) {
// HTTP模式下回调小商城中的groupBuyNotify接口 success 成功,error 失败
String response = port.groupBuyNotify(notifyTask);
// 更新状态判断&变更数据库表回调任务状态
if (NotifyTaskHTTPEnumVO.SUCCESS.getCode().equals(response)) {
int updateCount = repository.updateNotifyTaskStatusSuccess(notifyTask);
if (1 == updateCount) {
successCount += 1;
}
} else if (NotifyTaskHTTPEnumVO.ERROR.getCode().equals(response)) {
if (notifyTask.getNotifyCount() < 5) {
// 失败但可以重试 → 标记为 RETRY,等待下一次收集 “待处理的通知任务列表”
if (repository.updateNotifyTaskStatusRetry(notifyTask) == 1) {
retryCount++;
}
} else {
// 已达最大重试次数 → 标记为 ERROR(不再重试)
if (repository.updateNotifyTaskStatusError(notifyTask) == 1) {
errorCount++;
}
}
}
}
目前逻辑比较简单,只能确保消息发送出去了,如果为了提高安全性,还需要:
1.发送方确认机制(ConfirmCallback、returnCallback)
2.消费方确认机制,比如把auto ACK改为manul ACK,配置无状态重试/有状态重试(需要对消息做幂等性处理),超过最大重试次数的消息进入死信队列中,等待人工审查。
3.消费方成功收到消息并将消息表中的对应消息的status设置为'已完成',而不是发送者来写。
收获
实体对象
实体是指具有唯一标识的业务对象。
在 DDD 分层里,Domain Entity ≠ 数据库 PO。
在 edu.whut.domain.*.model.entity
包下放的是纯粹的业务对象,它们只表达业务语义(团队 ID、活动时间、优惠金额……),对「数据持久化细节」保持无感知。因此它们看起来“字段不全”是正常的:
- 它们不会带
@TableName
/@TableId
等 MyBatis-Plus 注解; - 也不会出现数据库的技术字段(
id
、create_time
、update_time
、status
等); - 只保留聚合根真正需要的业务属性与行为。
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PayActivityEntity {
/** 拼单组队ID */
private String teamId;
/** 活动ID */
private Long activityId;
/** 活动名称 */
private String activityName;
/** 拼团开始时间 */
private Date startTime;
/** 拼团结束时间 */
private Date endTime;
/** 目标数量 */
private Integer targetCount;
}
这个也是实体对象,因为多个字段的组合: teamId 和 activityId 能唯一标识这个实体。
多线程异步调用
如果某任务比较耗时(如加载大量数据),可以考虑开多线程异步调用。
// Runnable ➞ 只能 run(),没有返回值
public interface Runnable {
void run();
}
// Callable<V> ➞ call() 能返回 V,也能抛检查型异常
public interface Callable<V> {
V call() throws Exception;
}
public class MyTask implements Callable<String> {
private final String name;
public MyTask(String name) {
this.name = name;
}
@Override
public String call() throws Exception {
// 模拟耗时操作
TimeUnit.MILLISECONDS.sleep(300);
return "任务[" + name + "]的执行结果";
}
}
public class SimpleAsyncDemo {
public static void main(String[] args) {
// 创建大小为 2 的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
try {
// 构造两个任务
MyTask task1 = new MyTask("A");
MyTask task2 = new MyTask("B");
// 用 FutureTask 包装 Callable
FutureTask<String> future1 = new FutureTask<>(task1);
FutureTask<String> future2 = new FutureTask<>(task2);
// 提交给线程池异步执行
pool.execute(future1);
pool.execute(future2);
// 主线程可以先做别的事…
System.out.println("主线程正在做其他事情…");
// 在需要的时候再获取结果(可加超时)
String result1 = future1.get(1, TimeUnit.SECONDS); //设置超时时间1秒
String result2 = future2.get(); //无超时时间
System.out.println("拿到结果1 → " + result1);
System.out.println("拿到结果2 → " + result2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.err.println("任务执行中出错: " + e.getCause());
} catch (TimeoutException e) {
System.err.println("等待结果超时");
} finally {
pool.shutdown();
}
}
}
动态配置(热更新)
BeanPostProcessor
是 Spring 提供的一个扩展接口,用来在 Spring 容器实例化 Bean(并完成依赖注入)之后,但 在调用 Bean 的初始化方法之前或之后,对 Bean 进行额外的加工处理。
Spring 容器启动时会扫描并实例化所有实现了 BeanPostProcessor
接口的 Bean,然后在Bean 初始化阶段前后依次调用它们的 postProcessBeforeInitialization
和 postProcessAfterInitialization
方法。
postProcessAfterInitialization
返回你修改之后的bean实例。
**原理:**利用 Redis 的发布/订阅(Pub/Sub)机制,在程序运行时动态推送配置变更通知,订阅者接收到消息后更新相应的 Bean 字段。通过 反射(Reflection API) 可以 动态修改运行中的对象实例的字段值。
实现步骤
注解标记
用 @DCCValue("key:default")
标注需要动态注入的字段,指定 Redis Key 和默认值。
// 标记要动态注入的字段
@Retention(RUNTIME) @Target(FIELD)
public @interface DCCValue {
String value(); // "key:default"
}
// 业务使用示例
@Service
public class MyFeature {
@DCCValue("myFlag:0") //标注字段,默认值为0
private String myFlag;
public boolean enabled() { return "1".equals(myFlag); }
}
启动时注入
实现 BeanPostProcessor
,覆写postProcessAfterInitialization
方法,在每个 Spring Bean 初始化后自动执行:
- 扫描标注了
@DCCValue
的字段; - 拼接完整
Redis Key
,若 Redis 中没有配置,则写入默认值(即@DCCValue注解上的值); - 通过反射将配置值注入到 Bean 的字段;
- 将配置与 Bean 映射关系存入本地HashMap,以便后续热更新。
@Override
public Object postProcessAfterInitialization(Object bean, String name) {
private final Map<String, Object> dccObjGroup = new HashMap<>();
Class<?> cls = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
for (Field f : cls.getDeclaredFields()) {
DCCValue dccValue = f.getAnnotation(DCCValue.class);
if (dccValue != null) {
String[] parts = dccValue.value().split(":");
String key = PREFIX + parts[0]; // Redis 中存储的 Key
String defaultValue = parts[1]; // 默认值
RBucket<String> bucket = redis.getBucket(key);
String value = bucket.isExists() ? bucket.get() : defaultValue;
bucket.trySet(defaultValue); // 若 Redis 中无配置,则写入默认值
injectField(bean, f, value); // 通过反射注入值
dccObjGroup.put(key, bean); // 缓存配置与 Bean 映射关系
}
}
return bean; // 返回初始化后的 Bean
}
运行时热更新
-
订阅一个 Redis Topic(频道),比如
"dcc_update"
; -
外部通过发布接口
PUBLISH dcc_update "key,newValue"
发送更新消息;private final RTopic dccTopic; @GetMapping("/dcc/update") public void update(@RequestParam String key, @RequestParam String value) { // 发布配置更新消息到 Redis 主题,格式为 "configKey,newValue" String message = key + "," + value; dccTopic.publish(message); // 通过 dccTopic 发布更新消息 log.info("配置更新发布成功 - key: {}, value: {}", key, value); }
-
订阅者收到后:
- 更新 Redis 中的配置;
- 从映射里取出对应 Bean,使用反射更新字段。
// 发布/订阅配置热更新
@Bean("dccTopic")
public RTopic dccTopic(RedissonClient redis) {
RTopic dccTopic = redis.getTopic("dcc_update");
dccTopic.addListener(String.class, (channel, msg) -> {
String[] parts = msg.split(","); // msg 约定格式:"configKey,newValue"
String key = PREFIX + parts[0]; // 拼接 Redis Key
String newValue = parts[1]; // 新的配置值
RBucket<String> bucket = redis.getBucket(key);
if (!bucket.isExists()) {
return; // 如果不是我们关心的配置,跳过
}
bucket.set(newValue); // 更新 Redis 中的配置
Object bean = beans.get(key); // 从内存中取出 Bean 实例
if (bean != null) {
injectField(bean, parts[0], newValue); // 通过反射更新 Bean 字段
}
});
return dccTopic; // 返回 Redis Topic 实例
}
在 Redis 的发布/订阅模型中,RTopic dccTopic = redis.getTopic("dcc_update");
这行代码指定了 dccTopic
订阅的主题(也可以理解为一个消息通道)。不同的类可以通过依赖注入来使用这个 RTopic
实例。一些类可以调用 dccTopic.publish(message)
向该通道发送消息;而另一些类则可以通过 dccTopic.addListener()
来订阅该主题,从而接收消息并进行相应的处理。
面试官:为什么选择Redis Pub/Sub,不用rabbitmq?
两者都能实现这个需求,但 Redis 更轻量。在多实例部署时,每个实例都能收到广播并通过反射完成热更新;如果某个节点宕机重启,它会直接从 Redis 中拉取最新配置,而不依赖历史广播。
确实,Redis Pub/Sub 没有消息确认等可靠性机制;如果换成 RabbitMQ,配置交换机为广播模式,各实例使用匿名队列,同样可以接收消息并完成更新,而且还能提供更强的可靠性保证。不过在本项目中,动态配置中心在初期就基于 Redis 实现了,RabbitMQ 是在后期做交易领域时才引入的组件。考虑到场景对可靠性要求不高,同时也为了保持架构的简单性,所以没有替换为 RabbitMQ。
热更新数据流转过程
1.广播消息(PUBLISH):配置变更会通过 PUBLISH
命令广播到 Redis 中的某个主题。
2.Redis Sub(订阅):订阅该主题的客户端收到消息后,进行处理。
3.更新 Redis 和 Bean 字段:
- 更新 Redis 中的配置(保持一致性)。
- 更新 Bean 实例的对应字段(通过反射,确保配置的实时性)。
重要说明
RedissonClient( Redis)的作用:
1.消息广播(通过 Topic)
2.redis中的配置与bean中的字段配置一致,有一定容错 / 恢复能力,如果某个节点启动时错过了消息,它可以在初始化时直接从 Redis 读到最新配置。
HashMap的作用:
在广播监听阶段,快速获取要操作的bean实例,进行反射。
OkHttpClient+Retrofit
1.引入依赖
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>retrofit</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-jackson</artifactId>
<version>2.11.0</version>
</dependency>
2.配置 OkHttpClient
@Configuration
public class OkHttpClientConfig {
@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
.connectTimeout(Duration.ofSeconds(10))
.readTimeout(Duration.ofSeconds(30))
.writeTimeout(Duration.ofSeconds(30))
.retryOnConnectionFailure(true)
// TODO: 可以统一加日志拦截器、鉴权拦截器
.build();
}
}
-
单例复用:Spring 管理 Bean,整个应用只创建一次。
-
集中配置:超时、拦截器、SSL 等只在这里写一份,避免代码里到处
new
。
3.配置Retrofit
@Configuration
public class RetrofitConfig {
private static final String BASE_URL = "https://api.example.com/";
@Bean
public Retrofit retrofit(OkHttpClient okHttpClient) {
return new Retrofit.Builder()
.baseUrl(BASE_URL)
.client(okHttpClient) // 🔑 复用统一的 OkHttpClient
.addConverterFactory(JacksonConverterFactory.create())
.build();
}
@Bean
public ApiService apiService(Retrofit retrofit) {
return retrofit.create(ApiService.class);
}
}
4.定义 API 接口(Retrofit 风格)
public interface ApiService {
@GET("users/{id}")
Call<User> getUser(@Path("id") String id);
@POST("orders")
Call<OrderResponse> createOrder(@Body OrderRequest request);
}
5.在 Service 中调用
@Slf4j
@Service
@RequiredArgsConstructor
public class UserService {
private final ApiService apiService;
// 同步请求
public User getUserById(String id) {
try {
Response<User> resp = apiService.getUser(id).execute();
if (resp.isSuccessful()) {
return resp.body();
}
throw new RuntimeException("请求失败,HTTP " + resp.code());
} catch (Exception e) {
log.error("获取用户信息失败", e);
throw new RuntimeException(e);
}
}
// 异步请求
public void getUserAsync(String id) {
apiService.getUser(id).enqueue(new Callback<User>() {
@Override
public void onResponse(Call<User> call, Response<User> response) {
log.info("异步回调结果: {}", response.body());
}
@Override
public void onFailure(Call<User> call, Throwable t) {
log.error("异步请求失败", t);
}
});
}
}
Retrofit 在运行时会生成这个接口的实现类,帮你完成:
- 拼 URL(把
{id}
换成具体值) - 发起 GET 请求
- 拿到响应的 JSON 并自动反序列化成
User
对象
OkHttp 提供底层能力:连接池、超时、拦截器、HTTP/2 等,适合做全局单例配置。
特点/场景 | Retrofit | RPC(如 gRPC、Dubbo 等) |
---|---|---|
主要用途 | 封装 HTTP API 调用,集成第三方服务 | 微服务之间的内部通信 |
协议层 | 基于 HTTP/HTTPS(REST 风格) | 基于 TCP/HTTP2,自定义协议或 Protobuf |
数据序列化 | JSON(默认) | Protobuf / Thrift / Avro(更高效) |
典型应用场景 | 第三方 REST API、外部服务调用 | 微服务架构、跨语言调用、内部高性能通信 |
调用方式 | 支持同步/异步,声明式接口 | 支持同步/异步、流式调用,多语言 SDK |
性能特点 | 依赖 HTTP/JSON,序列化开销较大 | 高吞吐量、低延迟,序列化高效 |
易用性 | 简单,代码少,学习成本低 | 需要服务框架支持,学习/配置成本较高 |
支付宝下单沙箱
https://open.alipay.com/develop/sandbox/app
读取本地配置文件。
@Data
@Component
@ConfigurationProperties(prefix = "alipay", ignoreInvalidFields = true)
public class AliPayConfigProperties {
// 「沙箱环境」应用ID - 您的APPID,收款账号既是你的APPID对应支付宝账号。获取地址;https://open.alipay.com/develop/sandbox/app
private String appId;
// 「沙箱环境」商户私钥,你的PKCS8格式RSA2私钥
private String merchantPrivateKey;
// 「沙箱环境」支付宝公钥
private String alipayPublicKey;
// 「沙箱环境」服务器异步通知页面路径
private String notifyUrl;
// 「沙箱环境」页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
private String returnUrl;
// 「沙箱环境」
private String gatewayUrl;
// 签名方式
private String signType = "RSA2";
// 字符编码格式
private String charset = "utf-8";
// 传输格式
private String format = "json";
}
创建alipay客户端。
@Configuration
public class AliPayConfig {
@Bean("alipayClient")
public AlipayClient alipayClient(AliPayConfigProperties properties) {
return new DefaultAlipayClient(properties.getGatewayUrl(),
properties.getAppId(),
properties.getMerchantPrivateKey(),
properties.getFormat(),
properties.getCharset(),
properties.getAlipayPublicKey(),
properties.getSignType());
}
}
公众号扫码登录流程
https://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index 微信开发者平台。
微信登录时,需要调用微信提供的接口做验证,使用Retrofit
场景:用微信的能力来替你的网站做“扫码登录”或“社交登录”,代替自己写一整套帐号/密码体系。后台只需要基于 openid
做一次性关联(比如把某个微信号和你系统的用户记录挂钩),后续再次扫码就当作同一用户;
1.前端请求二维码凭证
-
用户点击“扫码登录”,前端向后端发
GET /api/v1/login/weixin_qrcode_ticket
。 -
后端获取 access_token 1.先尝试从本地缓存(如 Guava Cache)读取
access_token
; 2.若无或已过期,则请求微信接口:GET https://api.weixin.qq.com/cgi-bin/token ?grant_type=client_credential &appid={你的 AppID} &secret={你的 AppSecret}
微信返回
{ "access_token":"ACCESS_TOKEN_VALUE", "expires_in":7200 }
,后端缓存这个值(有效期约 2 小时)。 -
后端利用
access_token
创建二维码 ticket,返给前端。(每次调用微信会返回不同的ticket)
2.前端展示二维码
- 前端根据
ticket
生成二维码链接:https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={ticket}
3.微信回调后端
- 用户确认扫描后,微信服务器向你预先配置的回调 URL(如
POST /api/v1/weixin/portal/receive
)推送包含ticket
和openid
的消息。 - 后端:将
ticket → openid
存入缓存(openidToken.put(ticket, openid)
);调用sendLoginTemplate(openid)
给用户推送“登录成功”模板消息(手机公众号上推送,非网页)
4.前端获知登录结果
- 轮询方式:生成二维码后,前端每隔几秒向后端
check_login
接口发送ticket
来验证登录状态,后端查缓存来判断ticket
对应用户是否成功登录。 - 推送方式:前端通过 WebSocket/SSE 建立长连接,后端回调处理完成后直接往该连接推送登录成功及 JWT。
浏览器指纹获取登录ticket
在扫码登录流程的基础上改进!!!
目的:把「这张二维码/ticket」严格绑在发起请求的那台浏览器上,防止别的设备或会话拿到同一个 ticket 就能登录。
1.生成指纹
前端在用户打开「扫码登录页」时,先用 JS/浏览器 API(比如 User-Agent、屏幕分辨率、插件列表、Canvas 指纹等)算出一个唯一的浏览器指纹 fp
。
2.获取 ticket 时携带指纹
前端发起请求:
GET /api/v1/login/weixin_qrcode_ticket_scene?sceneStr=<fp>
后端执行:
String ticket = loginPort.createQrCodeTicket(sceneStr);
sceneTicketCache.put(sceneStr, ticket); // 把 fp→ticket 映射进缓存
3.扫码后轮询校验
前端轮询:传入 ticket
和 sceneStr
指纹
GET /api/v1/login/check_login_scene?ticket=<ticket>&sceneStr=<fp>
后端逻辑(简化):
// 1) 验证拿到的 sceneStr(fp) 对应的 ticket 是否一致
String cachedTicket = sceneTicketCache.getIfPresent(sceneStr);
if (!ticket.equals(cachedTicket)) {
// fp 不匹配,拒绝
return NO_LOGIN;
}
// 2) 再看 ticket→openid 有没有被写入(扫码并回调后,saveLoginState 会写入)
String openid = ticketOpenidCache.getIfPresent(ticket);
if (openid != null) {
// 同一浏览器,且已扫码确认,返回 openid(或 JWT)
return SUCCESS(openid);
}
return NO_LOGIN;
4.回调时保存登录状态
当用户扫描二维码,微信会回调你预定的接口地址,拿到 ticket
、openid
后,调用:
ticketOpenidCache.put(ticket, openid); // 保存 ticket→openid
注意 ticketOpenidCache
和 sceneTicketCache
一般是一个Cache Bean,这里只是为了更清晰。
安全性提升
- 防止“票据劫持”:别人就算截获了这个 ticket,想拿去自己那台机器上轮询也不行,因为指纹对不上。
- 防止多人共用:多个人在不同设备上同时扫同一个码,只有最先发起获取 ticket 的那台浏览器能完成登录。
独占锁和无锁化场景(防超卖)
目标: 保证 库存数量的正确性 —— 不能出现“明明只有 10 件商品,却卖出去 11 件”的情况。
典型问题场景:
- 秒杀/拼团/抢购,高并发请求瞬间打到库存。
- 多个并发事务都认为“库存足够”,于是都扣减成功。
独占锁
适用场景
- 定时任务互备 多机部署时,确保每天只有一台机器在某个时间点执行同一份任务(如数据清理、报表生成、邮件推送等)。
@Scheduled(cron = "0 0 0 * * ?")
public void exec() {
// 获取锁句柄,并未真正获取锁
RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec");
try {
//尝试获取锁 waitTime = 3:如果当前锁已经被别人持有,调用线程最多等待 3 秒去重试获取;leaseTime = 0:不设过期时间,看门狗机制
boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS);
if (!isLocked) return;
Map<String, Integer> result = tradeSettlementOrderService.execSettlementNotifyJob();
log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result));
} catch (Exception e) {
log.error("定时任务,回调通知拼团完结任务失败", e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
无锁化并发控制(法一)
目标:在万级并发下保证 不超卖、可退单补量、团长也算库存,且不引入 JVM 级互斥锁。
角色 | Redis Key | 含义 | 变化方式 |
---|---|---|---|
计数器 | teamOccupiedStockKey |
已占用名额(仅团员) | INCR |
退单补量 | recoveryTeamStockKey |
退回名额(累加) | 退单环节 INCRBY |
配额上限 | target |
团长 + 团员 的最大名额 | 配置 |
1. Lua 原子脚本
-- KEYS[1] = teamOccupiedStockKey
-- ARGV[1] = target(含团长)
-- ARGV[2] = recoveryCount
local limit = tonumber(ARGV[1]) + tonumber(ARGV[2])
local v = redis.call('INCR', KEYS[1]) + 1 -- +1 把团长补进去
if v > limit then
redis.call('DECR', KEYS[1]) -- 回滚
return 0 -- 告诉调用方名额已满
else
return v -- 抢到的序号(含团长)
end
- 原子性:
INCR → 判断 → DECR
全在一条脚本里,Redis 单线程保证不会被并发打断。 - +1 偏移:计数器只统计团员,每次 +1 把团长补进去,对比对象与
target
同维度。(redis中的teamOccupiedStockKey
的值比真实锁单量少1,是正常的,因为redis中只存了团员的锁单,团长是在代码逻辑中手动+1的) - 退单补量:
limit = target + recoveryCount
,退单线程把名额写回recoveryTeamStockKey
后,下一次抢单自然放量。
无锁化并发控制(法二)
@Override
public boolean occupyTeamStock(String teamOccupiedStockKey, String recoveryTeamStockKey, Integer target, Integer validTime) {
// 获取失败恢复量
Long recoveryCount = redisService.getAtomicLong(recoveryTeamStockKey);
recoveryCount = null == recoveryCount ? 0 : recoveryCount;
// 1. incr 得到值,与总量和恢复量做对比。恢复量为系统失败时候记录的量。
// 2. 从有组队量开始,相当于已经有了一个占用量,所以要 +1,因为团长开团的时候teamid为null,但事实上锁单已经有一单了。
long occupy = redisService.incr(teamOccupiedStockKey) + 1; //取teamOccupiedStockKey的值,先自增,再返回;类似++i
if (occupy > target + recoveryCount) {
repository.recoveryTeamStock(recoveryTeamStockKey);
return false;
}
// 1. 给每个产生的值加锁为兜底设计,虽然incr操作是原子的,基本不会产生一样的值。但在实际生产中,遇到过集群的运维配置问题,以及业务运营配置数据问题,导致incr得到的值相同。
// 2. validTime + 60分钟,是一个延后时间的设计,让数据保留时间稍微长一些,便于排查问题。
String lockKey = teamOccupiedStockKey + Constants.UNDERLINE + occupy;
Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES);
if (!lock) {
log.info("组队库存加锁失败 {}", lockKey);
}
return lock;
}
这里teamOccupiedStockKey
和recoveryTeamStockKey
都是只增不减的,如果抢占失败的,直接对recoveryTeamStockKey+1
。
recoveryTeamStockKey
还有通过参与拼团的人退单来+1
为什么对teamOccupiedStockKey-1
必须要用Lua脚本?
long occupy = redisService.incr(teamOccupiedStockKey);
if (occupy > target + recoveryCount) {
redisService.decr(teamOccupiedStockKey); // 回滚
return false;
}
如果直接JAVA代码中写两个逻辑,风险在:
INCR
和 DECR
是两条独立命令。
在 INCR
和 DECR
之间的时间窗口里,其他请求可能已经拿到 INCR
的结果。
多个失败请求并发执行,会出现“多次 DECR 把计数扣过头”的问题,导致库存虚减甚至超卖。
本项目采用第二种方法!!!
极端兜底锁
String lockKey = teamOccupiedStockKey + Constants.UNDERLINE + occupy;
Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES);
if (!lock) {
log.info("组队库存加锁失败 {}", lockKey);
}
- 解决极小概率 相同序号并发撞号 的问题(
- TTL 比业务
validTime
多留 60 min,方便排查。 - 订单关闭/失效时记得删除对应
lockKey
(目的是设置了过期时间为拼团有效期+60分钟),防止 Redis 小键堆积。
怎样的情况可能导致并发撞号?
t1:客户端发起 INCR
,Redis 内存里变成 101,返回 101(内存里修改完成就返回,不等落盘);
t1+Δ:Redis 还没把这条写刷到磁盘(AOF 还在缓冲里 / OS 还没 fsync);
t2:Redis 故障崩溃 → 只剩下落盘的旧状态(100);
t3:重启后加载旧数据,从 100 再开始递增 → 101(和之前用过的号重复)。
持久化粒度取决于 appendfsync
配置:
always
:每次写都 fsync → 最安全,最慢。everysec
(默认):每秒 fsync → 可能丢 1 秒内的数据。no
:完全交给操作系统 → 性能好,但可能丢几十秒。
典型适用场景
- 电商秒杀 & 拼团抢购 万级甚至十万级并发下不适合所有请求都排队,必须让绝大多数请求用原子计数并行处理。
- 抢票系统 票务分配、座位预占,都讲究“先到先得”+“补偿回退”,不能用一把大锁。
本项目有两层防护:
页面/接口层校验
-
进入拼团展示页,前端先查当前正在拼团的信息(可以看到XX拼团还剩X人)。
-
用户点击下单时,调用锁单接口,由于页面刷新的时间戳可能滞后,出现“页面显示还差 X 人,但点进去已满员”的情况时,返回"拼团组
队完结,锁单量已达成"。但仍有可能出现:数据库显示有剩余名额,但高并发下,可能有多个用户透过这层防护!!!
真正的并发保证:Redis 原子 INCR
+ 补偿计数(recoveryTeamStockKey
) + 兜底 SETNX
(法二)
生活例子理解
假设你有一个限量商品,每个商品有一个唯一的编号,假设这些商品编号为 1、2、3、4、5(总共 5 个)。这些商品被分配给用户,每个用户会抢一个编号。每个用户成功抢到一个商品后,系统会在库存中占用一个编号。
抢购过程:
- 有 5 个商品编号(1-5),这些编号是库存量。
- 每个用户请求一个商品编号,系统会给用户分配一个编号(这个过程就像是自增占用量的过程)。
- 如果用户请求的编号超过了现有库存的最大编号(5),则说明没有商品可以分配给该用户,用户抢购失败。
- 如果有多个用户抢同一个编号(例如都想抢到编号 1 的商品),系统通过“分布式锁”来保证只有一个用户能成功抢到编号 1,其他用户则失败。
Supplier<T>
Supplier<T>
是 Java 8 提供的一个函数式接口
@FunctionalInterface
public interface Supplier<T> {
/**
* 返回一个 T 类型的结果,参数为空
*/
T get();
}
任何“无参返回一个 T 类型对象”的代码片段(方法引用或 lambda)都可以当成 Supplier<T>
来用。
作用
1.延迟执行
把“取数据库数据”这类开销大的操作,包装成 Supplier<T>
传进去;只有真正需要时(缓存未命中),才触发执行。
// 缓存未命中时,才调用 supplier.get() 执行数据库查询
T dbResult = dataFetcher.get();
2.解耦逻辑
缓存组件不关心数据如何获取,只负责缓存策略;调用方通过 Supplier
提供数据获取逻辑。
public <T> T getFromCacheOrDb(String key, Supplier<T> dataFetcher) { ... }
3.重用性高
同一个缓存-回源模板方法可以服务于任何返回 T
的场景,既可以查 User
,也可以查 Order
、List<Product>
……
// 查询用户
User user = getFromCacheOrDb("user:123", () -> userDao.findById(123));
// 查询订单列表
List<Order> orders = getFromCacheOrDb("orders:456", () -> orderDao.listByUserId(456));
分布式限流(AOP + Redisson 实现)+黑名单

核心思路
动态开关管理
- 使用
@DCCValue("rateLimiterSwitch:open")
从配置中心动态注入全局开关,支持热更新。 - 当开关为
"close"
时,直接放行所有请求,切面不再执行限流逻辑。
AOP 切面拦截
- 通过自定义注解
@RateLimiterAccessInterceptor
标记需要限流的方法。 - 注解参数
key
用于指定限流维度(如userId
表示按用户限流,all
表示全局限流)。 - 切面在运行时解析这个字段的值,动态生成 Redis 限流器 Key,例如:
//添加拦截注解
@RateLimiterAccessInterceptor(key = "userId", permitsPerSecond = 5, fallbackMethod = "fallback")
public void order(String userId) {...}
请求1: userId=U12345 → Redis Key: rl:limiter:U12345
请求2: userId=U67890 → Redis Key: rl:limiter:U67890
反射的应用:
获取限流维度 Key(如 userId)
-
切面会从方法参数对象中反射查找
userId
字段:private String extractField(Object obj, String name) { Field field = getFieldByName(obj, name); field.setAccessible(true); Object v = field.get(obj); return v != null ? v.toString() : null; }
-
调用降级方法
当请求被限流或进入黑名单时,切面会通过反射执行注解里指定的
fallbackMethod
:Method method = jp.getTarget().getClass() .getMethod(fallbackMethod, ms.getParameterTypes()); return method.invoke(jp.getTarget(), jp.getArgs());
限流与黑名单
- 使用
RRateLimiter
实现分布式令牌桶,每秒放入permitsPerSecond
个令牌。 - 取不到令牌时:
- 如果配置了
blacklistCount
,用RAtomicLong
记录该 Key 的拒绝次数; - 拒绝次数超限后,将 Key 加入黑名单 24 小时。(
rl:bl:keyAttr
中存放着24小时内该用户超限次数,如果大于blacklistCount
,则黑名单启动拦截;而不是指某个rl:bl:keyAttr
存在就拦截,还是要比较次数的!)
- 如果配置了
- 命中黑名单或限流时,调用注解里的
fallbackMethod
执行降级逻辑。
注意这里有两个key:
rl:bl:keyAttr
,设置了24小时的过期时间,里面存着24小时内xx用户超限的次数。
rl:limiter:keyAttr
未设置过期时间(xx用户随时来,随时限流)
令牌桶算法(Token Bucket)
- 工作原理:按固定速率往桶里放“令牌”(tokens),例如每秒放 N 个。每次请求到达时,必须先从桶中“取一个令牌”,才能通过;如果取不到,则拒绝或降级。
- 特点:支持流量平滑释放和突发流量吸纳,桶最多能存储 M 个令牌。
方法调用
↓
AOP 切面拦截(匹配 @RateLimiterAccessInterceptor)
↓
检查全局限流开关(@DCCValue 注入)
↓
解析注解里的 key → 获取对应参数值(如 userId)
↓
黑名单检查(RAtomicLong)
↓
分布式令牌桶限流(RRateLimiter.tryAcquire)
↓
├─ 成功 → 执行目标方法
└─ 失败 → 累加拒绝计数 & 调用 fallbackMethod
对比维度 | 本地限流 | 分布式限流 |
---|---|---|
实现复杂度 | 低:直接用 Guava RateLimiter ,几行代码即可接入 |
中高:依赖 Redis/Redisson,需要注入客户端并管理限流器 |
性能开销 | 极低:全程内存操作,纳秒级延迟 | 中等:每次获取令牌需网络往返,存在 RTT 延迟 |
限流范围 | 单实例:仅对当前 JVM 有效,多实例互不影响 | 全局:多实例共享同一套令牌桶,合计速率可控 |
状态持久化 & 容错 | 无:服务重启后状态丢失;实例宕机只影响自身 | 有:Redis 存储限流器与黑名单,可持久化;需保证 Redis 可用性 |
目前本项目采用 分布式限流,使用 Redisson 实现跨实例令牌桶,确保全局限流控制。
防止重复下单
目标: 保证 同一个用户 在某一时间段/某一业务维度下(比如一个拼团活动、一个商品),只生成一条有效订单。
典型问题场景:
- 用户在页面疯狂点击“立即购买”。
- 用户在弱网环境下重复请求。
- 用户恶意构造多条请求。
常见方案:
- 前端限制:按钮禁用、loading 状态。
- 后端约束
- 复合唯一索引(
userId+goodId+activityId
),天然保证“一人一单”; - 幂等 Key(
idempotencyKey
或外部交易号),请求携带 Key,数据库唯一索引约束避免重复写入; - (可选)分布式锁,避免并发下插入多条记录。
- 复合唯一索引(
本质:限制用户的重复请求,确保“一次支付流程只产生一条订单”。
如何实现幂等性
保证业务的幂等性,核心思想就是:对于同一个操作的多次重复请求,其产生的结果应该与仅执行一次请求的结果完全相同。
1)生成幂等 Key
- 前端进入支付流程时调用接口(
GET /api/idempotency-key
),后端生成全局唯一 ID(UUID 或雪花 ID)返回给前端; - 或者外部系统(如小商城)传来唯一的外部交易单号(
out_trade_no
),天生作为幂等Key。
请求携带幂等 Key
- 用户点击“下单”时,调用
/create_pay_order
接口,需在请求体中附带idempotencyKey
; - 服务端根据该 Key 判断:若数据库中已有相同
idempotency_key
,直接返回该订单,否则创建新订单。
2)唯一索引
- 在订单表中新增
idempotency_key
列,并对其增加唯一索引; - 双重保障:前端重复发送同一 Key,也仅能插入一条记录,彻底避免重复下单。
3)Redis 分布式锁
根据 idempotencyKey
在 Redis 里尝试获取一个分布式锁(比如 SETNX idempotencyKey
,或者 Redisson 的 RLock.tryLock()
)
拿到锁的请求:先查数据库是否已有相同 idempotencyKey
的订单;
如果没有 → 创建新订单并写入数据库;
如果已有 → 直接返回该订单。
两种实践:
只 INSERT:如果 (user_id, idempotency_key)
已经存在,会报错 Duplicate entry
。
先查后插:需要 SELECT
→ 再 INSERT
,下订单时额外先查一次消耗资源,且并发下有竞态,还要兜异常。
推荐两者结合:
INSERT INTO orders (user_id, idempotency_key, ...)
VALUES (:uid, :key, ...)
ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id);
SELECT LAST_INSERT_ID() AS order_id;
情况 A:key 第一次出现
INSERT
成功,生成新的自增id
;ON DUPLICATE
分支不会触发;LAST_INSERT_ID()
= 新生成的订单 ID。
情况 B:key 已存在
INSERT
命中唯一约束,触发ON DUPLICATE KEY UPDATE
分支;- 这里我们写的是
id = LAST_INSERT_ID(id)
,相当于把已存在行的 id“赋值给自己”,不改变数据,但会更新会话里的LAST_INSERT_ID()
值; - 因此
SELECT LAST_INSERT_ID()
返回的是已有订单的 id。
还可以用Redis SETNX
(原子操作,保证只处理一次)。 来查消息是否被处理过
RPC微服务调用
RPC(Remote Procedure Call,远程过程调用)
就像调用本地方法一样调用远程服务的方法。
框架(Dubbo、gRPC、Thrift 等)会帮你处理 序列化、网络通信、连接管理、负载均衡 等细节。
开发者只需要写接口 + 实现类,调用方直接调用接口,RPC 框架在背后“悄悄”完成远程调用。
实现步骤
1.父Pom统一版本
<!-- 统一锁版本,避免不同模块写不同小版本 -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-bom</artifactId>
<version>3.3.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
2.pay-mall-infrustruct(Consumer)group-buying-sys-trigger (Provider)引入依赖
<dependencies>
<!-- Dubbo 核心 + Spring Boot 自动装配 -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
</dependency>
<!-- Nacos 注册中心扩展 -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
</dependency>
</dependencies>
3.部署nacos(详见微服务笔记)
4.配置注册(消费者、生产者都要配)
dubbo:
application:
name: group-buy-market-service # 换成各自服务名
registry:
address: nacos://localhost:8848 # 远程环境写内网地址
# username/password 如果 Nacos 开了鉴权
protocol:
name: dubbo
port: 20880 # 生产者开放端口;消费者可不写
consumer:
timeout: 3000 # 毫秒
check: false # 忽略启动时服务是否可用
5.开启 Dubbo 注解扫描
在消费者、生产者的主启动类上加,设置正确的包名,让 @DubboService
和 @DubboReference
被 Spring+Dubbo 识别和处理
@SpringBootApplication
@EnableDubbo(scanBasePackages = "edu.whut")
public class Application { … }
6.在Dubbo RPC调用中,DTO对象需要在网络中进行传输,因此它们必须实现 java.io.Serializable
接口:
/**
* 用户信息请求对象
*/
@Data
public class UserRequestDTO implements Serializable { // 实现 Serializable
private static final long serialVersionUID = 1L; // 添加 serialVersionUID,用于版本控制
// 用户ID
private String userId;
// 用户名
private String userName;
// 邮箱
private String email;
}
7.定义服务接口:
服务接口定义了服务提供者能够提供的功能以及服务消费者能够调用的方法。这个接口必须是公共的,并且通常放置在一个独立的 api
模块中。供服务提供者和消费者共同依赖。
/**
* 用户服务接口
*/
public interface IUserService {
/**
* 根据用户ID获取用户信息
* @param requestDTO 用户请求对象
* @return 用户响应对象
*/
UserResponseDTO getUserInfo(UserRequestDTO requestDTO);
/**
* 创建新用户
* @param requestDTO 用户请求对象
* @return 操作结果
*/
String createUser(UserRequestDTO requestDTO);
}
8.服务提供者 (Provider) 实现并暴露服务
在服务提供者应用中,实现上述定义的服务接口,并使用 @DubboService
注解将其暴露为Dubbo服务。可以放在trigger/rec包下。
/**
* 用户服务实现类
*/
@DubboService(version = "1.0.0", group = "user-service") // 关键注解:暴露Dubbo服务
@Service // 也可以同时是Spring的Service
public class UserServiceImpl implements IUserService {
@Override
public UserResponseDTO getUserInfo(UserRequestDTO requestDTO) {
System.out.println("收到获取用户信息的请求: " + requestDTO.getUserId());
// 模拟业务逻辑
UserResponseDTO response = new UserResponseDTO();
response.setUserId(requestDTO.getUserId());
response.setUserName("TestUser_" + requestDTO.getUserId());
response.setEmail("test_" + requestDTO.getUserId() + "@example.com");
return response;
}
@Override
public String createUser(UserRequestDTO requestDTO) {
System.out.println("收到创建用户的请求: " + requestDTO.getUserName());
// 模拟业务逻辑
return "User " + requestDTO.getUserName() + " created successfully.";
}
}
9.服务消费者 (Consumer) 引用远程服务
在服务消费者应用中,通过 @DubboReference
注解引用远程Dubbo服务。Dubbo 会自动通过注册中心查找并注入对应的服务代理。
/**
* 用户API控制器
*/
@RestController
public class UserController {
@DubboReference(version = "1.0.0", group = "user-service") // 关键注解:引用Dubbo服务
private IUserService userService;
@GetMapping("/user/info")
public UserResponseDTO getUserInfo(@RequestParam String userId) {
UserRequestDTO request = new UserRequestDTO();
request.setUserId(userId);
return userService.getUserInfo(request);
}
@GetMapping("/user/create")
public String createUser(@RequestParam String userName, @RequestParam String email) {
UserRequestDTO request = new UserRequestDTO();
request.setUserName(userName);
request.setEmail(email);
return userService.createUser(request);
}
}
RPC:同步调用、强一致、快速响应,比如pay-mall调用拼团系统的拼团交易锁单 、营销结算、营销拼团退单
HTTP:本系统调用微信支付这种第三方接口。
MQ:异步解耦、削峰填谷、最终一致性,比如退单消息,pay-mall调用营销拼团退单接口后,将订单设置为'待退单状态',然后拼团系统退单完成后发送'退单完成'消息,pay-mall接收继续做最终的退单处理。
怎么确保这个微服务调用的可靠性?
如果小型支付商城调用拼团失败,有两种情况:
1.网络异常、超时,dubbo框架会抛一个异常,可以特别处理,对其进行重试,设置最大重试次数,以及指数退避算法;这里要求锁单做幂等校验!
2.业务异常,不重试,可能是因为用户参与次数已达上限、活动过期之类的。
日志系统
输出流向一览
输出到3个地方:控制台、本地文件、ELK日志(服务器上内存不足无法部署!)
日志级别 | 控制台 | 本地文件(异步) | Logstash (TCP) |
---|---|---|---|
TRACE/DEBUG | — | — | — |
INFO | ✔ | log_info.log |
✔ |
WARN | ✔ | log_info.log``log_error.log |
✔ |
ERROR/FATAL | ✔ | log_info.log``log_error.log |
✔ |
注意:实际写文件时,都是通过 ASYNC_FILE_INFO/ERROR 两个异步 Appender 执行,以免日志写盘阻塞业务线程。
ELK日志系统
本地文件每台机器都会在自己 /data/log/...
目录下滚动输出自己的日志,互相之间不会合并。
如果你希望跨多台服务器统一管理,就需要把日志推到中央端——ELK日志系统
ELK=Elasticsearch(存储&检索)+ Logstash(采集&处理)+ Kibana(可视化)
docker-compose.yml:
version: '3'
services:
elasticsearch:
image: elasticsearch:7.17.28
ports: ['9201:9200','9300:9300']
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx512m
volumes:
- ./data:/usr/share/elasticsearch/data
logstash:
image: logstash:7.17.28
ports: ['4560:4560','9600:9600']
volumes:
- ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
environment:
- LS_JAVA_OPTS=-Xms1g -Xmx1g
kibana:
image: kibana:7.17.28
ports: ['5601:5601']
environment:
- elasticsearch.hosts=http://elasticsearch:9200
networks:
default:
driver: bridge
kibana配置:
#
# ** THIS IS AN AUTO-GENERATED FILE **
#
# Default Kibana configuration for docker target
server.host: "0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ] # 记得修改ip
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
logstash配置:
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
type => "info"
}
}
filter {}
output {
elasticsearch {
action => "index"
hosts => "es:9200"
index => "group-buy-market-log-%{+YYYY.MM.dd}"
}
}
自己的项目:
<!-- 上报日志;ELK -->
<springProperty name="LOG_STASH_HOST" scope="context" source="logstash.host" defaultValue="127.0.0.1"/>
<!--输出到logstash的appender-->
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--可以访问的logstash日志收集端口-->
<destination>${LOG_STASH_HOST}:4560</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.3</version>
</dependency>
使用
检查索引:curl http://localhost:9201/_cat/indices?v3
打开 Kibana:浏览器访问 http://localhost:5601
,新建 索引模式(如 app-log-*
),即可在 Discover/Visualize 中查看与分析日志。