# redis-as-queue

**Repository Path**: zclget/redis-as-queue

## Basic Information

- **Project Name**: redis-as-queue
- **Description**: Redis as a delay queue: a simple example that imitates order-timeout handling, with a source-code walkthrough of how KeyExpirationEventMessageListener listens for expired keys
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2021-06-25
- **Last Updated**: 2025-04-22

## Categories & Tags

**Categories**: Uncategorized

**Tags**: Redis

## README

# Redis as a delay queue: a simple order-timeout example

1. Add the dependency

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
```

2. Add the yml configuration

```yaml
spring:
  redis:
    port: 6379
    host: 127.0.0.1
server:
  port: 8080
```

3. Enable expired-key notifications in the Redis configuration file

```text
#  notify-keyspace-events Ex
#
#  By default all notifications are disabled because most users don't need
#  this feature and the feature has some overhead. Note that if you don't
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events Ex
```

| Character | Notification sent |
| :----: | :----: |
| K | Keyspace notifications; every notification is prefixed with `__keyspace@<db>__` |
| E | Key-event notifications; every notification is prefixed with `__keyevent@<db>__` |
| g | Notifications for generic, type-independent commands such as DEL, EXPIRE, and RENAME |
| $ | Notifications for string commands |
| l | Notifications for list commands |
| s | Notifications for set commands |
| h | Notifications for hash commands |
| z | Notifications for sorted-set commands |
| x | Expired events, sent each time an expired key is deleted |
| e | Evicted events, sent each time a key is deleted because of the maxmemory policy |
| A | Alias for g$lshzxe |

The value `Ex` used here combines `E` (key-event notifications) with `x` (expired events), so Redis publishes a key-event message whenever an expired key is deleted.

4. Add the configuration class

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    // Container that manages the Pub/Sub subscriptions for all registered listeners
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
```

5. Add a Redis listener class that extends KeyExpirationEventMessageListener

```java
// StrUtil, DateUtil and DatePattern come from the Hutool utility library
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Handles Redis key-expiration events.
     *
     * @param message the message whose body is the expired key
     * @param pattern the pattern that the subscription matched
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body is the key that expired
        String orderNo = message.toString();
        System.out.println("orderNo:" + orderNo);
        // Only keys that start with "order" are treated as orders
        if (StrUtil.startWith(orderNo, "order")) {
            System.out.println("Order " + orderNo + " timed out and was cancelled at "
                    + DateUtil.format(LocalDateTime.now(), DatePattern.NORM_DATETIME_PATTERN));
        }
    }
}
```

6. Start Redis and the project, then set a value with an expiration time in Redis, or visit http://localhost:8080/redisTest directly

```shell
set order12345 a ex 5
```

7. When the key expires, the listener's onMessage method is called; message.toString() is the key that just expired

8. Source-code walkthrough

```java
// KeyspaceEventMessageListener implements InitializingBean, so afterPropertiesSet
// is called as part of the bean lifecycle
@Override
public void afterPropertiesSet() throws Exception {
    init();
}

// init() calls doRegister(listenerContainer).
// KeyExpirationEventMessageListener overrides doRegister so that it subscribes only
// to the "__keyevent@*__:expired" pattern; the parent implementation is shown here.
protected void doRegister(RedisMessageListenerContainer container) {
    listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}

// RedisMessageListenerContainer
public void addMessageListener(MessageListener listener, Topic topic) {
    addMessageListener(listener, Collections.singleton(topic));
}

// addMessageListener
public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
    addListener(listener, topics);
    lazyListen();
}
```

```java
// RedisMessageListenerContainer also implements InitializingBean; its
// afterPropertiesSet initializes the executors
public void afterPropertiesSet() {
    if (taskExecutor == null) {
        manageExecutor = true;
        taskExecutor = createDefaultTaskExecutor();
    }
    if (subscriptionExecutor == null) {
        subscriptionExecutor = taskExecutor;
    }
    initialized = true;
}

// lazyListen() hands the subscription task to the executor
private void lazyListen() {
    boolean debug = logger.isDebugEnabled();
    boolean started = false;
    if (isRunning()) {
        if (!listening) {
            synchronized (monitor) {
                if (!listening) {
                    if (channelMapping.size() > 0 || patternMapping.size() > 0) {
                        // The executor runs the subscription task
                        subscriptionExecutor.execute(subscriptionTask);
                        listening = true;
                        started = true;
                    }
                }
            }
            if (debug) {
                if (started) {
                    logger.debug("Started listening for Redis messages");
                } else {
                    logger.debug("Postpone listening for Redis messages until actual listeners are added");
                }
            }
        }
    }
}
```

```java
// run method of SubscriptionTask
public void run() {
    synchronized (localMonitor) {
        subscriptionTaskRunning = true;
    }
    try {
        connection = connectionFactory.getConnection();
        if (connection.isSubscribed()) {
            throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
        }
        boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
        // NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
        if (!asyncConnection) {
            synchronized (monitor) {
                monitor.notify();
            }
        }
        // Perform the subscription
        SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
        if (asyncConnection) {
            SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
            synchronized (monitor) {
                monitor.notify();
            }
        }
    } catch (Throwable t) {
        handleSubscriptionException(t);
    } finally {
        // this block is executed once the subscription thread has ended, this may or may not mean
        // the connection has been unsubscribed, depending on driver
        synchronized (localMonitor) {
            subscriptionTaskRunning = false;
            localMonitor.notify();
        }
    }
}

// eventuallyPerformSubscription
private SubscriptionPresentCondition eventuallyPerformSubscription() {
    SubscriptionPresentCondition condition = null;
    if (channelMapping.isEmpty()) {
        condition = new PatternSubscriptionPresentCondition();
        // Subscribe by pattern; once subscribed, DispatchMessageListener's onMessage
        // is called for every message received
        connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
    } else {
        if (patternMapping.isEmpty()) {
            condition = new SubscriptionPresentCondition();
        } else {
            // schedule the rest of the subscription
            subscriptionExecutor.execute(new PatternSubscriptionTask());
            condition = new PatternSubscriptionPresentCondition();
        }
        connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
    }
    return condition;
}

// When DispatchMessageListener receives a message, its onMessage calls
// dispatchMessage(listeners, message, pattern);
// which invokes onMessage on each registered listener in turn
// Details omitted...
```
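
The final step of the walkthrough, where one subscribed listener fans each message out to every registered listener, can be illustrated without Spring or a running Redis server. The sketch below is a simplified stand-in and not the Spring Data Redis implementation: the class `ExpiredKeyDispatchSketch`, its `addListener` and `dispatch` methods, and the `globToRegex` helper are hypothetical names, and only the `*` wildcard of Redis patterns is handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;

// Hypothetical, simplified model of pattern-based dispatch; not Spring code.
class ExpiredKeyDispatchSketch {

    // One registration: a compiled channel pattern plus the listener to call.
    private static final class Registration {
        final Pattern pattern;
        final BiConsumer<String, String> listener;

        Registration(Pattern pattern, BiConsumer<String, String> listener) {
            this.pattern = pattern;
            this.listener = listener;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    // Convert a glob such as "__keyevent@*__:expired" into a regex.
    // Assumption: only '*' (any run of characters) is supported.
    static Pattern globToRegex(String glob) {
        String[] parts = glob.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString());
    }

    // Register a listener for every channel that matches the glob.
    void addListener(String glob, BiConsumer<String, String> listener) {
        registrations.add(new Registration(globToRegex(glob), listener));
    }

    // Call each listener whose pattern matches the channel, in registration
    // order, and return how many listeners were invoked.
    int dispatch(String channel, String body) {
        int invoked = 0;
        for (Registration r : registrations) {
            if (r.pattern.matcher(channel).matches()) {
                r.listener.accept(channel, body);
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ExpiredKeyDispatchSketch dispatcher = new ExpiredKeyDispatchSketch();
        // Listener that mimics the order-timeout check in onMessage
        dispatcher.addListener("__keyevent@*__:expired", (channel, key) -> {
            if (key.startsWith("order")) {
                System.out.println("Order " + key + " timed out and was cancelled");
            }
        });
        // Simulate the message Redis publishes when order12345 expires in database 0
        dispatcher.dispatch("__keyevent@0__:expired", "order12345");
    }
}
```

The listener never sees events on other channels, for example `__keyevent@0__:del`, because the pattern match is done before any listener is called. This mirrors why the example only reacts to expired keys even though other key events may be enabled on the server.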