Redis监听过期的key实现流程是什么(key,redis,开发技术)

时间:2024-04-29 06:05:33 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

    一、简介

    我们来个最简单的集群架构,如下图:

    Redis监听过期的key实现流程是什么

    我们上面图中看到是服务A和服务B就是同一个服务的不同实例。

    二、maven依赖

    pom.xml

    <?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.0</version><relativePath/><!--lookupparentfromrepository--></parent><groupId>com.alian</groupId><artifactId>expiration</artifactId><version>0.0.1-SNAPSHOT</version><name>expiration</name><description>redis-key-expiration-listener</description><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><project.package.directory>target</project.package.directory><java.version>1.8</java.version><!--com.fasterxml.jackson版本--><jackson.version>2.9.10</jackson.version><!--阿里巴巴fastjson版本--><fastjson.version>1.2.68</fastjson.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--redis依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${parent.version}</version></dependency><!--用于序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><!--java8时间序列化--><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.14</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

    三、编码实现

    3.1、application.properties

    # 端口
    server.port=8090
    # 上下文路径
    server.servlet.context-path=/expiration

    # Redis数据库索引(默认为0)
    spring.redis.database=0
    # Redis服务器地址
    spring.redis.host=192.168.0.193
    #spring.redis.host=127.0.0.1
    # Redis服务器连接端口
    spring.redis.port=6379
    # Redis服务器连接密码(默认为空)
    spring.redis.password=
    # 连接池最大连接数(使用负值表示没有限制)
    spring.redis.jedis.pool.max-active=20
    # 连接池中的最小空闲连接
    spring.redis.jedis.pool.min-idle=10
    # 连接池中的最大空闲连接
    spring.redis.jedis.pool.max-idle=10
    # 连接池最大阻塞等待时间(使用负值表示没有限制)
    spring.redis.jedis.pool.max-wait=20000
    # 读时间(毫秒)
    spring.redis.timeout=10000
    # 连接超时时间(毫秒)
    spring.redis.connect-timeout=10000

    3.2、Redis配置类

    RedisConfig

    packagecom.alian.expiration.config;importcom.fasterxml.jackson.annotation.JsonAutoDetect;importcom.fasterxml.jackson.annotation.PropertyAccessor;importcom.fasterxml.jackson.databind.ObjectMapper;importcom.fasterxml.jackson.databind.SerializationFeature;importcom.fasterxml.jackson.datatype.jsr310.JavaTimeModule;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.redis.connection.RedisConnectionFactory;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.data.redis.listener.RedisMessageListenerContainer;importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;importorg.springframework.data.redis.serializer.RedisSerializer;importorg.springframework.data.redis.serializer.StringRedisSerializer;importjava.time.LocalDate;importjava.time.LocalDateTime;importjava.time.LocalTime;importjava.time.format.DateTimeFormatter;@ConfigurationpublicclassRedisConfig{/***redis配置**@paramredisConnectionFactory*@return*/@BeanpublicRedisTemplate<String,Object>redisTemplate(RedisConnectionFactoryredisConnectionFactory){//实例化redisTemplateRedisTemplate<String,Object>redisTemplate=newRedisTemplate<>();//设置连接工厂redisTemplate.setConnectionFactory(redisConnectionFactory);//key采用String的序列化redisTemplate.setKeySerializer(keySerializer());//value采用jackson序列化redisTemplate.setValueSerializer(valueSerializer());//Hashkey采用String的序列化redisTemplate.setHashKeySerializer(keySerializer());//Hashvalue采用jackson序列化redisTemplate.setHashValueSerializer(valueSerializer());//支持事务//redisTemplate.setEnableTransactionSupport(true);//执行函数,初始化RedisTemplateredisTemplate.afterPropertiesSet();returnredisTemplate;}/***key类型采用String序列化**@return*/privateRedisSerializer<String>keySerializer(){returnnewStringRedisSerializer();}/***value采用JSON序列化**@return*/privateRedisSerializer<Object>valueSerializer(){//设置jackson序列化Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer=newJackson2JsonRedisSerializer<>(Object.class);//设置序列化对象jackson2JsonRedisSerializer.setObjectMapper(getMapper());returnjackson2JsonRedisSerializer;}/***使用com.fasterxml.jackson.databind.ObjectMapper*对数据进行处理包括java8里的时间**@return*/privateObjectMappergetMapper(){ObjectMappermapper=newObjectMapper();//设置可见性mapper.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);//默认键入对象mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);//设置Java8时间序列化JavaTimeModuletimeModule=newJavaTimeModule();timeModule.addSerializer(LocalDateTime.class,newLocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));timeModule.addSerializer(LocalDate.class,newLocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addSerializer(LocalTime.class,newLocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));timeModule.addDeserializer(LocalDateTime.class,newLocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss")));timeModule.addDeserializer(LocalDate.class,newLocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addDeserializer(LocalTime.class,newLocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));//禁用把时间转为时间戳mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,false);mapper.registerModule(timeModule);returnmapper;}@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);returncontainer;}}

    和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer

    3.3、监听器

    RedisKeyExpirationListener

    packagecom.alian.expiration.listener;importcom.alian.expiration.service.RedisExpirationService;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.connection.Message;importorg.springframework.data.redis.listener.KeyExpirationEventMessageListener;importorg.springframework.data.redis.listener.RedisMessageListenerContainer;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassRedisKeyExpirationListenerextendsKeyExpirationEventMessageListener{@AutowiredprivateRedisExpirationServiceredisExpirationService; //把我们上面一步配置的bean注入进去publicRedisKeyExpirationListener(RedisMessageListenerContainerlistenerContainer){super(listenerContainer);}/***针对redis数据失效事件,进行数据处理**@parammessage*@parampattern*/@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){//用户做自己的业务处理即可,注意message.toString()可以获取失效的keyStringexpiredKey=message.toString();log.info("onMessage-->redis过期的key是:{}",expiredKey);try{//对过期key进行处理redisExpirationService.processingExpiredKey(expiredKey);log.info("过期key处理完成:{}",expiredKey);}catch(Exceptione){e.printStackTrace();log.error("处理redis过期的key异常:{}",expiredKey,e);}}}

    实现的步骤如下:

    • 继承KeyExpirationEventMessageListener

    • 把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中

    • 重写onMessage方法

    • 通过Message 的 toString() 方法就可以获取到过期的key

    • 对key中关键信息进行业务处理,比如 id

    3.4、服务类

    RedisExpirationService

    packagecom.alian.expiration.service;importcom.alian.expiration.util.SignUtils;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Service;importjava.util.concurrent.TimeUnit;@Slf4j@ServicepublicclassRedisExpirationService{@AutowiredprivateRedisTemplate<String,Object>redisTemplate;publicvoidprocessingExpiredKey(StringexpiredKey){//如果是优惠券的key(一定要规范命名)if(expiredKey.startsWith("com.mall.coupon.id")){//临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理StringtempKey=SignUtils.md5(expiredKey,"UTF-8");//临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)Booleanexist=redisTemplate.opsForValue().setIfAbsent(tempKey,"1",10,TimeUnit.SECONDS);if(Boolean.TRUE.equals(exist)){log.info("BusinessHanding...");//比如截取里面的id,然后关联数据库进行处理}else{log.info("Otherserviceishanding...");}}else{log.info("Expiredkeyswithoutprocessing");}}}

    基本流程如下:

    • 判断是否是需要处理的key,一般这种key通过命名规范加以处理

    • 以当前key生成一个新的key作为分布式key

    • 如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)

    • 设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key

    • 根据 key 的关键信息(比如截取id),进行业务处理

    3.5、工具类

    SignUtils

    packagecom.alian.expiration.util;importjava.security.MessageDigest;publicclassSignUtils{publicstaticfinalStringmd5(Strings,Stringcharset){char[]hexDigits=newchar[]{'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};try{byte[]btInput=s.getBytes(charset);MessageDigestmdInst=MessageDigest.getInstance("MD5");mdInst.update(btInput);byte[]md=mdInst.digest();intj=md.length;char[]str=newchar[j*2];intk=0;for(bytebyte0:md){str[k++]=hexDigits[byte0>>>4&15];str[k++]=hexDigits[byte0&15];}returnnewString(str);}catch(Exceptionvar11){return"";}}}

    四、测试

    4.1、测试类

    简单模拟下发送一个优惠券数据到redis,然后设置超时时间

    packagecom.alian.expiration;importlombok.extern.slf4j.Slf4j;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.TimeUnit;@Slf4j@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublicclassRedisKeyExpirationTest{@AutowiredprivateRedisTemplate<String,Object>redisTemplate;@TestpublicvoidkeyExpiration(){//优惠券信息Stringid="2023021685264735";Map<String,String>map=newHashMap<>();map.put("id",id);map.put("amount","1000");map.put("type","1001");map.put("describe","满减红包");//缓存到redisredisTemplate.opsForHash().putAll("com.mall.coupon.id."+id,map);//设置过期时间redisTemplate.expire("com.mall.coupon.id."+id,10,TimeUnit.SECONDS);}}

    4.2、单实例

    单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:

    10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    10:23:39 988 INFO [container-2]:Business Handing...
    10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    10:23:50 005 INFO [container-3]:Expired keys without processing
    10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    4.3、多实例

    多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Business Handing...
    11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    8091端口的服务结果如下(Other service is handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Other service is handing...
    11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    结果分析:

    • 多实例的情况下,每个实例都会收到过期key通知

    • 通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复

    • 使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)

     </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
    本文:Redis监听过期的key实现流程是什么的详细内容,希望对您有所帮助,信息来源于网络。
    上一篇:利用pip安装python第三方库的方法有哪些下一篇:

    4 人围观 / 0 条评论 ↓快速评论↓

    (必须)

    (必须,保密)

    阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18