[转帖]Redis进阶(发布订阅,PipeLine,持久化,内存淘汰)

redis,进阶,发布,订阅,pipeline,持久,内存,淘汰 · 浏览次数 : 0

小编点评

**3.2.1 RDB 文件创建方式 自动保存 触发机制** **RDB 文件创建方式** RDB 文件是一种二进制文件,其存储格式和 AOF 文件相同,但数据存储方式不同。RDB 文件在内存中进行持久化,而 AOF 文件则在磁盘上进行持久化。 **自动保存** 当配置了 `save m n` 时,RDB 文件会自动在内存中进行备份,并在 `save 900 1` 或 `save 60 10000` 等情况下,触发 bgsave 命令进行全量备份。 **触发机制** RDB 文件创建过程会触发以下事件: 1. **save m n** 检查内存中是否有至少 `m` 秒内发生 `n` 次变化。 2. **save 900 1** 检查内存中是否有至少 900 秒内发生 1 个变化。 3. **save 60 10000** 检查内存中是否有至少 10000 秒内发生 1 个变化。 **其他信息** * `save` 和 `bgsave` 是在 Redis 中用于持久化的命令。 * `AOF` 文件是用于 Redis 的持久化格式,而 `RDB` 文件是用于 Redis 的内存持久化格式。 * `RDB` 文件的压缩效率比 `AOF` 文件更高,但压缩后的文件大小可能仍然很大。

正文

目录

 

1、发布订阅

1.1 什么是发布订阅

1.2 客户端实例演示

1.3 Java API演示

1.4 Redis发布订阅和rabbitmq的区别

2、批量操作

2.1 普通模式与 PipeLine 模式

2.2 适用场景

2.3 源码解析

2.4 Pipelining的局限性

2.5 事务与 LUA Scripting

3、持久化

3.1 为什么需要持久化

3.2 持久化方式

3.3 文件同步方式

3.4 文件重写(压缩)

3.5 总结

4、内存淘汰

4.1 最大内存设置

4.2 数据淘汰机制


1、发布订阅

1.1 什么是发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息并且可以订阅任意数量的频道。

发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者------是不是与设计模式里面的观察者模式一个妈妈生的?

最经典的应用场景就是微博和公众号,任何粉丝只要关注(订阅)了某一个人的微博或者公众号,该微博或者公众号就有有状态更新,都会将消息推送(发布)到粉丝....

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

20210203232133811.png

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

1.2 客户端实例演示

以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为 redisChat:

  1. redis 127.0.0.1:6379> SUBSCRIBE redisChat
  2. Reading messages... (press Ctrl-C to quit)
  3. 1) "subscribe"
  4. 2) "redisChat"
  5. 3) (integer) 1

现在,我们先重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就能接收到消息。

  1. redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
  2. (integer) 1
  3. redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by runoob.com"
  4. (integer) 1
  5. # 订阅者的客户端会显示如下消息
  6. 1) "message"
  7. 2) "redisChat"
  8. 3) "Redis is a great caching technique"
  9. 1) "message"
  10. 2) "redisChat"
  11. 3) "Learn redis by runoob.com"

下表列出了 redis 发布订阅常用命令:

序号命令及描述
1PSUBSCRIBE pattern... 订阅一个或多个符合语法的频道。---> PSUBSCRIBE redisChat* 订阅所有以redisChat开头的频道
2PUBLISH channel message 将信息发送到指定的频道。
3PUNSUBSCRIBE pattern ... 退订所有给定模式的频道。
4SUBSCRIBE channel ... 订阅给定的一个或多个频道的信息。

 

1.3 Java API演示

1.3.1 引入jedis依赖

  1. <dependency>
  2.            <groupId>redis.clients</groupId>
  3.            <artifactId>jedis</artifactId>
  4.            <version>2.9.3</version>
  5.        </dependency>

1.3.2 Publisher (发布者)

  1. package com.ydt.redis.pubsub;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. import java.io.BufferedReader;
  5. import java.io.IOException;
  6. import java.io.InputStreamReader;
  7. public class Publisher extends Thread{
  8.    private final JedisPool jedisPool;
  9.    public Publisher(JedisPool jedisPool) {
  10.        this.jedisPool = jedisPool;
  11.   }
  12.    
  13.    @Override
  14.    public void run() {
  15.        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
  16.        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
  17.        while (true) {
  18.            String line = null;
  19.            try {
  20.                line = reader.readLine();
  21.                if (!"quit".equals(line)) {
  22.                    jedis.publish("mychannel", line);   //从 mychannel 的频道上推送消息
  23.               } else {
  24.                    break;
  25.               }
  26.           } catch (IOException e) {
  27.                e.printStackTrace();
  28.           }
  29.       }
  30.   }
  31. }

1.3.3 Subscriber(订阅者)

  1. package com.ydt.redis.pubsub;
  2. import redis.clients.jedis.JedisPubSub;
  3. //订阅者需要继承JedisPubSub,来重写它的三个方法
  4. public class Subscriber extends JedisPubSub {
  5.    public Subscriber(){}
  6.    @Override
  7.    public void onMessage(String channel, String message) {       //收到消息会调用
  8.        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
  9.   }
  10.    @Override
  11.    public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用
  12.        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
  13.                channel, subscribedChannels));
  14.   }
  15.    @Override
  16.    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用
  17.        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
  18.                channel, subscribedChannels));
  19.   }
  20. }

1.3.4 SubThread(订阅频道)

  1. package com.ydt.redis.pubsub;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. public class SubThread extends Thread {
  5.    private final JedisPool jedisPool;
  6.    private final Subscriber subscriber = new Subscriber();
  7.    private final String channel = "mychannel";
  8.    public SubThread(JedisPool jedisPool) {
  9.        super("SubThread");
  10.        this.jedisPool = jedisPool;
  11.   }
  12.    @Override
  13.    public void run() {
  14.        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
  15.        Jedis jedis = null;
  16.        try {
  17.            jedis = jedisPool.getResource();   //取出一个连接
  18.            jedis.subscribe(subscriber, channel);    //通过subscribe 的api去订阅,入参是订阅者和频道名
  19.       } catch (Exception e) {
  20.            System.out.println(String.format("subsrcibe channel error, %s", e));
  21.       } finally {
  22.            if (jedis != null) {
  23.                jedis.close();
  24.           }
  25.       }
  26.   }
  27. }

1.3.5 测试

  1. package com.ydt.redis.pubsub;
  2. import redis.clients.jedis.JedisPool;
  3. import redis.clients.jedis.JedisPoolConfig;
  4. public class PubSubDemo {
  5.    public static void main( String[] args )
  6.   {
  7.        // 连接redis服务端
  8.        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "192.168.223.128", 6379);
  9.        
  10.        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "192.168.223.128", 6379));
  11.        SubThread subThread1 = new SubThread(jedisPool);  //订阅者1
  12.        subThread1.start();
  13.        SubThread subThread2 = new SubThread(jedisPool);  //订阅者2
  14.        subThread2.start();
  15.        Publisher publisher = new Publisher(jedisPool);    //发布者
  16.        publisher.start();
  17.   }
  18. }

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

1.4 Redis发布订阅和rabbitmq的区别

  1. 可靠性
  2. redis :没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中(临时内存,没有消费者就直接清除,有消费者就直接消费掉);
  3. rabbitmq:具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,那么这条消息将一直存放在队列中,直到有消费者消费了该条消息,以此可以保证消息的可靠消费;
  4. 实时性
  5. redis:实时性高,redis作为高效的缓存服务器,所有数据都存在在服务器中,所以它具有更高的实时性
  6. 消费者负载均衡
  7. rabbitmq队列可以被多个消费者同时监控消费,但是每一条消息只能被消费一次,由于rabbitmq的消费确认机制,因此它能够根据消费者的消费能力而调整它的负载(主要通过Routing Key);
  8. redis发布订阅模式,一个频道可以被多个消费者同时订阅,当有消息到达时,会将该消息依次发送给每个订阅者;
  9. 持久性
  10. redis:redis的持久化是针对于整个redis缓存的内容,它有RDB和AOF两种持久化方式(redis持久化方式,可以提前看本课件第三节),可以将整个redis实例持久化到磁盘,以此来做数据备份,防止异常情况下导致数据丢失。
  11. rabbitmq:队列,消息都可以选择性持久化,持久化粒度更小,更灵活;
  12. 队列监控
  13. rabbitmq实现了后台监控平台,可以在该平台上看到所有创建的队列的详细情况,良好的后台管理平台可以方便我们更好的使用;
  14. redis没有所谓的监控平台。
  15. 总结
  16. redis: 轻量级,低延迟,高并发,低可靠性;
  17. rabbitmq:重量级,高可靠,异步,不保证实时;
  18. rabbitmq是一个专门的AMQP协议队列,他的优势就在于提供可靠的队列服务,并且可做到异步,而redis主要是用于缓存的,redis的发布订阅模块,可用于实现及时性,且可靠性低的功能。

 

2、批量操作

概要理论

Redis 的 pipeline(管道)功能在命令行中没有,但 redis 是支持 pipeline 的,而且在各个语言版的 client 中都有相应的实现。 由于网络开销延迟,就算 redis server 端有很强的处理能力,也会由于收到的 client 消息少,而造成吞吐量小。当 client 使用 pipelining 发送命令时,redis server 必须将部分请求放到队列中(使用内存),执行完毕后一次性发送结果!

Pipeline 在某些场景下非常有用,比如有多个 command 需要被“及时的”提交,而不需要“及时的”响应,那么 pipeline 就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是 TCP 连接中减少了“交互往返”的时间

2.1 普通模式与 PipeLine 模式

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

 

很明显,减少了连接次数,节省了网络资源

下面,我们通过一个demo看下具体效果,分别以两种方式往redis中插入10000条数据

pom.xml

  1. <dependencies>
  2.        <dependency>
  3.            <groupId>org.springframework</groupId>
  4.            <artifactId>spring-context</artifactId>
  5.            <version>5.0.5.RELEASE</version>
  6.        </dependency>
  7.        <dependency>
  8.            <groupId>junit</groupId>
  9.            <artifactId>junit</artifactId>
  10.            <version>4.12</version>
  11.        </dependency>
  12.        <dependency>
  13.            <groupId>redis.clients</groupId>
  14.            <artifactId>jedis</artifactId>
  15.            <version>2.9.3</version>
  16.        </dependency>
  17.        <!-- spring-redis -->
  18.        <dependency>
  19.            <groupId>org.springframework.data</groupId>
  20.            <artifactId>spring-data-redis</artifactId>
  21.            <version>1.6.4.RELEASE</version>
  22.        </dependency>
  23.        <dependency>
  24.            <groupId>org.apache.commons</groupId>
  25.            <artifactId>commons-pool2</artifactId>
  26.            <version>2.4.2</version>
  27.        </dependency>
  28.    </dependencies>

spring配置文件

  1. <!--Jedis连接池的相关配置-->
  2.    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
  3.        <property name="maxTotal">
  4.            <value>200</value>
  5.        </property>
  6.        <property name="maxIdle">
  7.            <value>50</value>
  8.        </property>
  9.        <property name="testOnBorrow" value="true"/>
  10.        <property name="testOnReturn" value="true"/>
  11.    </bean>
  12.    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
  13.        <constructor-arg name="poolConfig" ref="jedisPoolConfig" />
  14.        <constructor-arg name="host" value="127.0.0.1" />
  15.        <constructor-arg name="port" value="6379" type="int" />
  16.        <constructor-arg name="timeout" value="30000" type="int" />
  17.    </bean>

测试代码

  1. @Test
  2.    public void testGeneralAndPipeline(){
  3.        JedisPool jedisPool = (JedisPool) context.getBean("jedisPool");
  4.        Jedis jedis = jedisPool.getResource();
  5.        Logger logger = Logger.getLogger(ClusterTest.class);
  6.        long start = System.currentTimeMillis();
  7.        for (int i = 0; i < 10000; i++) {
  8.            jedis.set(String.valueOf(i), String.valueOf(i));
  9.       }
  10.        long end = System.currentTimeMillis();
  11.        logger.info("the general total time is:" + (end - start));
  12.        Pipeline pipe = jedis.pipelined(); // 先创建一个 pipeline 的链接对象
  13.        long start_pipe = System.currentTimeMillis();
  14.        for (int i = 0; i < 10000; i++) {
  15.            pipe.set(String.valueOf(i), String.valueOf(i));
  16.       }
  17.        pipe.sync(); // 获取所有的 response
  18.        long end_pipe = System.currentTimeMillis();
  19.        logger.info("the pipe total time is:" + (end_pipe - start_pipe));
  20.   }

执行结果:

  1. 16:03:17,592 INFO ClusterTest:114 - the general total time is:2634
  2. 16:03:17,692 INFO ClusterTest:123 - the pipe total time is:77

可以很明显的看到,差别是巨大的!

2.2 适用场景

有些系统可能对可靠性要求很高,每次操作都需要立马知道这次操作是否成功,是否数据已经写进 redis 了,那这种场景就不适合。----总时间长,单条响应快

  还有的系统,可能是批量的将数据写入 redis,允许一定比例的写入失败,那么这种场景就可以使用了,比如10000条一下进入 redis,可能失败了2条无所谓,后期有补偿机制就行了,比如短信群发这种场景,如果一下群发10000条,按照第一种模式去实现,那这个请求过来,要很久才能给客户端响应,这个延迟就太长了,如果客户端请求设置了超时时间5秒,那肯定就抛出异常了,而且本身群发短信要求实时性也没那么高,这时候用 pipeline 最好了。---总时间短,单条响应跟总时间一样,自然比普通模式响应要慢

2.3 源码解析

第一阶段:

循环调用set方法,将所有的command命令写入输出流,但是输出流一直没有调用flush命令将数据刷入缓存!

  1. protected Connection sendCommand(final Command cmd, final byte[]... args) {
  2.    try {
  3.      connect();
  4.      Protocol.sendCommand(outputStream, cmd, args);
  5.      pipelinedCommands++;
  6.      return this;
  7.   } catch (JedisConnectionException ex) {
  8.   .......
  9.   }
  10.    
  11.  private static void sendCommand(final RedisOutputStream os, final byte[] command,
  12.      final byte[]... args) {
  13.    try {
  14.      os.write(ASTERISK_BYTE);
  15.      os.writeIntCrLf(args.length + 1);
  16.      os.write(DOLLAR_BYTE);
  17.      os.writeIntCrLf(command.length);
  18.      os.write(command);
  19.      os.writeCrLf();
  20.      for (final byte[] arg : args) {
  21.        os.write(DOLLAR_BYTE);
  22.        os.writeIntCrLf(arg.length);
  23.        os.write(arg);
  24.        os.writeCrLf();
  25.     }
  26.   } catch (IOException e) {
  27.      throw new JedisConnectionException(e);
  28.   }
  29. }

第二阶段:

调用sync方法,一次性将outputStream里面的数据通过Socket刷入redis serverr

  1. public List<Object> getAll(int except) {
  2.  List<Object> all = new ArrayList<Object>();
  3.  flush();
  4.  while (pipelinedCommands > except) {
  5.    try {
  6.      all.add(readProtocolWithCheckingBroken());
  7.   } catch (JedisDataException e) {
  8.      all.add(e);
  9.   }
  10.    pipelinedCommands--;
  11. }
  12.  return all;
  13. }
  14. protected void flush() {
  15.    try {
  16.      outputStream.flush();
  17.   } catch (IOException ex) {
  18.      broken = true;
  19.      throw new JedisConnectionException(ex);
  20.   }
  21. }

 

对比普通模式:

每一次set的时候都会调用flush方法,将输出流中的字节数据刷入redis server

  1. public String set(final String key, final String value) {
  2.  checkIsInMultiOrPipeline();//检查是否管道模式批量提交
  3.  client.set(key, value);
  4.  return client.getStatusCodeReply();
  5. }
  6. public String getStatusCodeReply() {
  7.    flush();
  8.    pipelinedCommands--;
  9.    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
  10.    if (null == resp) {
  11.      return null;
  12.   } else {
  13.      return SafeEncoder.encode(resp);
  14.   }
  15. }

 

2.4 Pipelining的局限性

Pipelining只能用于执行连续且无相关性的命令,当某个命令的生成需要依赖于前一个命令的返回时,就无法使用Pipelining了。通过 LUA Scripting功能,可以规避这一局限性

2.5 事务与 LUA Scripting

2.5.1 事务实现

Pipelining能够让Redis在一次交互中处理多条命令,然而在一些场景下,我们可能需要在此基础上确保这一组命令是连续执行的。比如获取当前累计的count

  1. 127.0.0.1:6379> SET count 100
  2. OK
  3. 127.0.0.1:6379> INCR count
  4. (integer) 101
  5. 127.0.0.1:6379> get count
  6. "101"

如果在GET和SET命令之间插进来一个INCR count,就会使客户端拿到的count不准确。

Redis的事务可以确保复数命令执行时的原子性。

也就是说Redis能够保证:一个事务中的一组命令是绝对连续执行的,在这些命令执行完成之前,绝对不会有来自于其他连接的其他命令插进去执行

通过MULTI和EXEC命令来把这两个命令加入一个事务中:

  1. 127.0.0.1:6379> MULTI
  2. OK
  3. 127.0.0.1:6379> set name laohu
  4. QUEUED
  5. 127.0.0.1:6379> set age 18
  6. QUEUED
  7. 127.0.0.1:6379> set sex man
  8. QUEUED
  9. 127.0.0.1:6379> EXEC
  10. 1) OK
  11. 2) OK
  12. 3) OK

Java实现:

  1. @Test
  2. public void testMulti() throws InterruptedException {
  3. for (int i = 0; i < 10; i++) {
  4. if( i == 0 ){
  5. jedis.set("" + i, i +"");
  6. }else {
  7. jedis.set("" + i, (i + Integer.valueOf(jedis.get((i-1)+""))) + "");
  8. }
  9. Thread.sleep(10000);//此处有人捣乱?
  10. }
  11. }
  12. @Test
  13. public void testTransaction() throws InterruptedException {
  14. Transaction transaction = jedis.multi();
  15. int last = 0;
  16. for (int i = 0; i < 10; i++) {
  17. if( i == 0 ){
  18. transaction.set("" + i, i +"");
  19. }else {
  20. transaction.set("" + i, (i + last) + "");
  21. }
  22. last = i +last;
  23. }
  24. transaction.exec();
  25. }
  • Redis在接收到MULTI命令后便会开启一个事务,这之后的所有读写命令都会保存在队列中但并不执行

  • 直到接收到EXEC命令后,Redis会把队列中的所有命令连续顺序执行,并以数组形式返回每个命令的返回结果。

  • 可以使用DISCARD命令放弃当前的事务,将保存的命令队列清空。

  • 需要注意的是,Redis事务不支持回滚:如果一个事务中的命令出现了语法错误,大部分客户端驱动会返回错误

2.6.5版本以上的Redis也会在执行EXEC时检查队列中的命令是否存在语法错误,如果存在,则会自动放弃事务并返回错误。 但如果一个事务中的命令有非语法类的错误(比如对String执行HSET操作),无论客户端驱动还是Redis都无法在真正执行这条命令之前发现,所以事务中的所有命令仍然会被依次执行。

在这种情况下,会出现一个事务中部分命令成功部分命令失败的情况,然而与RDBMS(关系型数据库)不同,Redis不提供事务回滚的功能,所以只能通过其他方法进行数据的回滚。

 

2.4.2 通过事务实现CAS

Redis提供了WATCH命令与事务搭配使用,实现CAS乐观锁的机制。

  1. 127.0.0.1:6379> watch count
  2. OK
  3. 127.0.0.1:6379> MULTI
  4. OK
  5. 127.0.0.1:6379> INCR count
  6. QUEUED
  7. 127.0.0.1:6379> INCR count #这一步执行完后,开启另外一个窗口,执行INCR count,修改该key,让事务执行时返回失败
  8. QUEUED
  9. 127.0.0.1:6379> EXEC
  10. (nil)
  11. 127.0.0.1:6379> get count
  12. "101"

WATCH的机制是:在事务EXEC命令执行时,Redis会检查被WATCH的key,只有被WATCH的key从WATCH起始时至今没有发生过变更,EXEC才会被执行。

如果WATCH的key在WATCH命令到EXEC命令之间发生过变化,则EXEC命令会返回失败

Java模拟场景,100个人下单,100个库存,加上watch和不加的区别

  1. @Test
  2. public void testWatch() throws Exception {
  3. jedis.set("test_count", "100");
  4. ExecutorService executorService = Executors.newCachedThreadPool();
  5. for (int i = 0; i < 200; i++) {
  6. Thread.sleep(100);
  7. executorService.execute(new Runnable() {
  8. @Override
  9. public void run() {
  10. go();
  11. }
  12. });
  13. }
  14. }
  15. private void go(){
  16. JedisPool jedisPool = (JedisPool) context.getBean("jedisPool");
  17. Jedis jedis = jedisPool.getResource();
  18. try {
  19. String key_s = "user_name";// 抢到的用户
  20. String key = "test_count";// 商品数量
  21. String clientName = UUID.randomUUID().toString().replace("-", "");// 用户名字
  22. while (true) {
  23. try {
  24. /*jedis.watch(key);// key加上乐观锁*/
  25. System.out.println("用户:" + clientName + "开始抢商品");
  26. System.out.println("当前商品的个数:" + jedis.get(key));
  27. int prdNum = Integer.parseInt(jedis.get(key));// 当前商品个数
  28. if (prdNum > 0) {
  29. Transaction transaction = jedis.multi();// 标记一个事务块的开始
  30. transaction.set(key, String.valueOf(prdNum - 1));
  31. List<Object> result = transaction.exec();// 原子性提交事物
  32. if (result == null || result.isEmpty()) {
  33. System.out.println("用户:" + clientName + "没有抢到商品");// 可能是watch-key被外部修改,或者是数据操作被驳回
  34. } else {
  35. jedis.sadd(key_s, clientName);// 将抢到的用户存起来
  36. System.out.println("用户:" + clientName + "抢到商品");
  37. break;
  38. }
  39. } else {
  40. System.out.println("库存为0,用户:" + clientName + "没有抢到商品");
  41. break;
  42. }
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. } finally {
  46. /*jedis.unwatch();// exec,discard,unwatch命令都会清除连接中的所有监视*/
  47. }
  48. } // while
  49. } catch (Exception e) {
  50. System.out.println("redis bug:" + e.getMessage());
  51. } finally {
  52. // 释放jedis连接
  53. try {
  54. jedis.close();
  55. } catch (Exception e) {
  56. System.out.println("redis bug:" + e.getMessage());
  57. }
  58. }
  59. }

 

2.4.3 LUA Scripting

通过EVAL与EVALSHA命令,可以让Redis执行LUA脚本。这就类似于RDBMS的存储过程一样,可以把客户端与Redis之间密集的读/写交互放在服务端进行,避免过多的数据交互,提升性能。

Scripting功能是作为事务功能的替代者诞生的,事务提供的所有能力Scripting都可以做到。Redis官方推荐使用LUA Script来代替事务,以兼容性能和避免管道事务执行的无序性问题

  1. @Test
  2. public void testLua(){
  3. StringBuffer sb = new StringBuffer();
  4. for (int i = 0; i < 5; i++) {
  5. sb.append("redis.call('set','lua"+i+"','hello lua"+i+"');");
  6. }
  7. sb.append("return redis.call('get' ,'lua3')");
  8. System.out.println(jedis.eval(String.valueOf(sb)));
  9. }

3、持久化

3.1 为什么需要持久化

因为Redis是内存数据库,它将自己的数据存储在内存里面,一旦Redis服务器进程退出或者运行Redis服务器的计算机停机,Redis服务器中的数据就会丢失。

为了避免数据丢失,所以Redis提供了持久化机制,将存储在内存中的数据保存到磁盘中,用于在Redis服务器进程退出或者运行Redis服务器的计算机停机导致数据丢失时,快速的恢复之前Redis存储在内存中的数据。

3.2 持久化方式

3.2.1 Redis持久化---RDB

RDB持久化是将某个时间点上Redis中的数据保存到一个RDB文件中,也叫快照持久化;

该文件是一个经过压缩的二进制文件,通过该文件可以还原生成RDB文件时Redis中的数据 ;

过程如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

3.2.1.1 创建RDB文件

Redis提供了2个命令来创建RDB文件,一个是SAVE,另一个是BGSAVE。

SAVE命令会阻塞Redis服务器进程,直到RDB文件创建完毕为止,在服务器进程阻塞期间,服务器不能处理任何命令请求

20210203232311144.png

BGSAVE命令会派生出一个子进程,然后由子进程负责创建RDB文件,服务器进程(父进程)继续处理命令请求

20210203232322758.png

PS: 因为BGSAVE命令可以在不阻塞服务器进程的情况下执行,所以推荐使用BGSAVE命令

 

3.2.1.2 RDB文件创建方式

自动保存

触发机制有如下:

1、save m n

自动触发最常见的情况是在配置文件中通过配置save m n,指定当m秒内发生n次变化时,会触发bgsave。

  1. save 900 1
  2. save 300 10
  3. save 60 10000

其中save 900 1的含义是:当时间到900秒时,如果Redis数据发生了至少1次变化,则执行bgsave;save 300 10和save 60 10000同理。当三个save条件满足任意一个时,都会引起bgsave的调用。

PS:1、Redis的save m n,是通过serverCron函数、dirty计数器、和lastsave时间戳来实现的。

20210203232333765.png

2、主从复制场景下,如果从节点执行全量复制操作,则主节点会执行 bgsave 命令,并将rdb文件发送给从节点 (主从复制后面会说到)

3、执行shutdown命令时也会触发自动保存

20210203232343431.png

4、重启redis,执行debug reload时也会自动触发保存

20210203232351379.png

手动保存

save和bgsave命令,推荐使用bgsave,不过说了等于没说,基本上都不会直接手动保存,~ ~

3.2.1.3 总结

优点:

RDB文件小,非常适合定时备份,用于灾难恢复。

因为RDB文件中直接存储的是内存数据,而AOF文件中存储的是一条条命令,需要执行命令。Redis加载RDB文件的速度比AOF快很多。

缺点:

RDB持久化方式不能做到实时/秒级持久化。实时持久化要全量刷内存到磁盘,成本太高,影响性能。RDB文件是二进制文件,随着Redis不断迭代有多个rdb文件的版本,不支持跨版本兼容。老的Redis无法识别新的RDB文件格式。

 

3.2.2 Redis持久化---AOF

3.2.2.1 AOF持久化概念

AOF持久化是通过保存Redis服务器所执行的写命令来记录数据库数据的 ,图示

20210203232413492.png

 

3.2.2.2 开启AOF

AOF持久化默认情况下是关闭的,使用前需要先开启,编辑redis.conf文件,修改如下:

20210203232423402.png

执行命令生成如下文件:

20210203232621669.png

编辑打开,可以看到里面的所有命令:

20210203232629606.png

3.3 文件同步方式

appendfsync配置主要是保证集群中数据的一致性

选项安全性效率
everysec(推荐)安全性适中,故障停机,数据库也只会丢失一秒的命令数据每秒同步一次
always安全性最高,故障停机,数据库也只会丢失一个事件循环中所产生的命令数据每一个命令
no安全性最低,故障停机,数据库会丢失上次同步AOF文件之后的所有写命令数据不主动同步,完全依赖操作系统,一般linux为30秒左右一次

 

3.4 文件重写(压缩)

动机: 随着运行时间的增长,执行的命令越来越多,会导致AOF文件越来越大,当AOF文件过大时,redis会执行重写机制来压缩AOF文件

AOF文件重写并不需要对现有的AOF文件进行任何读取、分析或者写入操作,而是通过读取服务器当前的数据库数据来实现的 ,主要是将文件中无效命令剔除,如:

  • 同一个key的值,只保留最后一次写入

  • 已删除或者已过期数据相关命令会被去除

自动配置:

  1. auto-aof-rewrite-min-size 64MB // 当文件小于64M时不进行重写
  2. auto-aof-rewrite-min-percenrage 100 // 当文件比上次重写后的文件大100%时进行重写

手动触发:

20210203232641535.png

响应

20210203232648706.png

重写的过程

  • 从主进程中fork出子进程,并拿到fork时的AOF文件数据写到一个临时AOF文件中---》分离子进程

  • 在重写过程中,redis收到的命令会同时写到AOF缓冲区和重写缓冲区中,这样保证重写不丢失重写过程中的命令---》过程写操作命令收集

  • 重写完成后通知主进程,主进程会将AOF缓冲区中的数据追加到子进程生成的文件中---》追加写操作命令数据

  • redis会原子的将旧文件替换为新文件,并开始将数据写入到新的aof文件上---》替换旧AOF文件

效果图:

重写前:2021020323375829.png----------》重写后20210203233806702.png

 

 

3.5 总结

AOF优缺点

优点:数据的完整性和一致性更高 ​ 缺点:因为AOF记录的内容多,文件会越来越大,数据恢复也会越来越慢,因为不管怎么压缩,它的命令基数总是在不断增长的。

RDB和AOF模式一般都会一起使用,毕竟数据的稳定性高于一切,两者同时持久化时,数据的恢复流程图如下:

 

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

4、内存淘汰

4.1 最大内存设置

默认情况下,在32位OS中,Redis最大使用3GB的内存,在64位OS中则没有限制。

在使用Redis时,应该对数据占用的最大空间有一个基本准确的预估,并为Redis设定最大使用的内存。

否则在64位OS中Redis会无限制地占用内存(当物理内存被占满后会使用swap空间),容易引发各种各样的问题。

通过如下配置控制Redis使用的最大内存:

maxmemory 100mb

在内存占用达到了maxmemory后,再向Redis写入数据时,Redis会:

  • 根据配置的数据淘汰策略尝试淘汰数据,释放空间

  • 如果没有数据可以淘汰,或者没有配置数据淘汰策略,那么Redis会对所有写请求返回错误,但读请求仍然可以正常执行

    异常信息:io.lettuce.core.RedisCommandExecutionException: OOM command not allowed when used memory > 'maxmemory'.

     

在为Redis设置maxmemory时,需要注意:

如果采用了Redis的主从同步,主节点向从节点同步数据时,会占用掉一部分内存空间

如果maxmemory过于接近主机的可用内存,会导致数据同步时内存不足。

所以设置的maxmemory不要过于接近主机可用的内存,留出一部分预留用作主从同步。

4.2 数据淘汰机制

Redis提供了5种数据淘汰策略:

  • volatile-lru:使用LRU算法进行数据淘汰(淘汰上次使用时间最早的,且使用次数最少的key),只淘汰设定了有效期的key

  • allkeys-lru:使用LRU算法进行数据淘汰,所有的key都可以被淘汰

  • volatile-random:随机淘汰数据,只淘汰设定了有效期的key

  • allkeys-random:随机淘汰数据,所有的key都可以被淘汰

  • volatile-ttl:淘汰剩余有效期最短的key

最好为Redis指定一种有效的数据淘汰策略以配合maxmemory设置,避免在内存使用满后发生写入失败的情况。

一般来说,推荐使用的策略是volatile-lru,并辨识Redis中保存的数据的重要性。

对于那些重要的,绝对不能丢弃的数据(如配置类数据等),应不设置有效期,这样Redis就永远不会淘汰这些数据。

对于那些相对不是那么重要的,并且能够热加载的数据(比如缓存最近登录的用户信息,当在Redis中找不到时,程序会去DB中读取),可以设置上有效期,这样在内存不够时Redis就会淘汰这部分数据。

配置方法:

maxmemory-policy volatile-lru   #默认是noeviction,即不进行数据淘汰

 

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树进阶任务Java问答88997 人正在系统学习中

与[转帖]Redis进阶(发布订阅,PipeLine,持久化,内存淘汰)相似的内容:

[转帖]Redis进阶(发布订阅,PipeLine,持久化,内存淘汰)

目录 1、发布订阅 1.1 什么是发布订阅 1.2 客户端实例演示 1.3 Java API演示 1.4 Redis发布订阅和rabbitmq的区别 2、批量操作 2.1 普通模式与 PipeLine 模式 2.2 适用场景 2.3 源码解析 2.4 Pipelining的局限性 2.5 事务与 L

[转帖]Redis学习三(进阶功能).

https://www.cnblogs.com/jmcui/p/11707970.html 阅读目录 一、排序 二、事务 三、流水线(pipeline) 四、发布订阅 回到顶部 一、排序 redis 支持对 list,set 和 zset 元素的排序,排序的时间复杂度是 O(N+M*log(M))。

[转帖]Redis进阶实践之六Redis Desktop Manager连接Windows和Linux系统上的Redis服务

https://www.cnblogs.com/PatrickLiu/p/8360057.html 一、引言 今天本来没有打算写这篇文章,但是,今天测试Redis的时候发现了两个问题,第一个问题是:Redis Desktop Manager无法连接虚拟机上Linux系统上的Redis服务,第二个问题

[转帖]【Redis系列】Redis发布版本历史及特性

目录 概述Redis2.6Redis2.8Redis3.0Redis3.2Redis4.0Redis5.0Redis6.0Redis7.0 概述 Redis 使用标准版本标记进行版本控制:major.minor.patchlevel。 偶数的版本号表示稳定的版本, 例如 1.2,2.0,2.2,2.

[转帖]Redis 最佳实践(上)

https://my.oschina.net/jiagoushi/blog/5601975 引言 尽管 redis 是一款非常优秀的 NoSQL 数据库,但更重要的是,作为使用者我们应该学会在不同的场景中如何更好的使用它,更大的发挥它的价值。主要可以从这四个方面进行优化:Redis 键值设计、批处理

[转帖]RedisTemplate写入Redis数据出现无意义乱码前缀\xac\xed\x00\x05

背景 项目使用Spring的RedisTemplate进行Redis数据存取操作,实际应用中发现Redis中key和value会出现“无意义”乱码前缀\xac\xed\x00\x05t\x00-(样例\xac\xed\x00\x05t\x00-abcd:abc:xxxxxx:passport:ass

[转帖]当Redis数据磁盘坏掉之后会发生什么

https://www.zeekling.cn/articles/2020/11/09/1604937462651.html 问题简介 当Redis cluster集群数据所在磁盘的RAID卡坏掉了之后会发生什么?集群会不会进行故障迁移,以及怎么快速恢复。 问题1:cluster集群会进行故障迁移嘛

[转帖]为什么我Redis中key惊现“乱码”?

为什么Redis中key会惊现“乱码”? 最近在做一个秒杀项目,过程中大量应用到了redis。 而我在用ElasticJob进行数据化初始化到Redis数据库时发现这些key都出现了一段前缀“乱码”。 数据结构为Hash,可以观察到hashkey也带有前缀“乱码” 这究竟是怎么回事呢?原来问题出在这

[转帖]Redis进阶实践之十四 Redis-cli命令行工具使用详解第一部分

https://www.cnblogs.com/PatrickLiu/p/8508975.html 一、介绍 redis学了有一段时间了,以前都是看视频,看教程,很少看官方的东西。现在redis的东西要看的都差不多看完了。网上的东西也不多了。剩下来就看看官网的东西吧,一遍翻译,一遍测试。不错的使用体

[转帖]Redis进阶实践之十五 Redis-cli命令行工具使用详解第二部分(结束)

https://www.cnblogs.com/PatrickLiu/p/8527770.html 一、介绍 今天继续redis-cli使用的介绍,上一篇文章写了一部分,写到第9个小节,今天就来完成第二部分。话不多说,开始我们今天的讲解。如果要想看第一篇文章,地址如下:http://www.cnbl