chenjunda
长期主义

redis学习Day4

2025-11-27 redis

使用redis中的stream列表当消息队列进行异步下单

1. 线程池与后台线程初始化

1
2
3
4
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

@PostConstruct = Spring 初始化这个类之后立刻执行该方法。

这里做了一件事:

👉 启动一个后台线程不停监听 Redis Stream,处理订单

创建线程池:

1
2
private static final ExecutorService SECKILL_ORDER_EXECUTOR =
Executors.newSingleThreadExecutor();

使用单线程是为了避免多个线程并发处理相同用户的订单。


⭐ 1. 1秒杀系统为什么要后台线程?

在你的代码中,秒杀订单的处理方式是:

1
2
3
4
5
用户点击秒杀按钮  

写入 Redis Stream(速度快)

后台线程慢慢从 Stream 取消息并处理订单

也就是说:

👉 用户线程不直接去操作数据库
👉 后台线程负责真正写数据库、扣库存

这个机制叫 异步下单,能有效抗住高并发。

要实现“后台慢慢处理”,需要一个 常驻的后台线程


⭐ 1.2. 为什么不用一个普通线程,而是用线程池?

你可能会问:

直接 new Thread() 不行吗?

可以,但不推荐。原因如下:


✔ 1.2.1 线程池更专业、更安全

如果使用:

1
new Thread(new VoucherOrderHandler()).start();

存在下面问题:

❌ 线程意外结束,无法自动恢复

比如抛异常导致线程挂掉 → 系统无人处理订单,严重事故!

线程池则会:

✔ 自动管理线程
✔ 自动捕获错误
✔ 线程意外死亡后自动重启新的线程(具体取决于池类型)


✔ 1.2.2 newSingleThreadExecutor() = 永久单线程处理订单

你用的是:

1
Executors.newSingleThreadExecutor();

它有两个特点:

特点一:永远只有一个线程

保证:

✔ 同一时间只有 1 个消费者在操作
✔ 不会出现并发写数据库导致超卖
✔ 顺序消费 Redis Stream

这对秒杀系统非常关键。


特点二:线程挂了会自动拉起新线程

比如:

1
throw new RuntimeException();

普通线程会死掉,而线程池会自动:

⚠️ 线程异常 → ❌线程结束 → ✔️ 自动新建线程替代 → 系统继续正常下单

这就是线程池的意义。


⭐ 1.3. 为什么是单线程线程池?

因为下订单要满足两个要求:


✔ 要求 1:顺序处理订单

例如:

  • 用户 1001
  • 同一时间点疯狂点击秒杀

如果多个线程同时处理会:

✔ 数据库并发访问
✔ 并发扣库存
✔ 并发下单(冲突)

使用 单线程

👉 所有订单 排队 执行
👉 顺序处理
👉 避免冲突


✔ 要求 2:防止并发扣库存、冲突下单

如果用多线程:

  • 两条消息可能被两个线程同时处理
  • 导致 库存减成负数
  • 导致用户 “一人多单”

而单线程 = 绝对安全。


⭐ 1.4. newSingleThreadExecutor() 与 new Thread 有什么区别?

写法 会怎样 适用场景
new Thread() 创建一次后就结束。异常会让线程死掉,没法自动恢复。 简单测试
newSingleThreadExecutor() 永远只有一个线程,异常时自动创建新线程继续执行。 需要长期稳定运行的后台任务

秒杀订单处理属于:

👉 必须长期稳定运行
👉 必须顺序执行
👉 不能崩溃

所以必须用 单线程线程池


⭐ 1.5. 总结(最重要,面试必背)

使用线程池的原因:

✔ 1. 需要后台线程长期运行(非用户线程)

✔ 2. 线程池能自动重启线程,避免线程崩溃导致系统停摆

✔ 3. 单线程池保证订单处理完全顺序,避免并发问题

✔ 4. 高并发场景下,订单要排队慢慢处理,所以单线程最合适

✔ 5. 比 new Thread 更安全、更专业、更不容易出 bug

2. Redis Stream 消息队列

队列名称:

1
private static final String queueName = "stream.orders";

后台线程的核心逻辑:

读取消息

1
2
3
4
5
stringRedisTemplate.opsForStream().read(
Consumer.from("g1","c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);

等价于:

1
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS stream.orders >

意思:

  • GROUP g1:消费者组

  • c1:消费者

  • BLOCK 1000:阻塞 1 秒

  • :读取未消费的新消息

转成 Java 对象

1
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap,new VoucherOrder(),true);

Stream 的消息读出来是 Map,这里转换为 VoucherOrder 对象。

执行订单逻辑 & ACK

1
2
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());

处理成功后必须 ACK,否则 Redis 会认为没有处理成功。

2.1.1先看这一段 Java 代码到底干了啥

1
2
3
4
5
6
List<MapRecord<String, Object, Object>> messageList =
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);

我们分成 3 块来看:

  1. Consumer.from("g1", "c1")
  2. StreamReadOptions...
  3. StreamOffset.create(...)

1️⃣ Consumer.from(“g1”, “c1”)

1
Consumer.from("g1", "c1")

等价于命令里的:

1
XREADGROUP GROUP g1 c1 ...

解释:

  • "g1":消费者组名(group)
  • "c1":消费者名(consumer)

在 Redis Stream 的消费者组模型中:

  • 一个 Stream 可以有多个 消费者组(group)
  • 一个 中可以有多个 消费者(consumer)
  • 组控制“这条消息被哪个组消费了”,
  • 组里面的消费者控制“同一条消息在组内分配给哪个具体消费者”。

你这里:

  • 只用一个组:g1
  • 只用一个消费者:c1
  • 所以就是单线程顺序消费(配合单线程线程池)

2️⃣ StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1))

1
2
3
StreamReadOptions.empty()
.count(1)
.block(Duration.ofSeconds(1))

这是构造 XREADGROUP 的一些参数:

count(1)

  • 对应命令中的 COUNT 1
  • 代表:一次最多拿 1 条消息

👉 也就是说,这个线程是一条一条从 Redis 中取订单来处理。

block(Duration.ofSeconds(1))

  • 对应命令中的 BLOCK 1000
  • 表示:如果队列暂时没消息,就最多阻塞 1 秒钟等消息
  • 1 秒内如果有新消息来,就立刻返回
  • 超过 1 秒还没有消息 → 返回 null 或空列表 → 代码会 continue 再循环一次

好处:

  • 避免 while(true) 死循环 + 空转 CPU
  • 同时又能做到“有消息立刻处理”

3️⃣ StreamOffset.create(queueName, ReadOffset.lastConsumed())

1
StreamOffset.create(queueName, ReadOffset.lastConsumed())

等价于命令里的:

1
STREAMS stream.orders >

重点是这个:ReadOffset.lastConsumed() 对应的是 >

XREADGROUP 中,> 的含义是:

只读取这个消费者组还没有分配过的“新消息”

也就是:

  • 不会再去读已经给本组消费者分配过的旧消息
  • 只拿“流中最近新到的消息”,组里没人处理过的

简化理解:

  • > = “我要新消息”
  • 0 = “我要 PendingList 里没 ACK 的旧消息”(你在 handlePendingList() 就用了这个)

2.1.2把 Java 调用翻译成 Redis 命令

你的这一段 Java:

1
2
3
4
5
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);

大致等价于:

1
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS stream.orders >

含义:

  • GROUP g1 c1:以消费者组 g1、消费者 c1 的身份读取
  • COUNT 1:最多 1 条
  • BLOCK 1000:最多阻塞 1000ms(1秒)
  • STREAMS stream.orders:从 stream.orders 这个 Stream 读
  • >:读这个组中“还没分配过的新消息”

2.1.3读出来后转成 Java 对象

1
2
3
MapRecord<String, Object, Object> record = messageList.get(0);
Map<Object, Object> messageMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);

分解解释:

1️⃣ MapRecord 是 Redis Stream 的消息封装

  • 一个 Stream 的 entry 就是一条消息
  • 里面本质上就是一个 Map,例如:
1
XADD stream.orders * userId 123 voucherId 5 orderId 777

读取出来就是一个:

1
2
3
4
5
Map<Object, Object> = {
"userId" -> 123,
"voucherId" -> 5,
"orderId" -> 777
}

2️⃣ BeanUtil.fillBeanWithMap(…)

1
2
VoucherOrder voucherOrder =
BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);

相当于干了这件事:

1
2
3
4
5
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(123L);
voucherOrder.setVoucherId(5L);
voucherOrder.setId(777L);
// ... 按字段名一个个注入

true 表示:

  • 支持忽略大小写
  • 尝试进行一些类型转换(字符串 → Long)

2.1.4执行订单逻辑 + ACK

1
2
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());

1️⃣ handleVoucherOrder(voucherOrder);

内部做的主要是:

  • 加 Redisson 锁:保证一个用户一人一单
  • 调用 proxy.createVoucherOrder() 写库 + 扣库存

2️⃣ acknowledge(queueName, "g1", record.getId())

等价于 Redis 命令:

1
XACK stream.orders g1 <messageId>

ACK 的作用:

  • 告诉 Redis:这条消息我已经处理完了,可以从 PendingList 中移除了
  • 如果不 ACK:
    • 消息会一直留在 PendingList 里
    • Redis 认为你“拿走了,但还没处理完”

这就和你 handlePendingList() 方法关联起来了👇


2.1.5Redis Stream “消费者组” 的特性(重点)

你已经在用消费者组(GROUP),非常重要。下面是它的几个核心概念:


1️⃣ 一个 Stream,多组消费

  • 同一个 Stream(比如 stream.orders)可以创建多个 group:
    • 比如 g1 专门处理秒杀订单
    • g2 专门做日志统计、埋点之类

不同组之间是 互不影响 的:

  • 消息写入一次
  • 每个组都可以各自消费一遍
  • 有点像“广播到多个组,每组自己维护自己的消费进度”

2️⃣ 组内多消费者,负载均衡

一个 group 里可以有多个 consumer,例如:

  • group: g1
  • consumer: c1, c2, c3

特性:

  • 消息会在组内消费者之间分配
  • 一条消息通常只会被组内一个 consumer 处理
  • 相当于做了“组内负载均衡”

你现在只用一个消费者 c1,所以:
👉 所有消息都由 c1 处理,顺序性最好。


3️⃣ Pending List(未确认消息列表)

对每一个 + 消费者,Redis 会维持一个 PendingList:

  • 里面放的是:已经分配给消费者,但还没 ACK 的消息

  • 也就是:

    已经通过 XREADGROUP 取走,但是没通过 XACK 确认处理完成

会出现这种情况:

  • 消费者处理过程中挂了,没来得及 ACK
  • 消息就永远待在 PendingList 里

所以你才需要 handlePendingList()补偿处理未 ACK 的消息


4️⃣ 为什么你读 PendingList 用的是 offset = “0”

对比一下代码:

读新消息用:

1
StreamOffset.create(queueName, ReadOffset.lastConsumed()) // 对应 >

读 PendingList 用:

1
StreamOffset.create(queueName, ReadOffset.from("0")) // 对应 0

在 XREADGROUP 中:

  • > :从“从未分配过的新消息”中读取
  • 0 :从这个组的 PendingList 中读取“还没 ACK 的旧消息”

所以:

  • 正常逻辑:> → 新消息
  • 异常补偿:0 → PendingList 中未确认的消息

这就是你两个不同 read 调用的核心区别。


5️⃣ 消费者组的几个关键好处

  1. 消息不丢:
    • 即使消费者挂了,消息还在 PendingList
    • 你可以通过补偿逻辑重新处理
  2. 支持广播/多组消费:
    • 不同 group 可以各自维护 offset
  3. 支持水平扩展:
    • 一个 group 下可以增加多个 consumer 做并发消费
    • 你现在为了顺序性只用了一个
  4. 可查询消费状态:
    • 可以用 XPENDING 看 PendingList 中有哪些消息没处理完

2.1.6把整体流程再串一下(结合你的代码)

  1. Lua 脚本判断库存、下单资格,写入 Redis Stream → stream.orders
  2. 后台线程通过消费者组 g1、消费者 c1 使用 XREADGROUP 读取新消息(>
  3. 读到消息后:
    • 转为 VoucherOrder
    • 加锁 + 调用 createVoucherOrder 扣库存 + 写库
    • 成功后 XACK 确认
  4. 如果在第 3 步中发生异常,没走到 ACK:
    • 消息会留在 PendingList 中
    • 异常 catch 时调用 handlePendingList()
    • 通过 XREADGROUP ... 0(ReadOffset.from(“0”))从 PendingList 重新取消息处理
    • 处理成功再 ACK

2.3. Pending List 异常补偿

如果处理过程中抛异常,当前消息不会 ACK,会进入 PendingList。

代码会专门处理 PendingList:

1
handlePendingList();

PendingList 的特性:

  • 存放未确认(ACK)的消息
  • 防止消息丢失

实现逻辑:

  • 读取 PendingList(从 ID=0 开始)
  • 如果没消息就退出
  • 有消息 → 转对象 → 处理 → ACK

2.4. 创建订单前的分布式锁

在消费消息时:

1
2
3
4
5
6
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("一人只能下一单");
return;
}

保证同一个用户不会并发生成多个订单。


2.5. Lua 脚本判断下单资格(核心)

Lua 脚本路径:

1
SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));

调用方式:

1
2
3
4
5
6
7
result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);

脚本返回值含义:

返回值 意义
0 成功,允许下单
1 库存不足
2 重复下单

Lua 脚本一般会做:

  1. 判断库存是否 > 0
  2. 判断用户是否已经抢过
  3. 给库存 -1
  4. 把订单信息写入 Redis Stream(上面消费那里取的就是这个)

2.6. 创建订单(真正写数据库)

这是后台线程调用的:

1
proxy.createVoucherOrder(voucherOrder);

里面做:


(1)检查是否一人一单

1
2
3
4
5
Long count = this.count(
new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
);
if (count >= 1) return;

(2)扣减库存(数据库操作)

1
2
3
4
5
6
seckillVoucherService.update(
new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1")
);

这里使用了 乐观锁gt(SeckillVoucher::getStock, 0),保证库存不会减成负数。


(3)保存订单

1
this.save(voucherOrder);

2.7. AOP 代理对象的作用

关键代码:

1
2
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
this.proxy = proxy;

为什么需要代理?

👉 只有通过代理对象调用事务方法才能让 @Transactional 生效(CGLIB 或 JDK 代理)。

因为:

  • 内部调用方法(this.createVoucherOrder)不会触发 AOP
  • 所以事务会失效

所以:

  • 先拿到代理对象保存起来
  • 以后后台线程调用 proxy.createVoucherOrder(),事务才能生效

3【项目启动时】执行的流程(最重要)

当 Spring Boot 启动并加载 Bean 时,你的服务类会按下面顺序执行:


🟦 3.1. 1Spring 创建 VoucherOrderServiceImpl Bean 实例

Spring扫描到 @Service,创建实例:

1
public class VoucherOrderServiceImpl extends ServiceImpl ...

这一步仅仅是实例化对象,并注入依赖:

  • ISeckillVoucherService
  • RedisIdWorker
  • StringRedisTemplate
  • RedissonClient

🟦 3.1.2. Spring 注入被 @Resource 标注的字段**

例如:

1
2
@Resource
private ISeckillVoucherService seckillVoucherService;

全部依赖都被注入好。

此时对象已经准备好,但还没进入生命周期回调。


🟦 3.1.3 Spring AOP 创建代理对象(非常重要)

因为你的类内部的某些方法用到了:

1
@Transactional

为了让事务生效,Spring 会给这个 Bean 创建 AOP 代理(通常是 CGLIB)。

此时:

  • 你对 this.xxx() 的直接调用 不会触发事务
  • 只有通过代理对象调用才会有事务

这个原理是你后面用:

1
2
AopContext.currentProxy();
this.proxy = proxy;

的原因。


🟦 3.1.4. 调用 @PostConstruct 的 init() 方法

启动后台线程的逻辑就在这里执行:

1
2
3
4
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

📌 这个方法是在 Bean 创建完成 + 依赖注入完成 + AOP 代理创建完成后执行的

这一点很关键,因为:

✔ 此时所有依赖都可以使用

✔ AOP 代理(事务增强)也已经准备好

✔ 所以启动后台线程是安全的

然后线程池开始运行 VoucherOrderHandler


🟦 3.1.5. 后台线程开始运行:VoucherOrderHandler.run()

你启动的是一个 永不结束的后台线程

1
2
3
4
5
6
7
8
9
10
while (true) {
try {
// 读取 Redis Stream 的新消息
// 转订单对象
// 创建订单(加锁、调用代理方法)
// ACK
} catch (Exception e) {
handlePendingList();
}
}

从这一刻开始:

👉 你的程序开始持续监听 Redis Stream,等待 Lua 脚本推来的订单消息


🎯 到这里为止,【项目刚启动时执行的顺序】完成:

【最终启动顺序总结】

  1. Spring 创建 VoucherOrderServiceImpl 实例
  2. 注入 @Resource 的依赖
  3. 创建 Spring AOP 代理(增强 @Transactional)
  4. 执行 @PostConstruct → 启动单线程监听 Redis Stream
  5. 新线程进入 while(true) 无限循环等待消息

🎯 3.2、【用户请求秒杀接口时】执行的顺序

用户调用接口:

1
public Result seckillVoucher(Long voucherId)

执行顺序如下:


🟦 1. 获取 userId、orderId

1
2
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId(...);

🟦 2. 执行 Lua 脚本检查秒杀资格

1
stringRedisTemplate.execute(SECKILL_SCRIPT, ...);

Lua 脚本会完成:

  • 判断库存是否足够
  • 判断用户是否已下单
  • 给库存 -1(Redis 内存层)
  • 写入 Redis Stream:XADD stream.orders * ...

如果脚本返回 1 或 2,说明失败。

如果返回 0:

👉 意味着订单写入了 Redis Stream(消息队列)。


🟦 3. 获取 AOP 代理对象

1
2
3
IVoucherOrderService proxy = 
(IVoucherOrderService) AopContext.currentProxy();
this.proxy = proxy;

必须要代理对象来执行事务方法 createVoucherOrder。

(注意:createVoucherOrder 并不会在这里执行!)


🟦 4. 直接 return Result.ok()

因为下单是异步的,返回给用户的速度非常快。


🎯3.3【后台线程监听到消息】执行顺序

后台线程不断执行:

1
handleVoucherOrder(voucherOrder);

这个过程:


🟦 1. Redis Stream 有新消息,线程读取它

由刚才 Lua 脚本写入的:

1
XADD stream.orders * userId=X voucherId=Y orderId=Z

🟦 2. 转为 Java 对象 VoucherOrder

1
BeanUtil.fillBeanWithMap(...)

🟦 3. 加 Redisson 分布式锁(按 userId)

1
RLock lock = redissonClient.getLock("lock:order:"+userId);

确保“一人一单”。


🟦 4. 调用代理对象方法创建订单

1
proxy.createVoucherOrder(voucherOrder);

该方法有:

  • 查询用户是否已下单(数据库)
  • 扣库存(数据库)
  • 保存订单(数据库)
  • 全程在一个事务里

🟦 5. 执行成功后 ACK Redis Stream 消息

1
XACK stream.orders g1 messageId

4.lua脚本语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- 优惠券id
local voucherId = ARGV[1];
-- 用户id
local userId = ARGV[2];
-- 订单id
local orderId = ARGV[3]

-- 库存的key
local stockKey = 'seckill:stock:' .. voucherId;
-- 订单key
local orderKey = 'seckill:order:' .. voucherId;

-- 判断库存是否充足 get stockKey > 0 ?
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then
-- 库存不足
return 1;
end

-- 库存充足,判断用户是否已经下过单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
-- 用户下过单
return 2;
end

-- 库存充足,没有下过单,扣库存、下单
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
-- 发送消息到队列中,XADD stream.orders * key1 value1 key2 value2...
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
-- 返回0,表示下单成功
return 0;

4.1ARGV 是什么?(参数传进 Lua 的方式)

local voucherId = ARGV[1];

  • ARGV 是 Redis 在执行 Lua 脚本时,传进来的 参数数组

  • 你在 Java 里执行脚本时写的是:

    1
    2
    3
    4
    5
    6
    7
    stringRedisTemplate.execute(
    SECKILL_SCRIPT,
    Collections.emptyList(), // KEYS(这里没用)
    voucherId.toString(),
    userId.toString(),
    String.valueOf(orderId)
    );

    对应关系是:

    1
    2
    3
    ARGV[1] = voucherId
    ARGV[2] = userId
    ARGV[3] = orderId

所以:

1
2
3
local voucherId = ARGV[1];
local userId = ARGV[2];
local orderId = ARGV[3];

就是把传进来的参数取出来,赋值成局部变量。


4.2拼接 Redis 的 key

1
2
local stockKey = 'seckill:stock:' .. voucherId;
local orderKey = 'seckill:order:' .. voucherId;

这里的语法点:

  • '字符串' .. 变量:Lua 中的字符串拼接运算符是 ..

  • 比如 voucherId = 5 时:

    1
    2
    stockKey = "seckill:stock:5"
    orderKey = "seckill:order:5"

这些就是 Redis 里真实使用的 key 名:

  • seckill:stock:5:存这个优惠券的库存(String 类型)
  • seckill:order:5:存这张券已经下过单的用户(Set 集合)

4.3redis.call 是干嘛的?

在 Lua 脚本里,所有 Redis 命令都通过 redis.call 来调用

1
redis.call('命令名', 参数1, 参数2, ...)

比如:

  • redis.call('GET', stockKey)
  • redis.call('SISMEMBER', orderKey, userId)
  • redis.call('XADD', 'stream.orders', '*', 'userId', userId, ...)

可以理解为在 Lua 里“直接执行 Redis 命令”的语法。


4.4GET:读取库存

1
local stock = redis.call('GET', stockKey);

Redis 原生命令:

1
GET key

作用:

  • 从 Redis 读取一个 字符串类型 的值

这里:

  • stockKey 就是 'seckill:stock:' .. voucherId
  • 假设是 "seckill:stock:5",值可能是 "10"

redis.call('GET', stockKey) 返回的是一个字符串,例如 "10"
Lua 里用:

1
tonumber(stock)

把它转成数字,方便比较大小。


4.5判断库存是否 <= 0

1
2
3
if (tonumber(stock) <= 0) then
return 1;
end

解释:

  • tonumber(stock):把 GET 到的字符串 "10" 转成数字 10
  • <= 0:如果是 0 或负数,说明 没有库存了
  • return 1:Lua 脚本返回 1 代表 “库存不足”

所以这里的逻辑是:

如果库存 <= 0,则提前结束脚本,返回 1。


4.6SISMEMBER:判断用户是否已经下过单

1
2
3
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
return 2;
end

Redis 原生命令:

1
SISMEMBER key member

解释:

  • 用于判断一个值 member 是否在 Set 集合 key
  • 返回:
    • 1:存在
    • 0:不存在

这里:

  • orderKey = "seckill:order:<voucherId>"
  • 这个 key 对应的是 Redis 的集合(Set),里面存的都是已经下过单的 userId
  • redis.call('SISMEMBER', orderKey, userId)
    • 如果这个用户下过单 → 返回 1
    • 没下过单 → 返回 0

脚本判断:

1
== 1  → 下过单 → return 2

return 2 的含义:表示用户已下单,不能重复抢


4.7INCRBY:扣减库存

1
redis.call('INCRBY', stockKey, -1);

Redis 原生命令:

1
INCRBY key increment
  • 增加 key 对应的整数型 value
  • increment 可以是负数 → 实际就是减法

例子:

1
2
SET seckill:stock:5 10
INCRBY seckill:stock:5 -1 # 结果变成 9

在 Lua 中:

1
redis.call('INCRBY', stockKey, -1)

就是:库存 -1
为什么可以这样用?因为 Redis 的 String 类型如果存的是数字形式,可以当整数自增自减使用。


4.8SADD:把用户加入已下单集合

1
redis.call('SADD', orderKey, userId);

Redis 原生命令:

1
SADD key member [member ...]

含义:

  • 向集合(Set)中添加一个或多个成员

例子:

1
SADD seckill:order:5 1001
  • 表示用户 1001 抢到了券 5
  • 之后可以通过 SISMEMBER seckill:order:5 1001 判断是否抢过

在你的脚本里:

  • orderKey = "seckill:order:" .. voucherId
  • userId 是这次来抢的用户 ID

所以这一行表示:

记录下这个用户已经抢过这张券,用来实现“一人一单”。


4.9XADD:把订单写入消息队列(Redis Stream)

1
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);

这一行是整个脚本里最关键的一句,用于 发送消息到 Redis Stream 队列

Redis 原生命令:

1
XADD key message-id field1 value1 field2 value2 ...

参数解释:

  • key:Stream 的名称,比如 'stream.orders'(队列名)
  • message-id
    • *:让 Redis 自动生成一个递增的 ID(如 1691832948000-0
  • field1 value1:消息体里的内容(键值对)

你的这条命令等价于:

1
2
3
4
XADD stream.orders * 
userId <userId>
voucherId <voucherId>
id <orderId>

比如:

1
XADD stream.orders * userId 1001 voucherId 5 id 88374290123

也就是:

stream.orders 这个消息队列里插入一条新消息,内容包含:

  • userId
  • voucherId
  • id(订单id)

后面的 Java 后台线程 就是从这个队列里读这些消息,然后真正去写数据库下订单。


4.10return 0:表示执行成功

1
return 0;

含义就是:

  • 前面库存充足
  • 用户也没下过单
  • 成功扣了 Redis 里预库存
  • 成功记录用户已抢过
  • 成功将订单信息写入消息队列(Stream)

所以返回 0 表示 一切正常,下单成功(异步创建)

你的 Java 里对应逻辑:

1
2
3
if (result != null && !result.equals(0L)) {
// 1 = 库存不足, 2 = 重复下单
}

✅ 总结一下每个 Redis 语法的作用

按顺序记一下就好:

  1. GET key
    → 读取库存值,用来判断是不是 > 0
  2. SISMEMBER key member
    → 判断该用户是否已经在“已下单用户集合”里(是否重复下单)
  3. INCRBY key -1
    → 库存减 1(预扣库存,防止超卖)
  4. SADD key member
    → 把当前用户加入“已下单用户集合”,实现一人一单
  5. XADD stream.orders * field value ...
    → 把订单信息(userId、voucherId、orderId)写入 Redis Stream 队列,供后台线程异步处理
  6. return 1 / 2 / 0
    → 作为脚本执行结果返回给 Java,用于判断:
    • 1:库存不足
    • 2:重复下单
    • 0:抢购成功,已入队等待异步处理

Author: chenjunda

Link: http://example.com/2025/11/27/redis%E5%AD%A6%E4%B9%A0Day4/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

NextPost >
redis学习Day3
CATALOG
  1. 1. 使用redis中的stream列表当消息队列进行异步下单
    1. 1.1. 1. 线程池与后台线程初始化
      1. 1.1.1. ⭐ 1. 1秒杀系统为什么要后台线程?
      2. 1.1.2. ⭐ 1.2. 为什么不用一个普通线程,而是用线程池?
        1. 1.1.2.1. ✔ 1.2.1 线程池更专业、更安全
        2. 1.1.2.2. ❌ 线程意外结束,无法自动恢复
        3. 1.1.2.3. ✔ 1.2.2 newSingleThreadExecutor() = 永久单线程处理订单
        4. 1.1.2.4. 特点一:永远只有一个线程
        5. 1.1.2.5. 特点二:线程挂了会自动拉起新线程
      3. 1.1.3. ⭐ 1.3. 为什么是单线程线程池?
        1. 1.1.3.1. ✔ 要求 1:顺序处理订单
        2. 1.1.3.2. ✔ 要求 2:防止并发扣库存、冲突下单
      4. 1.1.4. ⭐ 1.4. newSingleThreadExecutor() 与 new Thread 有什么区别?
      5. 1.1.5. ⭐ 1.5. 总结(最重要,面试必背)
      6. 1.1.6. ✔ 1. 需要后台线程长期运行(非用户线程)
      7. 1.1.7. ✔ 2. 线程池能自动重启线程,避免线程崩溃导致系统停摆
      8. 1.1.8. ✔ 3. 单线程池保证订单处理完全顺序,避免并发问题
      9. 1.1.9. ✔ 4. 高并发场景下,订单要排队慢慢处理,所以单线程最合适
      10. 1.1.10. ✔ 5. 比 new Thread 更安全、更专业、更不容易出 bug
    2. 1.2. 2. Redis Stream 消息队列
      1. 1.2.1. 读取消息
      2. 1.2.2. 转成 Java 对象
      3. 1.2.3. 执行订单逻辑 & ACK
      4. 1.2.4. 2.1.1先看这一段 Java 代码到底干了啥
      5. 1.2.5. 1️⃣ Consumer.from(“g1”, “c1”)
      6. 1.2.6. 2️⃣ StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1))
        1. 1.2.6.1. count(1)
        2. 1.2.6.2. block(Duration.ofSeconds(1))
      7. 1.2.7. 3️⃣ StreamOffset.create(queueName, ReadOffset.lastConsumed())
      8. 1.2.8. 2.1.2把 Java 调用翻译成 Redis 命令
      9. 1.2.9. 2.1.3读出来后转成 Java 对象
      10. 1.2.10. 1️⃣ MapRecord 是 Redis Stream 的消息封装
      11. 1.2.11. 2️⃣ BeanUtil.fillBeanWithMap(…)
      12. 1.2.12. 2.1.4执行订单逻辑 + ACK
      13. 1.2.13. 1️⃣ handleVoucherOrder(voucherOrder);
      14. 1.2.14. 2️⃣ acknowledge(queueName, "g1", record.getId())
      15. 1.2.15. 2.1.5Redis Stream “消费者组” 的特性(重点)
      16. 1.2.16. 1️⃣ 一个 Stream,多组消费
      17. 1.2.17. 2️⃣ 组内多消费者,负载均衡
      18. 1.2.18. 3️⃣ Pending List(未确认消息列表)
      19. 1.2.19. 4️⃣ 为什么你读 PendingList 用的是 offset = “0”
        1. 1.2.19.1. 读新消息用:
        2. 1.2.19.2. 读 PendingList 用:
      20. 1.2.20. 5️⃣ 消费者组的几个关键好处
      21. 1.2.21. 2.1.6把整体流程再串一下(结合你的代码)
    3. 1.3. 2.3. Pending List 异常补偿
    4. 1.4. 2.4. 创建订单前的分布式锁
    5. 1.5. 2.5. Lua 脚本判断下单资格(核心)
    6. 1.6. 2.6. 创建订单(真正写数据库)
      1. 1.6.1. (1)检查是否一人一单
      2. 1.6.2. (2)扣减库存(数据库操作)
      3. 1.6.3. (3)保存订单
    7. 1.7. 2.7. AOP 代理对象的作用
    8. 1.8. 3【项目启动时】执行的流程(最重要)
      1. 1.8.1. 🟦 3.1. 1Spring 创建 VoucherOrderServiceImpl Bean 实例
      2. 1.8.2. 🟦 3.1.2. Spring 注入被 @Resource 标注的字段**
      3. 1.8.3. 🟦 3.1.3 Spring AOP 创建代理对象(非常重要)
      4. 1.8.4. 🟦 3.1.4. 调用 @PostConstruct 的 init() 方法
      5. 1.8.5. ✔ 此时所有依赖都可以使用
      6. 1.8.6. ✔ AOP 代理(事务增强)也已经准备好
      7. 1.8.7. ✔ 所以启动后台线程是安全的
      8. 1.8.8. 🟦 3.1.5. 后台线程开始运行:VoucherOrderHandler.run()
      9. 1.8.9. 🎯 到这里为止,【项目刚启动时执行的顺序】完成:
      10. 1.8.10. 【最终启动顺序总结】
      11. 1.8.11. 🎯 3.2、【用户请求秒杀接口时】执行的顺序
      12. 1.8.12. 🟦 1. 获取 userId、orderId
      13. 1.8.13. 🟦 2. 执行 Lua 脚本检查秒杀资格
      14. 1.8.14. 🟦 3. 获取 AOP 代理对象
      15. 1.8.15. 🟦 4. 直接 return Result.ok()
      16. 1.8.16. 🎯3.3【后台线程监听到消息】执行顺序
      17. 1.8.17. 🟦 1. Redis Stream 有新消息,线程读取它
      18. 1.8.18. 🟦 2. 转为 Java 对象 VoucherOrder
      19. 1.8.19. 🟦 3. 加 Redisson 分布式锁(按 userId)
      20. 1.8.20. 🟦 4. 调用代理对象方法创建订单
      21. 1.8.21. 🟦 5. 执行成功后 ACK Redis Stream 消息
    9. 1.9. 4.lua脚本语法
      1. 1.9.1. 4.1ARGV 是什么?(参数传进 Lua 的方式)
      2. 1.9.2. local voucherId = ARGV[1];
      3. 1.9.3. 4.2拼接 Redis 的 key
      4. 1.9.4. 这里的语法点:
      5. 1.9.5. 4.3redis.call 是干嘛的?
      6. 1.9.6. 4.4GET:读取库存
      7. 1.9.7. Redis 原生命令:
      8. 1.9.8. 4.5判断库存是否 <= 0
      9. 1.9.9. 4.6SISMEMBER:判断用户是否已经下过单
      10. 1.9.10. Redis 原生命令:
      11. 1.9.11. 4.7INCRBY:扣减库存
      12. 1.9.12. Redis 原生命令:
      13. 1.9.13. 4.8SADD:把用户加入已下单集合
      14. 1.9.14. Redis 原生命令:
      15. 1.9.15. 4.9XADD:把订单写入消息队列(Redis Stream)
      16. 1.9.16. Redis 原生命令:
      17. 1.9.17. 4.10return 0:表示执行成功
      18. 1.9.18. ✅ 总结一下每个 Redis 语法的作用