• 【 OpenGauss源码学习 —— 列存储(update)】


    声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
    本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档

    概述

      在先前的学习中分别介绍了列存储(创建表)和列存储(Insert),相比大家对列存储也有了初步的认识和了解。本文接下来将学习列存储(update)的相关知识
      列存储和行存储是数据库中两种不同的数据存储方式,它们在数据更新操作上有一些显著的区别:

    1. 行存储(Row-based storage):
    • 存储方式: 行存储以行为单位存储数据,整个行的数据被存储在一起。
    • 更新操作: 当执行更新操作时,行存储需要更新整行的数据即使只修改了一部分字段,也需要更新整个行,这可能导致额外的存储和 I/O 操作。
    1. 列存储(Column-based storage):
    • 存储方式: 列存储以列为单位存储数据,每一列的数据被存储在一起。
    • 更新操作: 列存储在更新操作上具有优势,因为它只需要更新涉及到的列。当只修改表中的一部分字段时,只有相关列需要被更新,而其他列保持不变。这可以减少更新操作的开销。

    相关函数

      按照传统,我们按照一个实际案例来学习,创建一个名为 sales列存储表,用于存储销售数据,表的结构如下:

    --------创建表
    CREATE TABLE columnar_test (
        id INT,
        name VARCHAR(50),
        age INT,
        city VARCHAR(50)
    ) WITH (ORIENTATION = COLUMN);
    
    --------插入数据
    INSERT INTO columnar_test (id, name, age, city)
    VALUES
        (1, 'Alice', 25, 'New York'),
        (2, 'Bob', 30, 'San Francisco'),
        (3, 'Charlie', 28, 'Los Angeles'),
        (4, 'David', 35, 'Chicago'),
        (5, 'Eva', 22, 'Miami');
    
    postgres=# select * from columnar_test;
     id |  name   | age |     city
    ----+---------+-----+---------------
      1 | Alice   |  25 | New York
      2 | Bob     |  30 | San Francisco
      3 | Charlie |  28 | Los Angeles
      4 | David   |  35 | Chicago
      5 | Eva     |  22 | Miami
    (5 rows)
    
    --------更新数据
    UPDATE columnar_test SET name = 'kuchiki' where id = 1;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    CStoreUpdate::ExecUpdate 函数

      CStoreUpdate::ExecUpdate 函数主要用于执行更新操作。首先,它确保输入的 VectorBatch 和相关的执行信息不为空,并进行必要的断言检查。然后,通过 JunkFilter废弃数据过滤器)执行删除操作,将批量数据中的符合删除条件的行进行删除。接着,调整批量数据的列数以适应执行环境,并检查批量数据是否符合表的约束条件。最后,根据是分区表还是非分区表,调用相应的批量插入方法BatchInsert)执行更新后的插入操作。函数返回更新的行数。这一系列操作确保了在更新数据时的一致性和有效性。

    函数接受两个参数:

    1. VectorBatch* batch: 表示待更新的批量数据,以 VectorBatch 的形式传入。VectorBatch 通常是一组行存储的数据包含多个列
    2. int options: 表示更新操作的选项,以整数形式传入。这个参数可能包含有关更新行为的一些额外信息,比如是否跳过写入日志(WAL)等。

      函数入参调试信息如下所示:

    (gdb) p *batch
    $1 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_cols = 6, m_checkSel = false, m_sel = 0x7f8e87f29450, m_arr = 0x7f8e87f29880, m_sysColumns = 0x0,
      m_pCompressBuf = 0x0}
    (gdb) p *batch.m_sel
    $2 = true
    (gdb) p *batch.m_arr
    $4 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_desc = {<BaseObject> = {<No data fields>}, typeId = 23, typeMod = -1, encoded = false}, m_const = false,
      m_flag = 0x7f8e87e8df40 "", m_buf = 0x7f8e87f22088, m_vals = 0x7f8e87f20060, m_addVar = (Datum (ScalarVector::*)(ScalarVector * const, Datum, int)) 0x173136a
         <ScalarVector::AddHeaderVar(unsigned long, int)>}
    (gdb) p options
    $5 = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

      VectorBatch 类的部分成员函数解释如下:

    class VectorBatch : public BaseObject {
    public:
        // 批量数据中的行数。
        int m_rows;
    
        // 批量数据中的列数。
        int m_cols;
    
        // 是否检查选择向量。
        bool m_checkSel;
    
        // 选择向量,用于标记批量数据中哪些行是有效的。
        bool* m_sel;
    
        // ScalarVector数组,每个ScalarVector对应批量数据中的一列。
        ScalarVector* m_arr;
    
        // SysColumns,用于存储系统列的容器。
        SysColContainer* m_sysColumns;
    
        // 压缩缓冲区,用于存储批量数据的压缩信息。
        StringInfo m_pCompressBuf;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

      CStoreUpdate::ExecUpdate 函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_update.cpp

    uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
    {
        // 断言:确保输入的VectorBatch和相关的执行信息不为空
        Assert(batch && m_resultRelInfo && m_delete);
        // 断言:确保(m_isPartition && m_partionInsert)或者(!m_isPartition && m_insert)条件成立
        Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));
    
        // 获取废弃数据过滤器
        JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;
    
        // 调用删除操作,通过JunkFilter执行删除,并删除批量数据中符合条件的行
        m_delete->PutDeleteBatch(batch, junkfilter);
    
        // 记录批量数据的原始列数
        int oriCols = batch->m_cols;
        // 调整批量数据的列数以适应执行环境
        batch->m_cols = junkfilter->jf_cleanTupType->natts;
    
        // 检查批量数据是否符合表的约束条件
        if (m_relation->rd_att->constr)
            ExecVecConstraints(m_resultRelInfo, batch, m_estate);
    
        // 根据是分区表还是非分区表,调用相应的批量插入方法,执行更新后的插入操作
        if (m_isPartition)
            m_partionInsert->BatchInsert(batch, options);
        else
            m_insert->BatchInsert(batch, options);
    
        // 恢复批量数据的原始列数
        batch->m_cols = oriCols;
    
        // 返回更新的行数
        return (uint64)(uint32)batch->m_rows;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

      其中 CStoreUpdate::ExecUpdate 函数的调用关系如下所示:

       ┌──cstore_update.cpp─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
       │198     {                                                                                                                                                   │
       │199         Assert(sortTupDesc && m_resultRelInfo && m_delete);                                                                                             │
       │200                                                                                                                                                         │
       │201         JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;                                                                                        │
       │202                                                                                                                                                         │
       │203         // init delete sort state                                                                                                                       │
       │204         m_delete->InitSortState(sortTupDesc, junkfilter->jf_xc_part_id, junkfilter->jf_junkAttNo);                                                      │
       │205     }                                                                                                                                                   │
       │206                                                                                                                                                         │
       │207     uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)                                                                          │
       │208     {                                                                                                                                                   │
    B+>│209         Assert(batch && m_resultRelInfo && m_delete);                                                                                                   │
       │210         Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));                                                                     │
       │211                                                                                                                                                         │
       │212         JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;                                                                                        │
       │213                                                                                                                                                         │
       │214         // delete                                                                                                                                       │
       │215         m_delete->PutDeleteBatch(batch, junkfilter);                                                                                                    │
       │216                                                                                                                                                         │
       │217         int oriCols = batch->m_cols;                                                                                                                    │
       │218         batch->m_cols = junkfilter->jf_cleanTupType->natts;                                                                                             │
       │219                                                                                                                                                         │
       │220         // Check the constraints of the batch                                                                                                           │
       │221         if (m_relation->rd_att->constr)                                                                                                                 │
       │222             ExecVecConstraints(m_resultRelInfo, batch, m_estate);                                                                                       │
       └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
    multi-thre Thread 0x7f8e8 In: CStoreUpdate::ExecUpdate                                                                                  Line: 209  PC: 0x1b66ed5
    #0  CStoreUpdate::ExecUpdate (this=0x7f8e907491f8, batch=0x7f8e87f293e8, options=0) at cstore_update.cpp:209
    #1  0x000000000169360d in ExecVecUpdate (state=0x7f8e90748060, update_op=0x7f8e907491f8, batch=0x7f8e87f293e8, estate=0x7f8e8824e060,
        can_set_tag=true, options=0) at vecmodifytable.cpp:111
    #2  0x0000000001691e54 in ExecVecModifyTable (node=0x7f8e90748060) at vecmodifytable.cpp:662
    #3  0x000000000173425f in VectorEngine (node=0x7f8e90748060) at vecexecutor.cpp:171
    #4  0x0000000001687fd5 in ExecVecToRow (state=0x7f8e8675a060) at vectortorow.cpp:149
    #5  0x000000000159a439 in ExecProcNodeByType (node=0x7f8e8675a060) at execProcnode.cpp:677
    #6  0x000000000159a8dd in ExecProcNode (node=0x7f8e8675a060) at execProcnode.cpp:769
    #7  0x0000000001595232 in ExecutePlan (estate=0x7f8e8824e060, planstate=0x7f8e8675a060, operation=CMD_UPDATE, sendTuples=false, numberTuples=0,
        direction=ForwardScanDirection, dest=0x7f8e88245be8) at execMain.cpp:2124
    #8  0x0000000001591d6a in standard_ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at execMain.cpp:608
    #9  0x000000000139a5d4 in explain_ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at auto_explain.cpp:116
    #10 0x000000000159188f in ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at execMain.cpp:484
    ---Type <return> to continue, or q <return> to quit---
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    JunkFilter 结构体

      在 CStoreUpdate::ExecUpdate 函数中调用了,调用了一个 JunkFilter 结构体来获取废弃数据过滤器:

        // 获取废弃数据过滤器
        JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;
    
        // 调用删除操作,通过JunkFilter执行删除,并删除批量数据中符合条件的行
        m_delete->PutDeleteBatch(batch, junkfilter);
    
        // 记录批量数据的原始列数
        int oriCols = batch->m_cols;
        // 调整批量数据的列数以适应执行环境
        batch->m_cols = junkfilter->jf_cleanTupType->natts;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

      下面解释一下 JunkFilter 结构体的作用: JunkFilter 结构体用于存储有关垃圾属性junk attributes)的信息垃圾属性是元组中仅用于在执行器中存储中间信息的属性,不应包含在生成的元组中。例如,在执行 UPDATE 查询时,规划器向目标列表添加一个“垃圾”条目,以便由 ExecutePlan() 返回的元组包含额外的属性待更新的元组的 ctid。这是执行更新所需的,但我们不希望 ctid 成为存储的新元组的一部分!因此,我们应用一个“垃圾过滤器”来移除垃圾属性并形成真正的输出元组。垃圾过滤器代码还提供了从输入元组中提取垃圾属性值的例程。函数源码如下:(路径:src/include/nodes/execnodes.h

    /* 
     * JunkFilter结构体,用于存储垃圾属性信息
     */
    typedef struct JunkFilter {
        NodeTag type;               /* 结点类型 */
        List* jf_targetList;        /* 原始目标列表(包括垃圾属性) */
        TupleDesc jf_cleanTupType;  /* "清理"元组的元组描述符(不包括垃圾属性) */
        AttrNumber* jf_cleanMap;    /* 非垃圾属性的“原始”元组属性号和“清理”元组属性号的对应关系映射 */
        TupleTableSlot* jf_resultSlot; /* 用于保存清理后的元组的元组槽 */
        AttrNumber jf_junkAttNo;    /* 未被垃圾过滤器代码使用,可以被调用者用于记住特定垃圾属性的属性号 */
    #ifdef PGXC
        /* 
         * 在PGXC中,类似于jf_junkAttNo,jf_xc_node_id和jf_xc_wholerow用于保存xc_node_id和wholerow的垃圾属性号。
         * 在PG中,jf_junkAttNo仅用于ctid或wholerow之一,不需要同时包含两者;ctid用于物理关系,而wholerow用于视图。
         */
        AttrNumber jf_xc_node_id;     /* xc_node_id的垃圾属性号 */
        AttrNumber jf_xc_wholerow;    /* xc_wholerow的垃圾属性号 */
        AttrNumber jf_xc_part_id;     /* xc_part_id的垃圾属性号 */
        AttrNumber jf_xc_bucket_id;   /* xc_bucket_id的垃圾属性号 */
        List* jf_primary_keys;        /* 主键列表,用于PGXC */
    #endif
    } JunkFilter;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    CStoreInsert::BatchInsert 函数

      CStoreInsert::BatchInsert 函数是用于批量插入数据的函数 ,通过向列存储表中插入数据来进行大批量的数据加载

    该函数主要有三个步骤:

    1. 部分聚簇键的情况下:
    • 将数据放入排序容器中,当排序容器已满插入结束时执行排序。
    • 重置排序容器,并获取下一批值
    • 重置并释放所有内存块
    1. 没有部分聚簇键的情况下:
    • 缓存数据直到 batchrows 被填满。
    • 如果 batchrows 已满,执行批量插入。根据是否启用 DELTA,执行插入操作或者普通批量插入。
    • pBatch 的数据追加到 m_bufferedBatchRows 中,直到所有数据都已追加。
    1. 已经结束批量插入:
    • 如果启用 DELTA,执行 InsertDeltaTable 操作,否则执行普通批量插入
    • 重置 m_bufferedBatchRows,释放相关内存块。

      函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_delete.cpp

    // BatchInsert
    // 用于列存表的向量接口插入大批量数据
    void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
    {
        // 确保 pBatch 不为空或者已经结束插入
        Assert(pBatch || IsEnd());
    
        // 切换内存上下文,以防止在大批量插入期间内存泄漏
        MemoryContext oldCnxt = MemoryContextSwitchTo(m_tmpMemCnxt);
    
        // 步骤 1: 表具有部分聚簇键
        // 我们需要将数据放入排序容器中,然后批量插入数据
        if (NeedPartialSort()) {
            // 确保 m_tmpBatchRows 不为空
            Assert(m_tmpBatchRows);
    
            // 如果有 pBatch,确保列数与表的属性数一致
            if (pBatch) {
                Assert(pBatch->m_cols == m_relation->rd_att->natts);
                m_sorter->PutVecBatch(m_relation, pBatch);
            }
    
            // 如果排序器已满或者已经结束,执行排序
            if (m_sorter->IsFull() || IsEnd()) {
                m_sorter->RunSort();
    
                // 重置并获取下一批值
                DoBatchInsert(options);
                m_sorter->Reset(IsEnd());
    
                // 重置并释放所有内存块
                m_tmpBatchRows->reset(false);
            }
        }
    
        // 步骤 2: 表没有部分聚簇键
        // 我们需要缓存数据直到 batchrows 被填满
        else {
            // 确保 m_bufferedBatchRows 不为空
            Assert(m_bufferedBatchRows);
    
            // 如果 batchrows 已满,执行批量插入
            if (IsEnd()) {
                // 根据是否启用 DELTA,执行插入操作
                if (ENABLE_DELTA(m_bufferedBatchRows)) {
                    InsertDeltaTable(m_bufferedBatchRows, options);
                } else {
                    BatchInsertCommon(m_bufferedBatchRows, options);
                }
                m_bufferedBatchRows->reset(true);
            }
    
            // 需要缓存数据直到 batchrows 被填满
            if (pBatch) {
                // 确保 pBatch 的行数不超过 BatchMaxSize
                Assert(pBatch->m_rows <= BatchMaxSize);
                // 确保 pBatch 的列数和表的属性数不为零
                Assert(pBatch->m_cols && m_relation->rd_att->natts);
                // 确保 m_bufferedBatchRows 的最大行数大于零
                Assert(m_bufferedBatchRows->m_rows_maxnum > 0);
                // 确保 m_bufferedBatchRows 的最大行数是 BatchMaxSize 的整数倍
                Assert(m_bufferedBatchRows->m_rows_maxnum % BatchMaxSize == 0);
    
                int startIdx = 0;
                // 将 pBatch 的数据追加到 m_bufferedBatchRows 中
                while (m_bufferedBatchRows->append_one_vector(
                           RelationGetDescr(m_relation), pBatch, &startIdx, m_cstorInsertMem)) {
                    // 执行批量插入
                    BatchInsertCommon(m_bufferedBatchRows, options);
                    m_bufferedBatchRows->reset(true);
                }
                // 确保所有数据都已追加
                Assert(startIdx == pBatch->m_rows);
            }
        }
    
        // 步骤 3: 如果已经结束批量插入,必须更新索引数据
        FlushIndexDataIfNeed();
    
        // 重置内存上下文
        MemoryContextReset(m_tmpMemCnxt);
        // 切回原始内存上下文
        (void)MemoryContextSwitchTo(oldCnxt);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    bulkload_rows::append_one_vector 函数

      bulkload_rows::append_one_vector 函数的主要作用是将一个向量追加到 batchrows 中,采用了一些策略来控制内存分配和数据的追加。其中,采用了三个元组第一个元组最后一个元组和一个随机元组)的采样,通过计算它们的大小来确定平均大小 tuple_size。接着,根据内存信息参数 m_memInfo总内存大小的限制,决定是执行列扩展还是按行方向追加数据。如果需要内存扩展,则进行相应的处理,并记录相关日志。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_vector.cpp

    /*
     * @Description: 将一个向量追加到 batchrows 中
     * @IN p_batch: 向量批次
     * @IN/OUT start_idx: 起始索引,将被更新
     * @IN tup_desc: 元组描述
     * @IN m_memInfo: 内存信息参数,用于控制内存分配
     * @Return: 如果行数或总内存达到限制,返回 true
     * @See also:
     */
    bool bulkload_rows::append_one_vector(TupleDesc tup_desc, VectorBatch* p_batch, int* start_idx, MemInfoArg* m_memInfo)
    {
        // 确保向量批次的行数大于零
        Assert(p_batch->m_rows > 0);
    
        // 检查是否还有待处理的向量
        if (*start_idx < p_batch->m_rows) {
            // 使用第一个元组、最后一个元组和一个随机元组作为采样来确定平均大小
            Size first_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, *start_idx);
            Size last_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, (p_batch->m_rows - 1));
            int random_pos = (((unsigned int)random()) % (p_batch->m_rows - *start_idx)) + *start_idx;
            Size random_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, random_pos);
            Size tuple_size = MaxTriple(first_tuple_size, last_tuple_size, random_tuple_size);
    
            // 如果启用了内存信息参数,并且插入内存不为零
            if (m_memInfo != NULL && m_memInfo->MemInsert > 0 &&
                ((unsigned long)(unsigned int)m_memInfo->MemInsert < m_using_blocks_total_rawsize / 1024ULL)) {
                // 计算可用的扩展内存,最大为 dywlm_client_get_memory(),且不超过工作内存的 10%
                int64 spreadMem = Min(dywlm_client_get_memory() * 1024L, m_memInfo->MemInsert * 1024L) / 1024;
                // 如果扩展内存超过插入内存的 10%
                if (spreadMem > m_memInfo->MemInsert * 0.1) {
                    // 增加插入内存,并调整上下文的最大空间大小
                    m_memInfo->MemInsert += spreadMem;
                    m_memInfo->spreadNum++;
                    AllocSet context = (AllocSet)m_context->parent;
                    context->maxSpaceSize += spreadMem * 1024L;
    
                    MEMCTL_LOG(DEBUG2,
                               "CStoreInsert(Batch) 自动内存扩展 %ldKB 成功,工作内存为 %dKB,扩展次数为 %d。",
                               spreadMem,
                               m_memInfo->MemInsert,
                               m_memInfo->spreadNum);
                } else {
                    // 内存扩展失败,记录日志并返回 true
                    MEMCTL_LOG(LOG,
                               "CStoreInsert(Batch) 自动内存扩展 %ldKB 失败,工作内存为 %dKB。",
                               spreadMem,
                               m_memInfo->MemInsert);
                    return true;
                }
            }
    
            // 如果剩余总内存和每个元组的平均大小足够,执行列扩展
            if ((m_using_blocks_total_rawsize < BULKLOAD_MAX_MEMSIZE) &&
                ((BULKLOAD_MAX_MEMSIZE - m_using_blocks_total_rawsize) / (p_batch->m_rows - *start_idx) > tuple_size)) {
                return (this->*m_form_append_column_func)(tup_desc, p_batch, start_idx);
            } else {
                // 否则,按行方向追加
                return append_in_row_orientation(tup_desc, p_batch, start_idx);
            }
        } else {
            // 快速路径判断是否已处理完 p_batch
            return full_rownum() || full_rowsize();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

      调试信息如下所示:

    (gdb) p first_tuple_size
    $1 = 28
    (gdb) p last_tuple_size
    $2 = 28
    (gdb) p random_pos
    $3 = 0
    (gdb) p random_tuple_size
    $4 = 28
    (gdb) p tuple_size
    $5 = 28
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    bulkload_rows::append_in_column_orientation我函数

      bulkload_rows::append_in_column_orientation 函数是 bulkload_rows 结构体中的模板方法,用于在列存储场景下,将一个向量批次按列方向追加到批量行数据结构中。函数通过遍历属性处理每个属性的一批值,将其解码并追加到相应的列向量中,同时进行最小/最大值的比较。此外,函数还处理了被删除的列的情况,并根据实际情况更新行数处理游标。函数的主要目的是构建列存储的内部数据结构,以便有效地处理大规模数据的批量加载。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_vector.cpp

    /*
     * @Description: 在列方向追加一个向量
     * @IN p_batch: 一个向量批次
     * @IN/OUT start_idx: 起始位置
     * @IN tup_desc: 元组描述
     * @Return: 如果该 batchrow 已满,返回 true
     * @See also:
     */
    template <bool hasDroppedColumn>
    bool bulkload_rows::append_in_column_orientation(TupleDesc tup_desc, VectorBatch* p_batch, int* start_idx)
    {
        // 断言,确保向量批次非空,属性数量匹配,且当前行数未达到最大值
        Assert(p_batch);
        Assert(m_attr_num == tup_desc->natts);
        Assert(m_rows_curnum < m_rows_maxnum);
    
        // 获取元组属性数量和最大行数
        int const nattrs = tup_desc->natts;
        int const maxRows = Min(m_rows_maxnum - m_rows_curnum, p_batch->m_rows - *start_idx);
    
        // 获取 ScalarVector 和 bulkload_vector 的指针,以及元组属性信息
        ScalarVector* scalar_vector = p_batch->m_arr;
        bulkload_vector* vector = m_vectors;
        Form_pg_attribute* attrs = tup_desc->attrs;
    
        // 遍历属性
        for (int attrIdx = 0; attrIdx < nattrs; ++attrIdx) {
            // 如果有被删除的列,并且当前列为被删除的列
            if (hasDroppedColumn && (*attrs)->attisdropped) {
                /* 跳到下一个属性 */
                /* 对于被删除的列,设置所有值为 NULL */
                int destRowIdx = m_rows_curnum;
                for (int rowCnt = 0; rowCnt < maxRows; ++rowCnt) {
                    vector->m_values_nulls.set_null(destRowIdx++);
                }
                ++scalar_vector;
                ++vector;
                ++attrs;
                continue;
            }
            int srcRowIdx = *start_idx;
            int destRowIdx = m_rows_curnum;
    
            /* 处理一个属性的一批值 */
            for (int rowCnt = 0; rowCnt < maxRows; ++rowCnt) {
                if (unlikely(scalar_vector->IsNull(srcRowIdx))) {
                    /* 追加一个 NULL 值 */
                    vector->m_values_nulls.set_null(destRowIdx);
                } else {
                    /* 从标量向量解码值 */
                    Datum value = (vector->*(vector->m_decode))(scalar_vector, srcRowIdx);
    
                    /* 将此值追加到向量中 */
                    value = (vector->*(vector->m_append))(this, value, (*attrs)->attlen);
    
                    /* 比较最小/最大值 */
                    vector->m_minmax.m_compare(vector->m_minmax.m_min_buf,
                                               vector->m_minmax.m_max_buf,
                                               value,
                                               &(vector->m_minmax.m_first_compare),
                                               &(vector->m_minmax.m_varstr_maxlen));
                }
    
                /* 移动到该属性的下一个值 */
                ++srcRowIdx;
                ++destRowIdx;
            }
            /* 跳到下一个属性 */
            ++scalar_vector;
            ++vector;
            ++attrs;
        }
    
        /* 更新行数和处理游标 */
        m_rows_curnum += maxRows;
        *start_idx += maxRows;
    
        // 返回是否已满
        return full_rownum() || full_rowsize();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

      这里附上以上代码的调试信息:

    (gdb) p nattrs
    $1 = 4
    (gdb) p maxRows
    $2 = 1
    (gdb) p * scalar_vector
    $3 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_desc = {<BaseObject> = {<No data fields>}, typeId = 23, typeMod = -1, encoded = false}, m_const = false,
      m_flag = 0x7f15adf9df40 "", m_buf = 0x7f15ae012088, m_vals = 0x7f15ae010060, m_addVar = (Datum (ScalarVector::*)(ScalarVector * const, Datum, int)) 0x173136a
         <ScalarVector::AddHeaderVar(unsigned long, int)>}
    (gdb) p * vector
    $4 = {<BaseObject> = {<No data fields>}, m_blocks = {m_head = 0x7f15ae6a2488, m_current = 0x7f15ae6a2488, m_block_num = 1, m_block_size = 8192}, m_attlen = 4,
      m_decode = (Datum (bulkload_vector::*)(const bulkload_vector * const, ScalarVector *,
        int)) 0x1b75c74 <bulkload_vector::decode_integer(ScalarVector*, int) const>, m_append = (Datum (bulkload_vector::*)(bulkload_vector * const,
        bulkload_rows *, Datum, int)) 0x1b7585a <bulkload_vector::append_int32(bulkload_rows*, unsigned long, int)>, m_values_nulls = {m_vals_points = 0x0,
        m_null_bitmap = 0x7f15ae558060 "", m_vals_num = 0, m_has_null = false, m_all_null = true}, m_minmax = {m_compare = 0x1b68ef1
         <CompareInt32(char*, char*, Datum, bool*, int*)>, m_finish_compare = 0x1b67159 <FinishCompareFixedLength(char const*, char const*, CUDesc*)>,
        m_min_buf = '\000' <repeats 31 times>, m_max_buf = '\000' <repeats 31 times>, m_varstr_maxlen = 0, m_first_compare = true}}
    (gdb) p ** attrs@4
    $5 = {{attrelid = 32784, attname = {data = "id", '\000' <repeats 61 times>}, atttypid = 23, attstattarget = -1, attlen = 4, attnum = 1, attndims = 0,
        attcacheoff = 0, atttypmod = -1, attbyval = true, attstorage = 112 'p', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false,
        attislocal = true, attcmprmode = 127 '\177', attinhcount = 0, attcollation = 0, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {
          data = "name", '\000' <repeats 59 times>}, atttypid = 1043, attstattarget = -1, attlen = -1, attnum = 2, attndims = 0, attcacheoff = -1, atttypmod = 54,
        attbyval = false, attstorage = 120 'x', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,
        attcmprmode = 127 '\177', attinhcount = 0, attcollation = 100, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {
          data = "age", '\000' <repeats 60 times>}, atttypid = 23, attstattarget = -1, attlen = 4, attnum = 3, attndims = 0, attcacheoff = -1, atttypmod = -1,
        attbyval = true, attstorage = 112 'p', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,
        attcmprmode = 127 '\177', attinhcount = 0, attcollation = 0, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {
          data = "city", '\000' <repeats 59 times>}, atttypid = 1043, attstattarget = -1, attlen = -1, attnum = 4, attndims = 0, attcacheoff = -1, atttypmod = 54,
        attbyval = false, attstorage = 120 'x', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,
        attcmprmode = 127 '\177', attinhcount = 0, attcollation = 100, attkvtype = 0 '\000'}}
    
    # 执行 Datum value = (vector->*(vector->m_decode))(scalar_vector, srcRowIdx);前后 value 值的变化
    (gdb) p value
    $16 = 139731090381120
          的变化
    (gdb) p value
    $17 = 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    ExecVecUpdate 函数

      ExecVecUpdate 函数用于执行矢量化的更新操作。函数接受一个 VecModifyTableState 状态对象、一个表示更新操作update_op 对象、一个包含更新数据的矢量批次 batch执行计划状态对象 estate、一个布尔值 can_set_tag 和一些选项。函数调用 update_opExecUpdate 方法来执行实际的更新操作,并返回更新的行数。如果 can_set_tag 为真,它会更新执行计划状态对象中的 es_processed 字段,表示处理的总行数。最后,函数返回空指针。函数源码如下:(路径:src/gausskernel/runtime/vecexecutor/vecnode/vecmodifytable.cpp

    /*
     * @Description: 模板函数,用于执行矢量化的更新操作。
     * @TemplateParameters:
     *    - T: 表示更新操作的类型,可能是 CStoreUpdate 或其他类型。
     * @Parameters:
     *    - state: VecModifyTableState 状态对象,表示矢量修改表的状态。
     *    - update_op: 更新操作的对象,可能是 CStoreUpdate 或其他类型。
     *    - batch: 包含更新数据的矢量批次。
     *    - estate: 执行计划状态对象,表示当前执行计划的状态。
     *    - can_set_tag: 一个布尔值,如果为真,表示可以更新执行计划状态的处理行数字段。
     *    - options: 更新操作的选项。
     * @Return: 返回空指针。
     */
    template <class T>
    VectorBatch* ExecVecUpdate(VecModifyTableState* state, T* update_op, VectorBatch* batch, EState* estate, bool can_set_tag, int options)
    {
        // 执行更新操作,获取更新的行数。
        uint64 updateRows = update_op->ExecUpdate(batch, options);
    
        // 如果 can_set_tag 为真,则更新执行计划状态对象的处理行数字段。
        if (can_set_tag) {
            (estate->es_processed) += updateRows;
        }
    
        // 返回空指针。
        return NULL;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    CStoreUpdate::EndUpdate 函数

      CStoreUpdate::EndUpdate 函数是 CStoreUpdate 类的方法,用于结束更新操作。首先,通过断言确保删除操作对象存在。然后,执行删除操作将删除的行数记录下来。接着,根据表是否是分区表来判断使用分区插入对象或普通插入对象。如果是分区表,则结束分区插入批量插入操作;如果是普通表,则设置插入操作结束标志,并执行批量插入操作。该函数完成了更新操作的收尾工作,包括删除和插入的处理。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_update.cpp

    /*
     * @Description: 结束更新操作,执行相应的清理工作。
     * @Parameters:
     *    - options: 更新操作的选项。
     */
    void CStoreUpdate::EndUpdate(int options)
    {
        // 断言删除操作对象存在。
        Assert(m_delete);
        // 断言插入操作对象存在,并且根据是否是分区表判断使用分区插入对象或普通插入对象。
        Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));
    
        // 结束删除操作。
        m_delete->ExecDelete();
    
        // 如果是分区表,结束分区插入批量插入。
        if (m_isPartition) {
            m_partionInsert->EndBatchInsert();
        }
        // 如果是普通表,设置插入操作结束标志,并执行批量插入操作。
        else {
            m_insert->SetEndFlag();
            m_insert->BatchInsert(nullptr, options);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

      函数调用关系如下所示:

       ┌──cstore_update.cpp─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
       │231                                                                                                                                                         │
       │232         return (uint64)(uint32)batch->m_rows;                                                                                                           │
       │233     }                                                                                                                                                   │
       │234                                                                                                                                                         │
       │235     void CStoreUpdate::EndUpdate(_in_ int options)                                                                                                      │
       │236     {                                                                                                                                                   │
    B+>│237         Assert(m_delete);                                                                                                                               │
       │238         Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));                                                                     │
       │239                                                                                                                                                         │
       │240         // end delete                                                                                                                                   │
       │241         m_delete->ExecDelete();                                                                                                                         │
       │242                                                                                                                                                         │
       │243         // end insert                                                                                                                                   │
       │244         if (m_isPartition) {                                                                                                                            │
       │245             m_partionInsert->EndBatchInsert();                                                                                                          │
       │246         } else {                                                                                                                                        │
       │247             m_insert->SetEndFlag();                                                                                                                     │
       │248             m_insert->BatchInsert((VectorBatch*)NULL, options);                                                                                         │
       │249         }                                                                                                                                               │
       │250     }                                                                                                                                                   │
       │251                                                                                                                                                         │
       │252                                                                                                                                                         │
       │253                                                                                                                                                         │
       │254                                                                                                                                                         │
       │255                                                                                                                                                         │
       └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
    multi-thre Thread 0x7f15b In: CStoreUpdate::EndUpdate                                                                                   Line: 237  PC: 0x1b6705b
    #0  CStoreUpdate::EndUpdate (this=0x7f15ae54b1f8, options=0) at cstore_update.cpp:237
    #1  0x00000000016927a1 in ExecVecModifyTable (node=0x7f15ae54a060) at vecmodifytable.cpp:762
    #2  0x000000000173425f in VectorEngine (node=0x7f15ae54a060) at vecexecutor.cpp:171
    #3  0x0000000001687fd5 in ExecVecToRow (state=0x7f15ae544060) at vectortorow.cpp:149
    #4  0x000000000159a439 in ExecProcNodeByType (node=0x7f15ae544060) at execProcnode.cpp:677
    #5  0x000000000159a8dd in ExecProcNode (node=0x7f15ae544060) at execProcnode.cpp:769
    #6  0x0000000001595232 in ExecutePlan (estate=0x7f15ae540060, planstate=0x7f15ae544060, operation=CMD_UPDATE, sendTuples=false, numberTuples=0,
        direction=ForwardScanDirection, dest=0x7f15ae465be8) at execMain.cpp:2124
    #7  0x0000000001591d6a in standard_ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at execMain.cpp:608
    #8  0x000000000139a5d4 in explain_ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at auto_explain.cpp:116
    #9  0x000000000159188f in ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at execMain.cpp:484
    #10 0x000000000146f859 in ProcessQuery (plan=0x7f15b31e0e30, sourceText=0x7f15ae464060 "UPDATE columnar_test SET name = 'kuchiki' where id = 1;", params=0x0,
        dest=0x7f15ae465be8, completionTag=0x7f15b2d47f90 "") at pquery.cpp:291
    ---Type <return> to continue, or q <return> to quit---
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
  • 相关阅读:
    JAVA8新特性-Stream
    随机森林算法
    医学访问学者面试技巧
    电影评分数据分析案例-Spark SQL
    陈芳允于1971年提出双星定位
    【云原生 | Kubernetes 系列】----亲和与反亲和
    【C语言】指针和数组笔试题解析
    mysql之两阶段提交
    HCIP 第十六天
    [免费专栏] Android安全之数据存储与数据安全【大集合】
  • 原文地址:https://blog.csdn.net/qq_43899283/article/details/134466514