基于 Redis ZSet 延迟队列,包含原子消费、防重复、异常重试核心逻辑。
前置依赖
首先确保项目引入 Redis 客户端(以 Spring Data Redis 为例,最常用):
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
完整实现代码
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Redis ZSet 延迟队列(生产可用版)
*/
@Component
public class RedisDelayQueue {
// 延迟队列的 Redis Key
private static final String DELAY_QUEUE_KEY = "delay_queue:order";
// 处理中的任务 Key(防止重复消费)
private static final String PENDING_QUEUE_KEY = "delay_queue:pending";
// 重试次数上限
private static final int MAX_RETRY = 3;
@Resource
private RedisTemplate<String, String> redisTemplate;
// 定时消费线程池(核心数根据业务调整)
private final ScheduledExecutorService consumerExecutor = Executors.newScheduledThreadPool(1);
/**
* 初始化:启动消费者轮询
*/
public void init() {
// 每隔 100ms 轮询一次(可根据业务调整)
consumerExecutor.scheduleAtFixedRate(this::consume, 0, 100, TimeUnit.MILLISECONDS);
}
/**
* 生产延迟任务
* @param taskId 任务ID(如订单号)
* @param delaySeconds 延迟秒数
*/
public void produce(String taskId, long delaySeconds) {
// 计算到期时间戳(秒)
long expireTime = System.currentTimeMillis() / 1000 + delaySeconds;
// 添加到 ZSet:value=taskId,score=到期时间戳
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskId, expireTime);
System.out.printf("[%s] 生产延迟任务:%s,延迟 %d 秒,到期时间戳:%d%n",
LocalDateTime.now(), taskId, delaySeconds, expireTime);
}
/**
* 消费延迟任务(核心逻辑)
*/
private void consume() {
long now = System.currentTimeMillis() / 1000; // 当前时间戳(秒)
// 1. 查已到期的任务(score ≤ 当前时间,只取1条,避免一次处理过多)
Set<ZSetOperations.TypedTuple<String>> expiredTasks = redisTemplate.opsForZSet()
.rangeByScoreWithScores(DELAY_QUEUE_KEY, 0, now, 0, 1);
if (expiredTasks == null || expiredTasks.isEmpty()) {
return; // 无到期任务,直接返回
}
// 2. 遍历到期任务(这里只取1条,所以循环只有1次)
for (ZSetOperations.TypedTuple<String> task : expiredTasks) {
String taskId = task.getValue();
if (taskId == null) {
continue;
}
// 3. 原子删除(核心:防多消费者重复消费)
Long removeCount = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskId);
if (removeCount == null || removeCount == 0) {
continue; // 被其他消费者拿走,跳过
}
// 4. 标记为处理中(防止任务执行失败丢失)
redisTemplate.opsForValue().set(PENDING_QUEUE_KEY + ":" + taskId, "1", 5, TimeUnit.MINUTES);
// 5. 执行任务(核心业务逻辑)
try {
executeTask(taskId);
// 6. 任务执行成功,删除处理中标记
redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
System.out.printf("[%s] 任务 %s 执行成功%n", LocalDateTime.now(), taskId);
} catch (Exception e) {
// 7. 任务执行失败,重试逻辑
handleTaskFail(taskId);
System.err.printf("[%s] 任务 %s 执行失败:%s%n", LocalDateTime.now(), taskId, e.getMessage());
}
}
}
/**
* 执行具体业务任务(替换为你的业务逻辑)
* @param taskId 任务ID
*/
private void executeTask(String taskId) {
// 示例:订单超时未支付,执行取消订单逻辑
System.out.printf("[%s] 开始执行任务:%s → 模拟取消超时订单%n", LocalDateTime.now(), taskId);
// 这里写你的核心业务代码,比如调用订单取消接口
// 如:orderService.cancelOrder(taskId);
}
/**
* 任务执行失败处理(重试+兜底)
* @param taskId 任务ID
*/
private void handleTaskFail(String taskId) {
// 1. 获取当前重试次数
String retryKey = "delay_queue:retry:" + taskId;
Integer retryCount = (Integer) redisTemplate.opsForValue().get(retryKey);
retryCount = retryCount == null ? 1 : retryCount + 1;
// 2. 判断是否超过重试上限
if (retryCount > MAX_RETRY) {
// 重试耗尽,丢到死信队列(人工处理)
redisTemplate.opsForList().leftPush("delay_queue:dead", taskId);
redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
redisTemplate.delete(retryKey);
System.err.printf("[%s] 任务 %s 重试 %d 次失败,进入死信队列%n",
LocalDateTime.now(), taskId, retryCount);
return;
}
// 3. 未超上限,重新加入延迟队列(指数退避:1s→2s→4s)
long delay = (long) Math.pow(2, retryCount - 1);
redisTemplate.opsForValue().set(retryKey, retryCount, 10, TimeUnit.MINUTES);
produce(taskId, delay);
redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
System.out.printf("[%s] 任务 %s 重试 %d 次,重新加入延迟队列,延迟 %d 秒%n",
LocalDateTime.now(), taskId, retryCount, delay);
}
/**
* 关闭线程池(应用关闭时调用)
*/
public void shutdown() {
consumerExecutor.shutdown();
try {
if (!consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
consumerExecutor.shutdownNow();
}
} catch (InterruptedException e) {
consumerExecutor.shutdownNow();
}
}
}
测试代码(验证功能)
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class DelayQueueTestApplication {
public static void main(String[] args) {
SpringApplication.run(DelayQueueTestApplication.class, args);
}
@Bean
public CommandLineRunner testDelayQueue(RedisDelayQueue delayQueue) {
return args -> {
// 初始化消费者
delayQueue.init();
// 生产3个测试任务
delayQueue.produce("order_1001", 3); // 3秒后执行
delayQueue.produce("order_1002", 5); // 5秒后执行
delayQueue.produce("order_1003", 1); // 1秒后执行
// 阻塞主线程,观察执行结果
Thread.sleep(10000);
// 关闭线程池
delayQueue.shutdown();
};
}
}
核心逻辑解释
- 生产任务:
produce方法将任务ID和到期时间戳存入 ZSet,score 是到期时间戳,保证有序性。 - 消费任务:
- 轮询获取
score ≤ 当前时间的任务,每次只取1条,避免压力过大; - 用
ZREM原子删除,确保多消费者环境下不会重复消费; - 任务执行前标记为“处理中”,防止执行失败丢失。
- 轮询获取
- 异常处理:任务执行失败后,采用指数退避重试(1s→2s→4s),重试耗尽后进入死信队列,人工兜底。
运行注意事项
- 确保 Redis 服务正常运行,且 Spring 配置了 Redis 连接信息;
- 轮询间隔(100ms)可根据业务调整:实时性要求高则调小,性能要求高则调大;
- 线程池核心数根据服务器配置调整,避免过多线程抢占资源;
- 死信队列需配合监控/告警,及时处理失败任务。
总结
- Redis ZSet 实现延迟队列的核心是用 score 存储到期时间戳,原子删除保证消费唯一性;
- 生产环境需增加处理中标记、重试机制、死信队列,避免任务丢失或重复执行;
- 该方案适合中小型业务场景,高可靠/高并发场景建议用 RocketMQ/RabbitMQ 专业延迟队列。