Redis监听过期的key实现流程是什么
导读:本文共9040.5字符,通常情况下阅读需要30分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 一、简介我们来个最简单的集群架构,如下图:我们上面图中看到是服务A和服务B就是同一个服务的不同实例。二、maven依赖pom.xml<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.... ...
目录
(为您整理了一些要点),点击可以直达。一、简介
我们来个最简单的集群架构,如下图:
我们上面图中看到是服务A和服务B就是同一个服务的不同实例。
二、maven依赖
pom.xml
<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.0</version><relativePath/><!--lookupparentfromrepository--></parent><groupId>com.alian</groupId><artifactId>expiration</artifactId><version>0.0.1-SNAPSHOT</version><name>expiration</name><description>redis-key-expiration-listener</description><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><project.package.directory>target</project.package.directory><java.version>1.8</java.version><!--com.fasterxml.jackson版本--><jackson.version>2.9.10</jackson.version><!--阿里巴巴fastjson版本--><fastjson.version>1.2.68</fastjson.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--redis依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${parent.version}</version></dependency><!--用于序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><!--java8时间序列化--><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.14</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
三、编码实现
3.1、application.properties
# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/expiration# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000
3.2、Redis配置类
RedisConfig
packagecom.alian.expiration.config;importcom.fasterxml.jackson.annotation.JsonAutoDetect;importcom.fasterxml.jackson.annotation.PropertyAccessor;importcom.fasterxml.jackson.databind.ObjectMapper;importcom.fasterxml.jackson.databind.SerializationFeature;importcom.fasterxml.jackson.datatype.jsr310.JavaTimeModule;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.redis.connection.RedisConnectionFactory;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.data.redis.listener.RedisMessageListenerContainer;importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;importorg.springframework.data.redis.serializer.RedisSerializer;importorg.springframework.data.redis.serializer.StringRedisSerializer;importjava.time.LocalDate;importjava.time.LocalDateTime;importjava.time.LocalTime;importjava.time.format.DateTimeFormatter;@ConfigurationpublicclassRedisConfig{/***redis配置**@paramredisConnectionFactory*@return*/@BeanpublicRedisTemplate<String,Object>redisTemplate(RedisConnectionFactoryredisConnectionFactory){//实例化redisTemplateRedisTemplate<String,Object>redisTemplate=newRedisTemplate<>();//设置连接工厂redisTemplate.setConnectionFactory(redisConnectionFactory);//key采用String的序列化redisTemplate.setKeySerializer(keySerializer());//value采用jackson序列化redisTemplate.setValueSerializer(valueSerializer());//Hashkey采用String的序列化redisTemplate.setHashKeySerializer(keySerializer());//Hashvalue采用jackson序列化redisTemplate.setHashValueSerializer(valueSerializer());//支持事务//redisTemplate.setEnableTransactionSupport(true);//执行函数,初始化RedisTemplateredisTemplate.afterPropertiesSet();returnredisTemplate;}/***key类型采用String序列化**@return*/privateRedisSerializer<String>keySerializer(){returnnewStringRedisSerializer();}/***value采用JSON序列化**@return*/privateRedisSerializer<Object>valueSerializer(){//设置jackson序列化Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer=newJackson2JsonRedisSerializer<>(Object.class);//设置序列化对象jackson2JsonRedisSerializer.setObjectMapper(getMapper());returnjackson2JsonRedisSerializer;}/***使用com.fasterxml.jackson.databind.ObjectMapper*对数据进行处理包括java8里的时间**@return*/privateObjectMappergetMapper(){ObjectMappermapper=newObjectMapper();//设置可见性mapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);//默认键入对象mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);//设置Java8时间序列化JavaTimeModuletimeModule=newJavaTimeModule();timeModule.addSerializer(LocalDateTime.class,newLocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));timeModule.addSerializer(LocalDate.class,newLocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addSerializer(LocalTime.class,newLocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));timeModule.addDeserializer(LocalDateTime.class,newLocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));timeModule.addDeserializer(LocalDate.class,newLocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addDeserializer(LocalTime.class,newLocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));//禁用把时间转为时间戳mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,false);mapper.registerModule(timeModule);returnmapper;}@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);returncontainer;}}
和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer
3.3、监听器
RedisKeyExpirationListener
packagecom.alian.expiration.listener;importcom.alian.expiration.service.RedisExpirationService;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.connection.Message;importorg.springframework.data.redis.listener.KeyExpirationEventMessageListener;importorg.springframework.data.redis.listener.RedisMessageListenerContainer;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassRedisKeyExpirationListenerextendsKeyExpirationEventMessageListener{@AutowiredprivateRedisExpirationServiceredisExpirationService; //把我们上面一步配置的bean注入进去publicRedisKeyExpirationListener(RedisMessageListenerContainerlistenerContainer){super(listenerContainer);}/***针对redis数据失效事件,进行数据处理**@parammessage*@parampattern*/@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){//用户做自己的业务处理即可,注意message.toString()可以获取失效的keyStringexpiredKey=message.toString();log.info("onMessage-->redis过期的key是:{}",expiredKey);try{//对过期key进行处理redisExpirationService.processingExpiredKey(expiredKey);log.info("过期key处理完成:{}",expiredKey);}catch(Exceptione){e.printStackTrace();log.error("处理redis过期的key异常:{}",expiredKey,e);}}}
实现的步骤如下:
继承KeyExpirationEventMessageListener
把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中
重写onMessage方法
通过Message 的 toString() 方法就可以获取到过期的key
对key中关键信息进行业务处理,比如 id
3.4、服务类
RedisExpirationService
packagecom.alian.expiration.service;importcom.alian.expiration.util.SignUtils;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Service;importjava.util.concurrent.TimeUnit;@Slf4j@ServicepublicclassRedisExpirationService{@AutowiredprivateRedisTemplate<String,Object>redisTemplate;publicvoidprocessingExpiredKey(StringexpiredKey){//如果是优惠券的key(一定要规范命名)if(expiredKey.startsWith("com.mall.coupon.id")){//临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理StringtempKey=SignUtils.md5(expiredKey,"UTF-8");//临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)Booleanexist=redisTemplate.opsForValue().setIfAbsent(tempKey,"1",10,TimeUnit.SECONDS);if(Boolean.TRUE.equals(exist)){log.info("BusinessHanding...");//比如截取里面的id,然后关联数据库进行处理}else{log.info("Otherserviceishanding...");}}else{log.info("Expiredkeyswithoutprocessing");}}}
基本流程如下:
判断是否是需要处理的key,一般这种key通过命名规范加以处理
以当前key生成一个新的key作为分布式key
如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)
设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key
根据 key 的关键信息(比如截取id),进行业务处理
3.5、工具类
SignUtils
packagecom.alian.expiration.util;importjava.security.MessageDigest;publicclassSignUtils{publicstaticfinalStringmd5(Strings,Stringcharset){char[]hexDigits=newchar[]{'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};try{byte[]btInput=s.getBytes(charset);MessageDigestmdInst=MessageDigest.getInstance("MD5");mdInst.update(btInput);byte[]md=mdInst.digest();intj=md.length;char[]str=newchar[j*2];intk=0;for(bytebyte0:md){str[k++]=hexDigits[byte0>>>4&15];str[k++]=hexDigits[byte0&15];}returnnewString(str);}catch(Exceptionvar11){return"";}}}
四、测试
4.1、测试类
简单模拟下发送一个优惠券数据到redis,然后设置超时时间
packagecom.alian.expiration;importlombok.extern.slf4j.Slf4j;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.TimeUnit;@Slf4j@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublicclassRedisKeyExpirationTest{@AutowiredprivateRedisTemplate<String,Object>redisTemplate;@TestpublicvoidkeyExpiration(){//优惠券信息Stringid="2023021685264735";Map<String,String>map=newHashMap<>();map.put("id",id);map.put("amount","1000");map.put("type","1001");map.put("describe","满减红包");//缓存到redisredisTemplate.opsForHash().putAll("com.mall.coupon.id."+id,map);//设置过期时间redisTemplate.expire("com.mall.coupon.id."+id,10,TimeUnit.SECONDS);}}
4.2、单实例
单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:
10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
4.3、多实例
多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
8091端口的服务结果如下(Other service is handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
结果分析:
多实例的情况下,每个实例都会收到过期key通知
通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复
使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
Redis监听过期的key实现流程是什么的详细内容,希望对您有所帮助,信息来源于网络。