Administrator
Published on 2026-03-13 / 5 Visits
0

Java 实现基于 Redis ZSet 延迟队列

#AI

基于 Redis ZSet 延迟队列,包含原子消费、防重复、异常重试核心逻辑。

前置依赖

首先确保项目引入 Redis 客户端(以 Spring Data Redis 为例,最常用):

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

完整实现代码

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Redis ZSet 延迟队列(生产可用版)
 */
@Component
public class RedisDelayQueue {
    // 延迟队列的 Redis Key
    private static final String DELAY_QUEUE_KEY = "delay_queue:order";
    // 处理中的任务 Key(防止重复消费)
    private static final String PENDING_QUEUE_KEY = "delay_queue:pending";
    // 重试次数上限
    private static final int MAX_RETRY = 3;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    // 定时消费线程池(核心数根据业务调整)
    private final ScheduledExecutorService consumerExecutor = Executors.newScheduledThreadPool(1);

    /**
     * 初始化:启动消费者轮询
     */
    public void init() {
        // 每隔 100ms 轮询一次(可根据业务调整)
        consumerExecutor.scheduleAtFixedRate(this::consume, 0, 100, TimeUnit.MILLISECONDS);
    }

    /**
     * 生产延迟任务
     * @param taskId 任务ID(如订单号)
     * @param delaySeconds 延迟秒数
     */
    public void produce(String taskId, long delaySeconds) {
        // 计算到期时间戳(秒)
        long expireTime = System.currentTimeMillis() / 1000 + delaySeconds;
        // 添加到 ZSet:value=taskId,score=到期时间戳
        redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskId, expireTime);
        System.out.printf("[%s] 生产延迟任务:%s,延迟 %d 秒,到期时间戳:%d%n",
                LocalDateTime.now(), taskId, delaySeconds, expireTime);
    }

    /**
     * 消费延迟任务(核心逻辑)
     */
    private void consume() {
        long now = System.currentTimeMillis() / 1000; // 当前时间戳(秒)

        // 1. 查已到期的任务(score ≤ 当前时间,只取1条,避免一次处理过多)
        Set<ZSetOperations.TypedTuple<String>> expiredTasks = redisTemplate.opsForZSet()
                .rangeByScoreWithScores(DELAY_QUEUE_KEY, 0, now, 0, 1);

        if (expiredTasks == null || expiredTasks.isEmpty()) {
            return; // 无到期任务,直接返回
        }

        // 2. 遍历到期任务(这里只取1条,所以循环只有1次)
        for (ZSetOperations.TypedTuple<String> task : expiredTasks) {
            String taskId = task.getValue();
            if (taskId == null) {
                continue;
            }

            // 3. 原子删除(核心:防多消费者重复消费)
            Long removeCount = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskId);
            if (removeCount == null || removeCount == 0) {
                continue; // 被其他消费者拿走,跳过
            }

            // 4. 标记为处理中(防止任务执行失败丢失)
            redisTemplate.opsForValue().set(PENDING_QUEUE_KEY + ":" + taskId, "1", 5, TimeUnit.MINUTES);

            // 5. 执行任务(核心业务逻辑)
            try {
                executeTask(taskId);
                // 6. 任务执行成功,删除处理中标记
                redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
                System.out.printf("[%s] 任务 %s 执行成功%n", LocalDateTime.now(), taskId);
            } catch (Exception e) {
                // 7. 任务执行失败,重试逻辑
                handleTaskFail(taskId);
                System.err.printf("[%s] 任务 %s 执行失败:%s%n", LocalDateTime.now(), taskId, e.getMessage());
            }
        }
    }

    /**
     * 执行具体业务任务(替换为你的业务逻辑)
     * @param taskId 任务ID
     */
    private void executeTask(String taskId) {
        // 示例:订单超时未支付,执行取消订单逻辑
        System.out.printf("[%s] 开始执行任务:%s → 模拟取消超时订单%n", LocalDateTime.now(), taskId);
        // 这里写你的核心业务代码,比如调用订单取消接口
        // 如:orderService.cancelOrder(taskId);
    }

    /**
     * 任务执行失败处理(重试+兜底)
     * @param taskId 任务ID
     */
    private void handleTaskFail(String taskId) {
        // 1. 获取当前重试次数
        String retryKey = "delay_queue:retry:" + taskId;
        Integer retryCount = (Integer) redisTemplate.opsForValue().get(retryKey);
        retryCount = retryCount == null ? 1 : retryCount + 1;

        // 2. 判断是否超过重试上限
        if (retryCount > MAX_RETRY) {
            // 重试耗尽,丢到死信队列(人工处理)
            redisTemplate.opsForList().leftPush("delay_queue:dead", taskId);
            redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
            redisTemplate.delete(retryKey);
            System.err.printf("[%s] 任务 %s 重试 %d 次失败,进入死信队列%n",
                    LocalDateTime.now(), taskId, retryCount);
            return;
        }

        // 3. 未超上限,重新加入延迟队列(指数退避:1s→2s→4s)
        long delay = (long) Math.pow(2, retryCount - 1);
        redisTemplate.opsForValue().set(retryKey, retryCount, 10, TimeUnit.MINUTES);
        produce(taskId, delay);
        redisTemplate.delete(PENDING_QUEUE_KEY + ":" + taskId);
        System.out.printf("[%s] 任务 %s 重试 %d 次,重新加入延迟队列,延迟 %d 秒%n",
                LocalDateTime.now(), taskId, retryCount, delay);
    }

    /**
     * 关闭线程池(应用关闭时调用)
     */
    public void shutdown() {
        consumerExecutor.shutdown();
        try {
            if (!consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                consumerExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            consumerExecutor.shutdownNow();
        }
    }
}

测试代码(验证功能)

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DelayQueueTestApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayQueueTestApplication.class, args);
    }

    @Bean
    public CommandLineRunner testDelayQueue(RedisDelayQueue delayQueue) {
        return args -> {
            // 初始化消费者
            delayQueue.init();

            // 生产3个测试任务
            delayQueue.produce("order_1001", 3); // 3秒后执行
            delayQueue.produce("order_1002", 5); // 5秒后执行
            delayQueue.produce("order_1003", 1); // 1秒后执行

            // 阻塞主线程,观察执行结果
            Thread.sleep(10000);

            // 关闭线程池
            delayQueue.shutdown();
        };
    }
}

核心逻辑解释

  1. 生产任务produce 方法将任务ID和到期时间戳存入 ZSet,score 是到期时间戳,保证有序性。
  2. 消费任务
    • 轮询获取 score ≤ 当前时间 的任务,每次只取1条,避免压力过大;
    • ZREM 原子删除,确保多消费者环境下不会重复消费;
    • 任务执行前标记为“处理中”,防止执行失败丢失。
  3. 异常处理:任务执行失败后,采用指数退避重试(1s→2s→4s),重试耗尽后进入死信队列,人工兜底。

运行注意事项

  1. 确保 Redis 服务正常运行,且 Spring 配置了 Redis 连接信息;
  2. 轮询间隔(100ms)可根据业务调整:实时性要求高则调小,性能要求高则调大;
  3. 线程池核心数根据服务器配置调整,避免过多线程抢占资源;
  4. 死信队列需配合监控/告警,及时处理失败任务。

总结

  1. Redis ZSet 实现延迟队列的核心是用 score 存储到期时间戳,原子删除保证消费唯一性
  2. 生产环境需增加处理中标记、重试机制、死信队列,避免任务丢失或重复执行;
  3. 该方案适合中小型业务场景,高可靠/高并发场景建议用 RocketMQ/RabbitMQ 专业延迟队列。