• redis 管道(pipeline),深入解读



    前言

    本文源码参考版本:redis-6.2redisson-3.17.5jedis-4.2.0lettuce-6.1.8

    管道,你肯定不陌生,你家里的自来水管、天然气管等,应用相当广泛。这些管道有啥特点?传输特定的物质、流式,… 等等。

    我们知道,redis 是 C/S 模式,即客户端 + 服务端。当两端想要通信时,需要先建立特定的 TCP 握手连接,在进行通信时,采用 发送请求 -> 等待响应 这种一问一答模式。

    当然,这种一问一答模式在 redis 中应用非常广泛,可以说,绝大部分都是这种场景。但是,还是会存在一些特殊的场景,这个时候,我们可能更需要通过批量的方式来处理。

    比如,在一个接口或者某个操作中需要从 redis 查询一大批 key。我们用 redis 查询单条 key 是极快的,但要轮训查询一大批 key,就显得有些慢了。

    redis 提供了 pipeline 解决方案,可以按批次进行查询和响应。

    一、动手试试

    1. 对比

    我们使用客户端工具 redisson ,普通模式和 pipeline 模式分别插入 100 w 条数据进行对比:

        @Test
        public void test() {
            RedissonClient redissonClient = RedissonInit.create();
            StopWatch stopWatch = new StopWatch("test pipeline");
    
    
            // test one
            stopWatch.start("one");
            String test_redisson_one_key = "test_redisson_one_key";
            doForSet((index) -> redissonClient.getBucket(test_redisson_one_key + index)
                    .set("value:" + index, 120, TimeUnit.SECONDS));
            stopWatch.stop();
    
            // test pipeline (batch)
            stopWatch.start("pipeline");
            String test_redisson_batch_key = "test_redisson_batch_key";
            RBatch batch = redissonClient.createBatch();
    
            doForSet((index) -> {
                RBucketAsync<String> bucket = batch.getBucket(test_redisson_batch_key + index);
                bucket.setAsync("value:" + index, 120, TimeUnit.SECONDS);
            });
            batch.execute();
    
            stopWatch.stop();
    
            System.out.println(stopWatch.prettyPrint());
        }
    
        private void doForSet(Consumer<Integer> action) {
            for (int i = 0; i < 1000000; i++) {
                action.accept(i);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    看看运行结果:

    StopWatch 'test pipeline': running time = 130774631969 ns
    ---------------------------------------------
    ns         %     Task name
    ---------------------------------------------
    122750452037  094%  one
    8024179932  006%  pipeline
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    总耗时 130s,其中 pipeline 耗时 8s,占总耗时的 6%,与普通模式相比,提升近 20倍!

    2. 请求

    看到了效果,接下来要看看两端通信的一些细节。我们先设置日志级别,方便打印明细数据:

        @Before
        public void init() {
            ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory
                    .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
            root.setLevel(Level.TRACE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    然后查看 redisson 客户端发送的数据报文:

    *4
    $6
    PSETEX
    $24
    test_redisson_batch_key0
    $6
    120000
    $7
    value:0
    *4
    $6
    PSETEX
    $24
    test_redisson_batch_key1
    $6
    120000
    $7
    value:1
    
    ...
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这是 RESP 协议的数据,包括命令都被表示为参数,* 表示一条新的命令开始,*4表示有 4 个参数 ,$6 表示参数长度为6。

    这个命令实际协议值如下:

    *4\r\n$6\r\nPSETEX\r\n$24\r\ntest_redisson_batch_key0\r\n$6\r\n120000\r\n$7\r\nv\r\nlue:0\r\n*4\r\n$6\r\nPSETEX\r\n$24\r\ntest_redisson_batch_key1\r\n$6\r\n120000
    \r\n$7\r\nvalue:1\r\n...
    
    • 1
    • 2

    值得注意的是,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)结尾。

    客户端基本上就是这种格式批量发送数据,可以大胆猜测下,服务器是如何接收、如何处理?

    二、原理

    1.模式

    Redis 是 C/S 模式,正常情况下, 我们是一次请求、一次应答,如下图;

    在这里插入图片描述

    这是最经典的一种模式,应用非常广泛。如果你没有批量处理需求,这种模式应该是最合适的。

    但如果你遇到一些特殊的场景,比如一次性要查询 100、1000 次 redis,这样的 1000 次的请求 + 应答,主要的消耗都浪费在网络通信中,非常不划算。

    那能不能做到一次性批量请求和批量回复?

    其实,这种场景不仅是请求 redis,还有很多类似于这种多次请求的诉求,比如 mysql、http 等都会遇到,解决思路就是 批量操作,以减少网络损耗。如下图:

    在这里插入图片描述
    当然,上图是比较理想的情况,在客户端攒批请求,然后服务端批量响应。

    我们还可以这样,客户端不做攒批,所有请求一条条发送到服务端攒批,然后服务端批量响应,这也是一种思路。

    我画了张图,你可以参考下:

    在这里插入图片描述
    细心的你应该发现了,redis 中事务的实现便是这么模式。其实,pipeline 也可以做成这样,在 redisson 的批量模式中,便有这样的实现。

    当然,你说有没有可能出现多批次应答?也是有可能的,效果是这样:

    在这里插入图片描述

    小结下,在多个 请求/应答 的模式中,我们可以找到两个优化点进行批量操作;

    • 批量发送请求:客户端攒够一批请求,一次性发送到服务器,或者 多批次发送
    • 批量响应请求:服务器一次性响应客户端,或者 服务器多批次响应

    pipeline 的核心思想是,从请求上或者响应上批量处理。

    2.服务端

    如果你来写服务端,你会如何接收/处理 pipeline 一批命令呢?一次性接收所有命令?然后一次性处理所有命令?

    想象下,如果批次命令过多,比如 10000 条命令,采用一次性接收的话,服务端需要在 read 系统调用上消耗 ‘过长’ 的时间(毕竟网络传输不一定可靠)。

    如果一次性处理呢?10000 条指令处理也需要相对 ‘较长’ 的时间,此时其他客户端请求是不是就没法处理了?

    接着,你可能注意到了,我们的请求发送是一条条紧密挨着发送,我们解析数据的时候,肯定会顺序解析,既然一次性执行所有命令不靠谱, 那是不是可以解析一条处理一条?

    没错,redis 服务端便采用这种方式,接收一条指令处理一条指令,直到处理完所有指令,最后批量响应给客户端

    那如果一次性发送特别多指令,又当如何?假设目前一个客户端连接,一次性发送 100k 条命令,能做到一次性响应 100k 指令?

    redis 服务端的处理逻辑中,每轮主循环(aeMain)从 socket 取定长(len = 16k)指令, 并 while 解析一条执行一条、将结果写到客户端缓冲区,直到这批指令解析完成。

    发送客户端时机是 下一轮主循环的 beforeSleep 中,这种一次性读取的批次可以一次性响应。理想情况下,处理是这样的:

    在这里插入图片描述

    但是 100k 条指令,应该要分多个批次处理,那响应也就是多批次了,处理又是这样的:

    在这里插入图片描述

    综上,管道是围绕 批次发送 或者 响应请求 来提升速度,另外,这里我们不用太纠结是一次性批次发送或者响应,还是分多批次。

    本质就是要通过批次来解决问题,同时又不能阻塞服务端其他请求的正常处理,所以,你会看到,redis 服务端并不是一次性读取所有指令然后执行,而是分批次处理(当然,前提是你批量发送的命令特别多)。

    假设在某一个批次中,有 100 个连接(fd)同时有 read 事件,其中一个连接的数据非常大,redis 会读取此连接部分数据;也就是说,当前批次只会处理一个大连接的部分指令 + 其他 99 个连接的指令,然后分别响应客户端。

    至于其中一个还没处理完的连接,需要等到下一批次,继续这种按此规则进行处理。

    总之,我们需要注意的是,对于网络传输和缓冲区,一般都需要排队处理,服务端/客户端都需要 read 读响应。

    3.客户端

    有了上面讲述的几种优化点,客户端基本就是按照以上方式来处理即可:

    • 将数据发送到服务端做攒批
    • 客户端本地内存做攒批

    下面,我们来来看看常见的 jedis、redisson、lettuce 三个客户端的处理方式。

    3.1 redisson

    提供了 客户端攒批服务端攒批 操作:

        public enum ExecutionMode {
    
            // 服务端攒批,原子性执行
            REDIS_READ_ATOMIC,
    
            // 服务端攒批,原子性执行
            REDIS_WRITE_ATOMIC,
    
            // 客户端内存攒批
            IN_MEMORY,
            
            // 客户端内存攒批,但在服务端需要原子性执行
            IN_MEMORY_ATOMIC,
            
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1)客户端攒批,发送数据格式是这样:

    *4
    $6
    PSETEX
    $24
    test_redisson_batch_key0
    $6
    120000
    $7
    value:0
    *4
    $6
    PSETEX
    $24
    test_redisson_batch_key1
    $6
    120000
    $7
    value:1
    
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2)服务端攒批,其实是借助于 MULTI 和 EXEC 指令,发送数据是这样的:

    *1
    $5
    MULTI
    
    • 1
    • 2
    • 3

    响应:+OK

    *4
    $6
    PSETEX
    $24
    test_redisson_batch_key0
    $6
    120000
    $7
    value:0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    响应:+QUEUED

    *1
    $4
    EXEC
    
    • 1
    • 2
    • 3

    响应:+OK

    可见,redisson 处理还是比较灵活, 你可以根据自己的需求进行选择更加合理的方式。

    3.2 jedis

    jedis 作为老牌 redis 客户端,实现很简单,基本上就是 redis 指令的 java 封装版。

    只提供了一种 客户端攒批 的方式,当然这是经典的一种批量处理方式:

        private final JedisPool pool = new JedisPool("127.0.0.1", 6379);
    
        @Test
        public void testJedisBatch() {
            try (Jedis jedis = pool.getResource()) {
                Pipeline p = jedis.pipelined();
    
                String test_jedis_batch_key = "test_jedis_batch_key";
    
                doForSet(
                        (index) ->
                                // 这一步只是 write 到缓冲区
                                p.set(
                        test_jedis_batch_key + index, "value:" + index, SetParams.setParams().ex(120)));
                // 这一步执行 flush,真正将请求发送到 redis server
                p.sync();
            }
        }
    
        private void doForSet(Consumer<Integer> action) {
            for (int i = 0; i < 100000; i++) {
                action.accept(i);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    通过 set 这类操作,其实是将指令追加到客户端缓冲区,等到最后调用 sync 方法时,才是真正 flush 所有数据到服务端。

    3.3 lettuce

    lettuce 提供了和 jedis 类似,也是在 客户端进行攒批,然后一次性发送到 redis 服务端进行处理。

    不同的是 lettuce 底层是通过 Netty 进行通信,因此,也就间接提供了异步请求操作:

        @Test
        public void testLettuce() {
            RedisClient client = RedisClient.create("redis://127.0.0.1");
    
            StatefulRedisConnection<String, String> connection = client.connect();
            RedisAsyncCommands<String, String> commands = connection.async();
    
            // disable auto-flushing
            commands.setAutoFlushCommands(false);
    
            // perform a series of independent calls
            List<RedisFuture<?>> futures = Lists.newArrayList();
            doForSet((index) -> {
                        futures.add(commands.set("key-" + index, "value-" + index));
                        futures.add(commands.expire("key-" + index, 3600));
                    }
            );
    
            // 这一步执行 flush,真正将请求发送到 redis server
            commands.flushCommands();
    
            // synchronization example: Wait until all futures complete
            boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
                    futures.toArray(new RedisFuture[futures.size()]));
            Assert.assertTrue(result);
    
            // later
            connection.close();
        }
    
        private void doForSet(Consumer<Integer> action) {
            for (int i = 0; i < 10; i++) {
                action.accept(i);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35


    对比以上三种客户端,原理基本类似,实现也不负责。可以根据你目前项目使用的客户端,选择其提供的 pipeline 能力使用即可。

    总结

    pipeline 需要客户端和服务端同时支持的一种批量操作,我们一般选择在客户端攒批,然后一次性发送到服务端;

    服务端按顺序处理,然后一次性响应客户端。

    当然,由于内存、网络等各种原因,pipeline 也不一定完全是一次发送、一次性响应模式,可以多次发送,也可以多次响应,比较灵活,具体就看客户端的实现了。

    从实现上来看也比较简单:

    • 如果选择客户端攒批:服务端提供连续、顺序处理即可,客户端本地内存缓存所有操作,然后一次性发送到服务端。
    • 如果选择服务端攒批:这种就更方便了,直接借助于服务端(redis)提供事务支持的 MULTI、EXEC 指令即可。

    redis pipeline 在特定的场景下是不错的提速手段,你可以在你的实战项目中应用起来!




    相关参考:
  • 相关阅读:
    C++刷题测试样例输入输出
    vue3知识点:provide 与 inject
    【Golang开发面经】米哈游(一轮游)
    SASE , sdp等
    OLAP引擎也能实现高性能向量检索,据说QPS高于milvus!
    Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单
    C语言实现将一个正整数分解质因数。例如:输入90,打印出90=2*3*3*5
    Linux内核源码分析 (B.2)虚拟地址空间布局架构
    检测网络:让网络检测通过图表呈现 - gping
    javaee ssm框架整合例子 ssm例子,需要哪些依赖,配置文件如何配置
  • 原文地址:https://blog.csdn.net/ldw201510803006/article/details/126093441