• CMU15445 (Fall 2020) 数据库系统 Project#2 - B+ Tree 详解(上篇)


    前言

    考虑到 B+ 树较为复杂,CMU15-445 将 B+ 树实验拆成了两部分,这篇博客将介绍 Checkpoint#1、Checkpoint#2 删除操作和迭代器的实现过程,搭配教材 《DataBase System Concepts》食用更佳。

    B+ 树索引

    许多查询只涉及文件中的少量记录,例如“找出物理系所有教师”的查询就只涉及教师记录中的一小部分,如果数据库系统读取表中每一个元组并检查系名称是否为“物理”,这样是十分低效的。理想情况下,数据库系统应能直接定位这些记录。为了支持这种快速访问方式,数据库系统使用了索引。

    有两种基本的索引类型:

    • 顺序索引:基于值的顺序排序。
    • 散列索引:基于将值平均分布到若干散列桶中。一个值所属的散列桶是由一个函数决定的,该函数称为散列函数(hash function)。

    对于等值查询,哈希索引的速度最快,时间复杂度为 O(1)" role="presentation">O(1) ,但是对于范围查询,哈希索引表示无能为力。这时候顺序索引就可以派上用场了,目前数据库系统用的最多的顺序索引是 B+ 树索引,时间复杂度为 O(logN)" role="presentation">O(logN)。下图显示了一棵 B+ 树,它是右下角数据表第二列的索引:

    可以看到 B+ 树是一种多路平衡搜索树,由叶节点和内部节点组成,其中叶节点可以保存整条记录或者是记录的行 ID,内部节点只会保存键和指向子节点的指针,这样数据库系统就能读入更多的内部节点到内存中,减少磁盘 IO 次数。除了根节点外,其他节点都是至少半满的。

    代码实现

    B+ Tree Page

    B+ Tree Parent Page

    B+ 树的每个节点对应着一个 Page,内部节点对应 BPlusTreeInternalPage,叶节点对应 BPlusTreeLeafPage,这两种页结构相似,所以先实现父类 BPlusTreePage,它的结构如下图所示(图片使用 Excalidraw 绘制):

    header 占用了 24 字节,各个字段的大小和描述如下表所示:

    Variable Name Size Description
    page_type_ 4 Page Type (internal or leaf)
    lsn_ 4 Log sequence number (Used in Project 4)
    size_ 4 Number of Key & Value pairs in page
    max_size_ 4 Max number of Key & Value pairs in page
    parent_page_id_ 4 Parent Page Id
    page_id_ 4 Self Page Id

    BPlusTreePage 声明如下:

    复制#define MappingType std::pair
    
    #define INDEX_TEMPLATE_ARGUMENTS template 
    
    // define page type enum
    enum class IndexPageType { INVALID_INDEX_PAGE = 0, LEAF_PAGE, INTERNAL_PAGE };
    
    class BPlusTreePage {
     public:
      bool IsLeafPage() const;
      bool IsRootPage() const;
      void SetPageType(IndexPageType page_type);
    
      int GetSize() const;
      void SetSize(int size);
      void IncreaseSize(int amount);
    
      int GetMaxSize() const;
      void SetMaxSize(int max_size);
      int GetMinSize() const;
    
      page_id_t GetParentPageId() const;
      void SetParentPageId(page_id_t parent_page_id);
    
      page_id_t GetPageId() const;
      void SetPageId(page_id_t page_id);
    
      void SetLSN(lsn_t lsn = INVALID_LSN);
    
     private:
      IndexPageType page_type_ __attribute__((__unused__));
      lsn_t lsn_ __attribute__((__unused__));
      int size_ __attribute__((__unused__));
      int max_size_ __attribute__((__unused__));
      page_id_t parent_page_id_ __attribute__((__unused__));
      page_id_t page_id_ __attribute__((__unused__));
    };
    

    该类的实现较为简单,但是 GetMinSize() 函数值得细细推敲。根据教材中的描述,当最大键值对数量为 N" role="presentation">N 时,节点的最少键值对数量存在三种情况:

    • 根节点:
      • 根节点是叶节点时,内部至少需要 1 个键值对,这个很好理解,比如往空的 B+ 树插入一个键值对后,新建的根节点肯定只会有一个键值对;
      • 根节点是内部节点是,内部至少需要 2 个键值对,除去最左侧的无效键,还需要一个键与输入的键进行比较;
    • 内部节点:节点插入数据之后可能溢出,这时需要进行分裂操作,为了简化分裂代码的编写,内部节点和根节点会留出一个键值对的位置作为哨兵,实际最大键值对数量为 N1" role="presentation">N1,加上最左侧的无效键,最小键值对数量为 (N2)/2+1" role="presentation">(N2)/2+1
    • 叶节点:最小键值对数量为 (N1)/2" role="presentation">(N1)/2
    复制/*
     * Helper methods to get/set page type
     * Page type enum class is defined in b_plus_tree_page.h
     */
    bool BPlusTreePage::IsLeafPage() const { return page_type_ == IndexPageType::LEAF_PAGE; }
    bool BPlusTreePage::IsRootPage() const { return parent_page_id_ == INVALID_PAGE_ID; }
    void BPlusTreePage::SetPageType(IndexPageType page_type) { page_type_ = page_type; }
    
    /*
     * Helper methods to get/set size (number of key/value pairs stored in that
     * page)
     */
    int BPlusTreePage::GetSize() const { return size_; }
    void BPlusTreePage::SetSize(int size) { size_ = size; }
    void BPlusTreePage::IncreaseSize(int amount) { size_ += amount; }
    
    /*
     * Helper methods to get/set max size (capacity) of the page
     */
    int BPlusTreePage::GetMaxSize() const { return max_size_; }
    void BPlusTreePage::SetMaxSize(int size) { max_size_ = size; }
    
    /*
     * Helper method to get min page size
     * Generally, min page size == max page size / 2
     */
    int BPlusTreePage::GetMinSize() const {
      // 数据库系统概念中文第六版 Page #486
      // 叶节点最少有一个键,而内部节点除了无效的第一个键外还需一个键
      if (IsRootPage()) {
        return IsLeafPage() ? 1 : 2;
      }
    
      // 内部节点的最小键值对数量为 ⌈(max_size_ - 1 - 1)/2⌉ + 1
      if (!IsLeafPage()) {
        return (max_size_ - 2 + 1) / 2 + 1;
      }
    
      // 叶节点最小键值对数量为 ⌈(max_size_ - 1)/2⌉
      return (max_size_ - 1 + 1) / 2;
    }
    
    /*
     * Helper methods to get/set parent page id
     */
    page_id_t BPlusTreePage::GetParentPageId() const { return parent_page_id_; }
    void BPlusTreePage::SetParentPageId(page_id_t parent_page_id) { parent_page_id_ = parent_page_id; }
    
    /*
     * Helper methods to get/set self page id
     */
    page_id_t BPlusTreePage::GetPageId() const { return page_id_; }
    void BPlusTreePage::SetPageId(page_id_t page_id) { page_id_ = page_id; }
    
    /*
     * Helper methods to set lsn
     */
    void BPlusTreePage::SetLSN(lsn_t lsn) { lsn_ = lsn; }
    

    B+ Tree Internal Page

    内部节点页的结构如下图所示,它比父类多了一个 array 数组成员,用于存放 key | page_id 键值对,其中 page_id 是子节点的页 id:

    ,声明如下述代码所示。可以看到 array 的长度声明为 0,这种写法比较神奇,它的意思是 array 的实际长度是在运行时确定的。实验一中我们实现了缓冲池管理器,用于缓存 Page,而节点页就存放在 char Page::data_[PAGE_SIZE] 中,当 PAGE_SIZE 为 4096 字节时,array 最多能放 INTERNAL_PAGE_SIZE 个键值对:

    复制#define B_PLUS_TREE_INTERNAL_PAGE_TYPE BPlusTreeInternalPage
    #define INTERNAL_PAGE_HEADER_SIZE 24
    #define INTERNAL_PAGE_SIZE ((PAGE_SIZE - INTERNAL_PAGE_HEADER_SIZE) / (sizeof(MappingType)))
    /**
     * Store n indexed keys and n+1 child pointers (page_id) within internal page.
     * Pointer PAGE_ID(i) points to a subtree in which all keys K satisfy:
     * K(i) <= K < K(i+1).
     * NOTE: since the number of keys does not equal to number of child pointers,
     * the first key always remains invalid. That is to say, any search/lookup
     * should ignore the first key.
     */
    INDEX_TEMPLATE_ARGUMENTS
    class BPlusTreeInternalPage : public BPlusTreePage {
     public:
      void Init(page_id_t page_id, page_id_t parent_id = INVALID_PAGE_ID, int max_size = INTERNAL_PAGE_SIZE);
    
      KeyType KeyAt(int index) const;
      void SetKeyAt(int index, const KeyType &key);
      int ValueIndex(const ValueType &value) const;
      ValueType ValueAt(int index) const;
    
      ValueType Lookup(const KeyType &key, const KeyComparator &comparator) const;
      void PopulateNewRoot(const ValueType &old_value, const KeyType &new_key, const ValueType &new_value);
      int InsertNodeAfter(const ValueType &old_value, const KeyType &new_key, const ValueType &new_value);
      void Remove(int index);
      ValueType RemoveAndReturnOnlyChild();
    
      // Split and Merge utility methods
      void MoveAllTo(BPlusTreeInternalPage *recipient, const KeyType &middle_key, BufferPoolManager *buffer_pool_manager);
      void MoveHalfTo(BPlusTreeInternalPage *recipient, BufferPoolManager *buffer_pool_manager);
    
     private:
      void CopyNFrom(MappingType *items, int size, BufferPoolManager *buffer_pool_manager);
      MappingType array[0];
    };
    

    先来实现几个简单的函数,其中 Init() 函数必须在内部节点页在缓冲池中被创建出来后调用:

    复制/*
     * Init method after creating a new internal page
     * Including set page type, set current size, set page id, set parent id and set max page size
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::Init(page_id_t page_id, page_id_t parent_id, int max_size) {
      SetPageType(IndexPageType::INTERNAL_PAGE);
      SetSize(0);
      SetPageId(page_id);
      SetParentPageId(parent_id);
      SetMaxSize(max_size);
    }
    
    /*
     * Helper method to get/set the key associated with input "index"(a.k.a array offset)
     */
    INDEX_TEMPLATE_ARGUMENTS
    KeyType B_PLUS_TREE_INTERNAL_PAGE_TYPE::KeyAt(int index) const { return array[index].first; }
    
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::SetKeyAt(int index, const KeyType &key) { array[index].first = key; }
    
    /*
     * Helper method to find and return array index(or offset), so that its value equals to input "value"
     */
    INDEX_TEMPLATE_ARGUMENTS
    int B_PLUS_TREE_INTERNAL_PAGE_TYPE::ValueIndex(const ValueType &value) const {
      for (int i = 0; i < GetSize(); ++i) {
        if (ValueAt(i) == value) {
          return i;
        }
      }
      return -1;
    }
    
    /*
     * Helper method to get the value associated with input "index"(a.k.a array offset)
     */
    INDEX_TEMPLATE_ARGUMENTS
    ValueType B_PLUS_TREE_INTERNAL_PAGE_TYPE::ValueAt(int index) const { return array[index].second; }
    

    B+ Tree Leaf Page

    BPlusTreeLeafPage 的结构和 BPlusTreeInternalPage 相似,只是在 header 区域多了一个 next_page_id_ 字段,该字段是下一个叶节点的指针,用于将叶节点串成单向链表。同时 array 内部键值对存放的是 key | RID,指向实际数据的位置。

    类声明如下:

    复制#define B_PLUS_TREE_LEAF_PAGE_TYPE BPlusTreeLeafPage
    #define LEAF_PAGE_HEADER_SIZE 28
    #define LEAF_PAGE_SIZE ((PAGE_SIZE - LEAF_PAGE_HEADER_SIZE) / sizeof(MappingType))
    
    /**
     * Store indexed key and record id(record id = page id combined with slot id,
     * see include/common/rid.h for detailed implementation) together within leaf
     * page. Only support unique key.
     */
    INDEX_TEMPLATE_ARGUMENTS
    class BPlusTreeLeafPage : public BPlusTreePage {
     public:
      void Init(page_id_t page_id, page_id_t parent_id = INVALID_PAGE_ID, int max_size = LEAF_PAGE_SIZE);
      // helper methods
      page_id_t GetNextPageId() const;
      void SetNextPageId(page_id_t next_page_id);
      KeyType KeyAt(int index) const;
      int KeyIndex(const KeyType &key, const KeyComparator &comparator) const;
      const MappingType &GetItem(int index);
    
      // insert and delete methods
      int Insert(const KeyType &key, const ValueType &value, const KeyComparator &comparator);
      bool Lookup(const KeyType &key, ValueType *value, const KeyComparator &comparator) const;
      int RemoveAndDeleteRecord(const KeyType &key, const KeyComparator &comparator);
    
      // Split and Merge utility methods
      void MoveHalfTo(BPlusTreeLeafPage *recipient, BufferPoolManager *buffer_pool_manager);
      void MoveAllTo(BPlusTreeLeafPage *recipient);
      void MoveFirstToEndOf(BPlusTreeLeafPage *recipient);
      void MoveLastToFrontOf(BPlusTreeLeafPage *recipient);
    
     private:
      void CopyNFrom(MappingType *items, int size);
      page_id_t next_page_id_;
      MappingType array[0];
    };
    

    先来实现几个简单的函数:

    复制/**
     * Init method after creating a new leaf page
     * Including set page type, set current size to zero, set page id/parent id, set next page id and set max size
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::Init(page_id_t page_id, page_id_t parent_id, int max_size) {
      SetPageType(IndexPageType::LEAF_PAGE);
      SetSize(0);
      SetPageId(page_id);
      SetParentPageId(parent_id);
      SetNextPageId(INVALID_PAGE_ID);
      SetMaxSize(max_size);
    }
    
    /**
     * Helper methods to set/get next page id
     */
    INDEX_TEMPLATE_ARGUMENTS
    page_id_t B_PLUS_TREE_LEAF_PAGE_TYPE::GetNextPageId() const { return next_page_id_; }
    
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::SetNextPageId(page_id_t next_page_id) { next_page_id_ = next_page_id; }
    
    /*
     * Helper method to find and return the key associated with input "index"(a.k.a array offset)
     */
    INDEX_TEMPLATE_ARGUMENTS
    KeyType B_PLUS_TREE_LEAF_PAGE_TYPE::KeyAt(int index) const { return array[index].first; }
    
    /*
     * Helper method to find and return the key & value pair associated with input "index"(a.k.a array offset)
     */
    INDEX_TEMPLATE_ARGUMENTS
    const MappingType &B_PLUS_TREE_LEAF_PAGE_TYPE::GetItem(int index) { return array[index]; }
    

    至此可以画出整棵 B+ 树的结构:

    B+ Tree 查找

    B+ 树的搜索较为简单,先从根节点开始,将内部节点与搜索的 key 进行二分搜索,找出包含 key 的子节点的指针,然后向缓冲池要子节点页,接着在子节点上重复上述过程直到叶节点为止,最后在叶节点上进行二分搜索。

    复制/**
     * Return the only value that associated with input key
     * This method is used for point query
     * @return : true means key exists
     */
    INDEX_TEMPLATE_ARGUMENTS
    bool BPLUSTREE_TYPE::GetValue(const KeyType &key, std::vector *result, Transaction *transaction) {
      if (IsEmpty()) {
        return false;
      }
    
      // 在叶节点中寻找 key
      LeafPage *leaf = ToLeafPage(FindLeafPage(key));
    
      ValueType value;
      auto success = leaf->Lookup(key, &value, comparator_);
      if (success) {
        result->push_back(value);
      }
    
      buffer_pool_manager_->UnpinPage(leaf->GetPageId(), false);
      return success;
    }
    
    /*
     * Helper function to decide whether current b+tree is empty
     */
    INDEX_TEMPLATE_ARGUMENTS
    bool BPLUSTREE_TYPE::IsEmpty() const { return root_page_id_ == INVALID_PAGE_ID; }
    
    /**
     * Find leaf page containing particular key, if leftMost flag == true, find
     * the left most leaf page
     */
    INDEX_TEMPLATE_ARGUMENTS
    Page *BPLUSTREE_TYPE::FindLeafPage(const KeyType &key, bool leftMost) {
      auto page_id = root_page_id_;
      auto page = buffer_pool_manager_->FetchPage(page_id);
      if (!page) {
        return nullptr;
      }
    
      // 定位到包含 key 的叶节点
      auto node = ToTreePage(page);
      while (!node->IsLeafPage()) {
        InternalPage *inode = ToInternalPage(node);
    
        // 寻找下一个包含 key 的节点
        if (!leftMost) {
          page_id = inode->Lookup(key, comparator_);
        } else {
          page_id = inode->ValueAt(0);
        }
    
        // 移动到子节点
        buffer_pool_manager_->UnpinPage(page->GetPageId(), false);
        page = buffer_pool_manager_->FetchPage(page_id);
        node = ToTreePage(page);
      }
    
      return page;
    }
    

    由于经常要进行 Page –> BPlusTreePage –> BPlusTreeLeafPage/BplusTreeInternalPage 的转换,所以在类声明内部添加了几个转换函数:

    复制BPlusTreePage *ToTreePage(Page *page) { return reinterpret_cast(page->GetData()); }
    InternalPage *ToInternalPage(Page *page) { return reinterpret_cast(page->GetData()); }
    LeafPage *ToLeafPage(Page *page) { return reinterpret_cast(page->GetData()); }
    
    InternalPage *ToInternalPage(BPlusTreePage *page) { return reinterpret_cast(page); }
    LeafPage *ToLeafPage(BPlusTreePage *page) { return reinterpret_cast(page); }
    

    内部节点二分查找 InternalPage::LoopUp() 的实现如下所示,这里使用了一种大一统的二分查找写法,无需考虑索引是否加减 1 的问题以及 leftright 指针比较的时候要不要加等号的问题,只要写好分区条件就行,非常好用,墙裂推荐花几分钟看下 B 站的这个教程「二分查找为什么总是写错?」

    因为第一个键无效,所以 left 的初始值为 0,而将索引分给 left 区域的条件就是 KeyAt(mid) 小于等于 key,这样循环结束的时候 left 对应的就是最后一个小于等于 key 的键,指针指向的子节点必包含 key

    复制/*
     * Find and return the child pointer(page_id) which points to the child page
     * that contains input "key"
     * Start the search from the second key(the first key should always be invalid)
     */
    INDEX_TEMPLATE_ARGUMENTS
    ValueType B_PLUS_TREE_INTERNAL_PAGE_TYPE::Lookup(const KeyType &key, const KeyComparator &comparator) const {
      // 二分查找大一统写法:https://www.bilibili.com/video/BV1d54y1q7k7/
      int left = 0, right = GetSize();
      while (left + 1 < right) {
        int mid = left + (right - left) / 2;
        if (comparator(KeyAt(mid), key) <= 0) {
          left = mid;
        } else {
          right = mid;
        }
      }
      return ValueAt(left);
    }
    

    叶节点二分查找的代码为:

     0) {
          right = mid - 1;
        } else {
          left = mid + 1;
        }
      }
      return false;
    }
    ">复制/*
     * For the given key, check to see whether it exists in the leaf page. If it
     * does, then store its corresponding value in input "value" and return true.
     * If the key does not exist, then return false
     */
    INDEX_TEMPLATE_ARGUMENTS
    bool B_PLUS_TREE_LEAF_PAGE_TYPE::Lookup(const KeyType &key, ValueType *value, const KeyComparator &comparator) const {
      int left = 0, right = GetSize() - 1;
      while (left <= right) {
        int mid = left + (right - left) / 2;
        int cmp = comparator(KeyAt(mid), key);
        if (cmp == 0) {
          *value = array[mid].second;
          return true;
        } else if (cmp > 0) {
          right = mid - 1;
        } else {
          left = mid + 1;
        }
      }
      return false;
    }
    

    B+ Tree 插入

    B+ 树的插入较为复杂,定位到需要插入的叶节点后需要考虑节点溢出的问题,可以在这个算法可视化网站查看 B+ 树的插入过程。教材中给出了插入算法的伪代码:

    总体流程就是先判断树是否为空,如果是空的就新建一个叶节点作为根节点并插入键值对,否则将键值对插到对应的叶节点 N" role="presentation">N 中:

    Init(root_page_id_, INVALID_PAGE_ID, leaf_max_size_);
      root->Insert(key, value, comparator_);
    
      UpdateRootPageId(1);
      buffer_pool_manager_->UnpinPage(root_page_id_, true);
    }
    
    INDEX_TEMPLATE_ARGUMENTS
    Page *BPLUSTREE_TYPE::NewPage(page_id_t *page_id) {
      auto page = buffer_pool_manager_->NewPage(page_id);
      if (!page) {
        throw Exception(ExceptionType::OUT_OF_MEMORY, "Can't create new page");
      }
    
      return page;
    }
    ">复制/**
     * Insert constant key & value pair into b+ tree
     * if current tree is empty, start new tree, update root page id and insert
     * entry, otherwise insert into leaf page.
     * @return: since we only support unique key, if user try to insert duplicate
     * keys return false, otherwise return true.
     */
    INDEX_TEMPLATE_ARGUMENTS
    bool BPLUSTREE_TYPE::Insert(const KeyType &key, const ValueType &value, Transaction *transaction) {
      if (!IsEmpty()) {
        return InsertIntoLeaf(key, value, transaction);
      }
    
      StartNewTree(key, value);
      return true;
    }
    
    /**
     * Insert constant key & value pair into an empty tree
     * User needs to first ask for new page from buffer pool manager(NOTICE: throw
     * an "out of memory" exception if returned value is nullptr), then update b+
     * tree's root page id and insert entry directly into leaf page.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void BPLUSTREE_TYPE::StartNewTree(const KeyType &key, const ValueType &value) {
      // 创建一个叶节点作为根节点,并插入新数据
      LeafPage *root = ToLeafPage(NewPage(&root_page_id_));
      root->Init(root_page_id_, INVALID_PAGE_ID, leaf_max_size_);
      root->Insert(key, value, comparator_);
    
      UpdateRootPageId(1);
      buffer_pool_manager_->UnpinPage(root_page_id_, true);
    }
    
    INDEX_TEMPLATE_ARGUMENTS
    Page *BPLUSTREE_TYPE::NewPage(page_id_t *page_id) {
      auto page = buffer_pool_manager_->NewPage(page_id);
      if (!page) {
        throw Exception(ExceptionType::OUT_OF_MEMORY, "Can't create new page");
      }
    
      return page;
    }
    

    如果叶节点 N" role="presentation">N 没有满,那么直接插入就行,否则需要对叶节点 N" role="presentation">N 进行分裂操作:

    1. 在缓冲池中新建一个页为叶节点 N" role="presentation">N
    2. N" role="presentation">N 的右半部分键值对移动到 N" role="presentation">N 中,并更新 next_page_id_ 指针;
    3. N" role="presentation">N 最左侧的键 K" role="presentation">KN" role="presentation">N 的指针插到 N" role="presentation">N 的父节点中

    对应代码为:

    
    N *BPLUSTREE_TYPE::Split(N *node) {
      // 创建新节点
      page_id_t new_page_id;
      auto new_page = NewPage(&new_page_id);
      N *new_node = reinterpret_cast(new_page->GetData());
    
      // 将右半边的 item 移动到新节点中
      auto max_size = node->IsLeafPage() ? leaf_max_size_ : internal_max_size_;
      new_node->Init(new_page_id, node->GetParentPageId(), max_size);
      node->MoveHalfTo(new_node, buffer_pool_manager_);
    
      return new_node;
    }
    ">复制/**
     * Insert constant key & value pair into leaf page
     * User needs to first find the right leaf page as insertion target, then look
     * through leaf page to see whether insert key exist or not. If exist, return
     * immdiately, otherwise insert entry. Remember to deal with split if necessary.
     * @return: since we only support unique key, if user try to insert duplicate
     * keys return false, otherwise return true.
     */
    INDEX_TEMPLATE_ARGUMENTS
    bool BPLUSTREE_TYPE::InsertIntoLeaf(const KeyType &key, const ValueType &value, Transaction *transaction) {
      // 定位到包含 key 的叶节点
      LeafPage *leaf = ToLeafPage(FindLeafPage(key));
    
      // 不能插入相同的键
      ValueType exist_value;
      if (leaf->Lookup(key, &exist_value, comparator_)) {
        buffer_pool_manager_->UnpinPage(leaf->GetPageId(), false);
        return false;
      }
    
      // 如果叶节点没有满,就直接插进去(array 最后留了一个空位),否则分裂叶节点并更新父节点
      auto size = leaf->Insert(key, value, comparator_);
    
      if (size == leaf_max_size_) {
        LeafPage *new_leaf = Split(leaf);
        InsertIntoParent(leaf, new_leaf->KeyAt(0), new_leaf, transaction);
        buffer_pool_manager_->UnpinPage(new_leaf->GetPageId(), true);
      }
    
      buffer_pool_manager_->UnpinPage(leaf->GetPageId(), true);
      return true;
    }
    
    /**
     * Split input page and return newly created page.
     * Using template N to represent either internal page or leaf page.
     * User needs to first ask for new page from buffer pool manager(NOTICE: throw
     * an "out of memory" exception if returned value is nullptr), then move half
     * of key & value pairs from input page to newly created page
     */
    INDEX_TEMPLATE_ARGUMENTS
    template <typename N>
    N *BPLUSTREE_TYPE::Split(N *node) {
      // 创建新节点
      page_id_t new_page_id;
      auto new_page = NewPage(&new_page_id);
      N *new_node = reinterpret_cast(new_page->GetData());
    
      // 将右半边的 item 移动到新节点中
      auto max_size = node->IsLeafPage() ? leaf_max_size_ : internal_max_size_;
      new_node->Init(new_page_id, node->GetParentPageId(), max_size);
      node->MoveHalfTo(new_node, buffer_pool_manager_);
    
      return new_node;
    }
    

    之前提到过,节点留了一个键值对的位置充当哨兵,这样当内部键值对数量达到 max_size_ - 1 时,可以把新的键值对插进去而不用像伪代码中说的那样在内存中开辟一块能够容纳 max_size_ + 1 个键值对的空间。

    插入之前需要先定位到第一个大于等于 key 的键值对位置,同样可以使用大一统写法的二分查找做到这一点:

    复制/*
     * Insert key & value pair into leaf page ordered by key
     * @return  page size after insertion
     */
    INDEX_TEMPLATE_ARGUMENTS
    int B_PLUS_TREE_LEAF_PAGE_TYPE::Insert(const KeyType &key, const ValueType &value, const KeyComparator &comparator) {
      // 获取第一个大于等于 key 的索引并将之后的键值对右移
      auto index = KeyIndex(key, comparator);
      std::copy_backward(array + index, array + GetSize(), array + GetSize() + 1);
    
      array[index] = {key, value};
      IncreaseSize(1);
      return GetSize();
    }
    
    /**
     * Helper method to find the first index i so that array[i].first >= key
     * NOTE: This method is only used when generating index iterator
     */
    INDEX_TEMPLATE_ARGUMENTS
    int B_PLUS_TREE_LEAF_PAGE_TYPE::KeyIndex(const KeyType &key, const KeyComparator &comparator) const {
      // 二分查找大一统写法:https://www.bilibili.com/video/BV1d54y1q7k7/
      int left = -1, right = GetSize();
      while (left + 1 < right) {
        int mid = left + (right - left) / 2;
        if (comparator(KeyAt(mid), key) < 0) {
          left = mid;
        } else {
          right = mid;
        }
      }
      return right;
    }
    

    叶节点使用 MoveHalfTo() 函数进行右半部分键值对的移动操作,这个函数的签名和初始代码不太一样,多加了一个 buffer_pool_manager 参数:

    CopyNFrom(array + start, N);
    
      // 更新下一个页 id
      recipient->SetNextPageId(GetNextPageId());
      SetNextPageId(recipient->GetPageId());
    
      IncreaseSize(-N);
    }
    
    /*
     * Copy starting from items, and copy {size} number of elements into me.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::CopyNFrom(MappingType *items, int size) {
      std::copy(items, items + size, array + GetSize());
      IncreaseSize(size);
    }
    ">复制/*
     * Remove half of key & value pairs from this page to "recipient" page
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::MoveHalfTo(BPlusTreeLeafPage *recipient,
                                                __attribute__((unused)) BufferPoolManager *buffer_pool_manager) {
      // 根节点分裂之后就变成了叶节点
      int start = !IsRootPage() ? GetMinSize() : GetMaxSize() / 2;
      int N = GetSize() - start;
      recipient->CopyNFrom(array + start, N);
    
      // 更新下一个页 id
      recipient->SetNextPageId(GetNextPageId());
      SetNextPageId(recipient->GetPageId());
    
      IncreaseSize(-N);
    }
    
    /*
     * Copy starting from items, and copy {size} number of elements into me.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::CopyNFrom(MappingType *items, int size) {
      std::copy(items, items + size, array + GetSize());
      IncreaseSize(size);
    }
    

    父节点的插入算法较为复杂,如果父节点插入子节点传来的键值对之后也溢出了,需要进行分裂,并将新分裂出来的父节点的最左侧无效键和自己的指针插入父节点中。如果分裂的是根节点,需要新建根节点,之前的根节点变成内部节点,并将树的高度 +1。

    复制INDEX_TEMPLATE_ARGUMENTS
    void BPLUSTREE_TYPE::InsertIntoParent(BPlusTreePage *old_node, const KeyType &key, BPlusTreePage *new_node,
                                          Transaction *transaction) {
      // 根节点发生分裂需要新建一个根节点,B+树的高度 +1
      if (old_node->IsRootPage()) {
        auto root_page = NewPage(&root_page_id_);
    
        // 创建新节点并更新子节点指针
        InternalPage *root = ToInternalPage(root_page);
        root->Init(root_page_id_, INVALID_PAGE_ID, internal_max_size_);
        root->PopulateNewRoot(old_node->GetPageId(), key, new_node->GetPageId());
    
        // 更新父节点指针
        old_node->SetParentPageId(root_page_id_);
        new_node->SetParentPageId(root_page_id_);
    
        UpdateRootPageId(0);
        buffer_pool_manager_->UnpinPage(root_page_id_, true);
        return;
      }
    
      // 找到父节点并将新节点的最左侧 key 插入其中
      auto parent_id = old_node->GetParentPageId();
      InternalPage *parent = ToInternalPage(buffer_pool_manager_->FetchPage(parent_id));
      auto size = parent->InsertNodeAfter(old_node->GetPageId(), key, new_node->GetPageId());
    
      // 父节点溢出时需要再次分裂
      if (size == internal_max_size_) {
        InternalPage *new_page = Split(parent);
        InsertIntoParent(parent, new_page->KeyAt(0), new_page, transaction);
        buffer_pool_manager_->UnpinPage(new_page->GetPageId(), true);
      }
    
      buffer_pool_manager_->UnpinPage(parent_id, true);
    }
    

    内部节点分裂的代码为:

    CopyNFrom(array + start, N, buffer_pool_manager);
      IncreaseSize(-N);
    }
    ">复制/*
     * Populate new root page with old_value + new_key & new_value
     * When the insertion cause overflow from leaf page all the way upto the root
     * page, you should create a new root page and populate its elements.
     * NOTE: This method is only called within InsertIntoParent()(b_plus_tree.cpp)
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::PopulateNewRoot(const ValueType &old_value, const KeyType &new_key,
                                                         const ValueType &new_value) {
      array[0].second = old_value;
      array[1] = {new_key, new_value};
      SetSize(2);
    }
    
    /*
     * Insert new_key & new_value pair right after the pair with its value == old_value
     * @return:  new size after insertion
     */
    INDEX_TEMPLATE_ARGUMENTS
    int B_PLUS_TREE_INTERNAL_PAGE_TYPE::InsertNodeAfter(const ValueType &old_value, const KeyType &new_key,
                                                        const ValueType &new_value) {
      // 找到 old_value 的位置并将后面的 item 后移一格
      auto index = ValueIndex(old_value) + 1;
      std::copy_backward(array + index, array + GetSize(), array + GetSize() + 1);
    
      array[index] = {new_key, new_value};
      IncreaseSize(1);
      return GetSize();
    }
    
    /*
     * Remove half of key & value pairs from this page to "recipient" page
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveHalfTo(BPlusTreeInternalPage *recipient,
                                                    BufferPoolManager *buffer_pool_manager) {
      // 第一个 key 本来就没有用,所以直接拷贝过来也没关系
      auto start = GetMinSize();
      int N = GetSize() - start;
      recipient->CopyNFrom(array + start, N, buffer_pool_manager);
      IncreaseSize(-N);
    }
    

    B+ Tree 删除

    B+ 树的删除操作的伪代码如下图所示:

    首先需要找到待删除的 key 所在的叶节点 N" role="presentation">N,然后移除叶节点上的键值对。如果移除之后叶节点仍然处于半满状态,删除大功告成,否则需要调用 CoalesceOrRedistribute() 函数对叶节点进行调整,使其再次达到半满状态。

    复制INDEX_TEMPLATE_ARGUMENTS
    void BPLUSTREE_TYPE::Remove(const KeyType &key, Transaction *transaction) {
      if (IsEmpty()) {
        return;
      }
    
      // 定位到叶节点并删除键值对
      LeafPage *leaf = ToLeafPage(FindLeafPage(key));
      auto old_size = leaf->GetSize();
      auto size = leaf->RemoveAndDeleteRecord(key, comparator_);
    
      // 叶节点删除之后没有处于半满状态需要合并相邻节点或者重新分配
      if (size < leaf->GetMinSize()) {
        CoalesceOrRedistribute(leaf, transaction);
      }
    
      buffer_pool_manager_->UnpinPage(leaf->GetPageId(), old_size != size);
    }
    

    叶节点移除键值对的代码为:

    复制INDEX_TEMPLATE_ARGUMENTS
    int B_PLUS_TREE_LEAF_PAGE_TYPE::RemoveAndDeleteRecord(const KeyType &key, const KeyComparator &comparator) {
      // 寻找第一个大于等于 key 的下标
      auto index = KeyIndex(key, comparator);
    
      // 没找到 key 就直接返回
      if (index < 0 || index == GetSize() || comparator(KeyAt(index), key) != 0) {
        return GetSize();
      }
    
      // 将 index 后面的键值对往前移动一格
      for (int i = index; i < GetSize() - 1; ++i) {
        array[i] = array[i + 1];
      }
    
      IncreaseSize(-1);
      return GetSize();
    }
    

    记叶节点的兄弟节点为 N" role="presentation">N,叶节点 N" role="presentation">N 的调整方式有两种:

    1. N" role="presentation">NN" role="presentation">N 的键值对数量和大于叶节点的最大键值对数量时,从兄弟节点 N" role="presentation">N 中拿走一个键值对,然后更新父节点的键
    2. 否则,将叶节点合并到兄弟节点中并删除 N" role="presentation">N,由于 N" role="presentation">N 被删除了,所以还得删除父节点上 N" role="presentation">N 对应的键值对,然后递归地对父节点进行调整

    对应的代码为:

    复制INDEX_TEMPLATE_ARGUMENTS
    template <typename N>
    bool BPLUSTREE_TYPE::CoalesceOrRedistribute(N *node, Transaction *transaction) {
      if (node->IsRootPage()) {
        return AdjustRoot(node);
      }
    
      // 找到相邻的兄弟节点
      InternalPage *parent = ToInternalPage(buffer_pool_manager_->FetchPage(node->GetParentPageId()));
      auto index = parent->ValueIndex(node->GetPageId());
      auto sibling_index = index > 0 ? index - 1 : 1;  // idnex 为 0 时必定有右兄弟节点
      N *sibling = reinterpret_cast(buffer_pool_manager_->FetchPage(parent->ValueAt(sibling_index))->GetData());
    
      // 如果两个节点的大小和大于 max_size-1,就直接重新分配,否则直接合并兄弟节点
      bool is_merge = sibling->GetSize() + node->GetSize() <= node->GetMaxSize() - 1;
      if (is_merge) {
        Coalesce(&sibling, &node, &parent, index, transaction);
      } else {
        Redistribute(sibling, node, index);
      }
    
      buffer_pool_manager_->UnpinPage(parent->GetPageId(), true);
      buffer_pool_manager_->UnpinPage(sibling->GetPageId(), true);
    
      return is_merge;
    }
    

    可以看到 CoalesceOrRedistribute() 的开头特别处理了根节点被调整的情况:

    • 当根节点不是叶节点且被删除地只剩最左边指向子节点的指针时,需要移除根节点,并将子节点变为新的根节点。比如移除下图的 5 之后,两个叶节点发生了合并,这时候要将合并后的子节点作为根节点;

    • 当根节点就是叶节点且内部一个键值对也没有时,说明 B+ 树是空的,直接移除根节点;

    对应的代码为:

    复制INDEX_TEMPLATE_ARGUMENTS
    bool BPLUSTREE_TYPE::AdjustRoot(BPlusTreePage *old_root_node) {
      bool is_deleted = false;
    
      // 根节点只包含一个无效键时需要删除根节点,将子节点变为根节点;根节点为叶节点且为没有键值对时,删除整棵树
      if (!old_root_node->IsLeafPage() && old_root_node->GetSize() == 1) {
        InternalPage *old_root = ToInternalPage(old_root_node);
        root_page_id_ = old_root->RemoveAndReturnOnlyChild();
    
        // 更新子节点的元数据
        InternalPage *child = ToInternalPage(buffer_pool_manager_->FetchPage(root_page_id_));
        child->SetParentPageId(INVALID_PAGE_ID);
        buffer_pool_manager_->UnpinPage(root_page_id_, true);
    
        UpdateRootPageId();
        is_deleted = true;
      } else if (old_root_node->IsLeafPage() && old_root_node->GetSize() == 0) {
        root_page_id_ = INVALID_PAGE_ID;
        UpdateRootPageId();
        is_deleted = true;
      }
    
      return is_deleted;
    }
    

    合并相邻节点

    如果删除了右下角叶节点 N" role="presentation">N 的 7,就会导致它和左兄弟节点 N" role="presentation">N 进行合并。这时候由于 N" role="presentation">N 已经不存在了,需要将父节点中 N" role="presentation">N 对应的键值对也删除掉。删除之后父节点也不再处于半满状态,需要和左兄弟节点进行合并,并调整递归调整父节点。

    删除之后的 B+ 树长这样:

    有些时候不存在左兄弟节点,需要合并的是右兄弟节点。为了简化代码的编写,这时候可以交换 N" role="presentation">NN" role="presentation">N 的指针,保证合并方向是从右到左。移动内部节点的时候由于 N" role="presentation">N 最左侧的键是无效的,需要替换为 N" role="presentation">N 在父节点中对应的键:

    复制INDEX_TEMPLATE_ARGUMENTS
    template <typename N>
    bool BPLUSTREE_TYPE::Coalesce(N **neighbor_node, N **node,
                                  BPlusTreeInternalPagepage_id_t, KeyComparator> **parent, int index,
                                  Transaction *transaction) {
      // 如果兄弟节点在右边,需要交换两个指针的值,这样就能确保数据移动方向是从右到左
      if (index == 0) {
        std::swap(node, neighbor_node);
      }
    
      N *child = *node, *neighbor_child = *neighbor_node;
      InternalPage *parent_node = *parent;
    
      // 内部节点要从父节点获取插到 node 中的键,右兄弟节点对应的是第一个有效键,左兄弟节点对应的就是 index 处的键
      KeyType middle_key;
      auto middle_index = index == 0 ? 1 : index;
      if (!child->IsLeafPage()) {
        middle_key = parent_node->KeyAt(middle_index);
      }
    
      // 将键值对移动到兄弟节点之后删除节点
      child->MoveAllTo(neighbor_child, middle_key, buffer_pool_manager_);
      buffer_pool_manager_->UnpinPage(child->GetPageId(), true);
      buffer_pool_manager_->DeletePage(child->GetPageId());
    
      // 删除父节点中的键值对,并递归调整父节点
      parent_node->Remove(middle_index);
      return CoalesceOrRedistribute(parent_node, transaction);
    }
    

    可以调用 MoveAllTo() 完成兄弟节点间键值对的移动,叶节点移动键值对的代码为:

    复制INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_LEAF_PAGE_TYPE::MoveAllTo(BPlusTreeLeafPage *recipient,
                                               __attribute__((unused)) const KeyType &middle_key,
                                               __attribute__((unused)) BufferPoolManager *buffer_pool_manager) {
      recipient->CopyNFrom(array, GetSize());
      recipient->SetNextPageId(GetNextPageId());
      SetSize(0);
    }
    

    内部节点移动键值对的代码如下:

    复制INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveAllTo(BPlusTreeInternalPage *recipient, const KeyType &middle_key,
                                                   BufferPoolManager *buffer_pool_manager) {
      SetKeyAt(0, middle_key);
      recipient->CopyNFrom(array, GetSize(), buffer_pool_manager);
      SetSize(0);
    }
    

    重新分配

    重新分配只是将兄弟节点的一个键值对移动到 N" role="presentation">N 中,移动之后需要更新 N" role="presentation">N 中的键和父节点中 N" role="presentation">N 对应的键,代码如下:

    复制INDEX_TEMPLATE_ARGUMENTS
    template <typename N>
    void BPLUSTREE_TYPE::Redistribute(N *neighbor_node, N *node, int index) {
      // 更新父节点
      InternalPage *parent = ToInternalPage(buffer_pool_manager_->FetchPage(node->GetParentPageId()));
    
      // 内部节点要从父节点获取插到 node 中的键,右兄弟节点对应的是第一个有效键,左兄弟节点对应的就是 index 处的键
      KeyType middle_key;
      auto middle_index = index == 0 ? 1 : index;
      if (!node->IsLeafPage()) {
        middle_key = parent->KeyAt(middle_index);
      }
    
      // 兄弟节点在右边,移动第一个键值对给 node,否则将兄弟节点的最后一个键值对移给 node 并更新父节点的键
      if (index == 0) {
        neighbor_node->MoveFirstToEndOf(node, middle_key, buffer_pool_manager_);
        parent->SetKeyAt(middle_index, neighbor_node->KeyAt(0));
      } else {
        neighbor_node->MoveLastToFrontOf(node, middle_key, buffer_pool_manager_);
        parent->SetKeyAt(middle_index, node->KeyAt(0));
      }
    
      buffer_pool_manager_->UnpinPage(parent->GetPageId(), true);
    }
    

    内部节点在移动键值对的时候需要更新子节点的 parent_id_ N" role="presentation">N

    CopyLastFrom(array[0], buffer_pool_manager);
      recipient->SetKeyAt(recipient->GetSize() - 1, middle_key);  // 将父节点的键作为最后一个键
      Remove(0);
    }
    
    /* Append an entry at the end. */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::CopyLastFrom(const MappingType &pair, BufferPoolManager *buffer_pool_manager) {
      array[GetSize()] = pair;
    
      // 更新子节点的 parent id
      auto child = reinterpret_cast(buffer_pool_manager->FetchPage(pair.second)->GetData());
      child->SetParentPageId(GetPageId());
      buffer_pool_manager->UnpinPage(child->GetPageId(), true);
    
      IncreaseSize(1);
    }
    
    /*
     * Remove the last key & value pair from this page to head of "recipient" page.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveLastToFrontOf(BPlusTreeInternalPage *recipient, const KeyType &middle_key,
                                                           BufferPoolManager *buffer_pool_manager) {
      recipient->CopyFirstFrom(array[GetSize() - 1], buffer_pool_manager);
      recipient->SetKeyAt(1, middle_key); // 将父节点的键作为第一个有效键
      IncreaseSize(-1);
    }
    
    /* Append an entry at the beginning.*/
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::CopyFirstFrom(const MappingType &pair, BufferPoolManager *buffer_pool_manager) {
      std::copy_backward(array, array + GetSize(), array + GetSize() + 1);
      array[0] = pair;
    
      // 更新子节点的 parent id
      auto child = reinterpret_cast(buffer_pool_manager->FetchPage(pair.second)->GetData());
      child->SetParentPageId(GetPageId());
      buffer_pool_manager->UnpinPage(child->GetPageId(), true);
    
      IncreaseSize(1);
    }
    ">复制/*
     * Remove the first key & value pair from this page to tail of "recipient" page.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveFirstToEndOf(BPlusTreeInternalPage *recipient, const KeyType &middle_key,
                                                          BufferPoolManager *buffer_pool_manager) {
      recipient->CopyLastFrom(array[0], buffer_pool_manager);
      recipient->SetKeyAt(recipient->GetSize() - 1, middle_key);  // 将父节点的键作为最后一个键
      Remove(0);
    }
    
    /* Append an entry at the end. */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::CopyLastFrom(const MappingType &pair, BufferPoolManager *buffer_pool_manager) {
      array[GetSize()] = pair;
    
      // 更新子节点的 parent id
      auto child = reinterpret_cast(buffer_pool_manager->FetchPage(pair.second)->GetData());
      child->SetParentPageId(GetPageId());
      buffer_pool_manager->UnpinPage(child->GetPageId(), true);
    
      IncreaseSize(1);
    }
    
    /*
     * Remove the last key & value pair from this page to head of "recipient" page.
     */
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::MoveLastToFrontOf(BPlusTreeInternalPage *recipient, const KeyType &middle_key,
                                                           BufferPoolManager *buffer_pool_manager) {
      recipient->CopyFirstFrom(array[GetSize() - 1], buffer_pool_manager);
      recipient->SetKeyAt(1, middle_key); // 将父节点的键作为第一个有效键
      IncreaseSize(-1);
    }
    
    /* Append an entry at the beginning.*/
    INDEX_TEMPLATE_ARGUMENTS
    void B_PLUS_TREE_INTERNAL_PAGE_TYPE::CopyFirstFrom(const MappingType &pair, BufferPoolManager *buffer_pool_manager) {
      std::copy_backward(array, array + GetSize(), array + GetSize() + 1);
      array[0] = pair;
    
      // 更新子节点的 parent id
      auto child = reinterpret_cast(buffer_pool_manager->FetchPage(pair.second)->GetData());
      child->SetParentPageId(GetPageId());
      buffer_pool_manager->UnpinPage(child->GetPageId(), true);
    
      IncreaseSize(1);
    }
    

    B+ Tree 迭代器

    为了遍历叶节点,需要知道叶节点的页 id 和当前所处键值对在叶节点的索引,所以 B+ 树迭代器声明如下:

    复制#define INDEXITERATOR_TYPE IndexIterator
    
    INDEX_TEMPLATE_ARGUMENTS
    class IndexIterator {
      using LeafPage = BPlusTreeLeafPage;
    
     public:
      // you may define your own constructor based on your member variables
      IndexIterator(BufferPoolManager *buffer_pool_manager, B_PLUS_TREE_LEAF_PAGE_TYPE *page, int index);
      ~IndexIterator();
    
      bool isEnd();
    
      const MappingType &operator*();
    
      IndexIterator &operator++();
    
      bool operator==(const IndexIterator &itr) const;
    
      bool operator!=(const IndexIterator &itr) const;
    
     private:
      // add your own private member variables here
      BufferPoolManager *buffer_pool_manager_;
      LeafPage *page_;
      int index_;
      page_id_t page_id_;
    };
    

    迭代时可能会从一个叶节点跳到下一个,所以需要更新当前叶节点的页 id 和键值对索引。迭代结束的时候需要将 page_id_ 置为 INVALID_PAGE_ID

    复制INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE::IndexIterator(BufferPoolManager *buffer_pool_manager, B_PLUS_TREE_LEAF_PAGE_TYPE *page, int index)
        : buffer_pool_manager_(buffer_pool_manager),
          page_(page),
          index_(index),
          page_id_(page ? page->GetPageId() : INVALID_PAGE_ID){};
    
    INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE::~IndexIterator() {
      if (!isEnd()) {
        buffer_pool_manager_->UnpinPage(page_->GetPageId(), false);
      }
    };
    
    INDEX_TEMPLATE_ARGUMENTS
    bool INDEXITERATOR_TYPE::isEnd() { return page_id_ == INVALID_PAGE_ID; }
    
    INDEX_TEMPLATE_ARGUMENTS bool INDEXITERATOR_TYPE::operator==(const IndexIterator &itr) const {
      return itr.page_id_ == page_id_ && itr.index_ == index_;
    }
    
    INDEX_TEMPLATE_ARGUMENTS
    bool INDEXITERATOR_TYPE::operator!=(const IndexIterator &itr) const { return !operator==(itr); }
    
    INDEX_TEMPLATE_ARGUMENTS
    const MappingType &INDEXITERATOR_TYPE::operator*() { return page_->GetItem(index_); }
    
    INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE &INDEXITERATOR_TYPE::operator++() {
      if (isEnd()) {
        return *this;
      }
    
      if (index_ < page_->GetSize() - 1) {
        index_++;
      } else {
        index_ = 0;
        buffer_pool_manager_->UnpinPage(page_id_, false);
    
        // 移动到下一页
        page_id_ = page_->GetNextPageId();
        if (page_id_ != INVALID_PAGE_ID) {
          page_ = reinterpret_cast(buffer_pool_manager_->FetchPage(page_id_)->GetData());
        } else {
          page_ = nullptr;
        }
      }
    
      return *this;
    }
    

    在 B+ 树中实现迭代器接口:

    复制/**
     * Input parameter is void, find the leaftmost leaf page first, then construct index iterator
     */
    INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE BPLUSTREE_TYPE::begin() {
      KeyType key;
      LeafPage *page = ToLeafPage(FindLeafPage(key, true));
      return INDEXITERATOR_TYPE(buffer_pool_manager_, page, 0);
    }
    
    /**
     * Input parameter is low key, find the leaf page that contains the input key
     * first, then construct index iterator
     */
    INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE BPLUSTREE_TYPE::Begin(const KeyType &key) {
      LeafPage *page = ToLeafPage(FindLeafPage(key));
      auto index = page->KeyIndex(key, comparator_);
      return INDEXITERATOR_TYPE(buffer_pool_manager_, page, index);
    }
    
    /**
     * Input parameter is void, construct an index iterator representing the end
     * of the key/value pair in the leaf node
     */
    INDEX_TEMPLATE_ARGUMENTS
    INDEXITERATOR_TYPE BPLUSTREE_TYPE::end() { return INDEXITERATOR_TYPE(buffer_pool_manager_, nullptr, 0); }
    

    测试

    在终端输入:

    复制cd build
    cmake ..
    make
    
    # 绘制 B+ 树的工具
    make b_plus_tree_print_test
    ./test/b_plus_tree_print_test
    
    # 从 grading scope 拔下来的测试代码
    make b_plus_tree_checkpoint_1_test
    make b_plus_tree_checkpoint_2_sequential_test
    ./test/b_plus_tree_checkpoint_1_test
    ./test/b_plus_tree_checkpoint_2_sequential_test
    

    绘制 B+ 树的工具运行效果如下:

    在 VSCode 中安装一个 Dot 文件查看器就能看到效果了:

    测试结果如下,顺利通过了所有测试用例:

    后记

    这次实验实现了单线程 B+ 树的查找、插入、删除和迭代操作,难度相比于实验一有了质的提升,写完之后成就感十足,可能这也是 Andy 将实验二拆成两部分的原因之一吧,下个实验就要实现 B+ 树的并发了,期待一波~~

  • 相关阅读:
    058_末晨曦Vue技术_过渡 & 动画之过渡的类名
    css最终章之浮动、定位、溢出属性处理、z-index属性、透明度
    coreldraw2019安装包下载安装步骤教程
    【Bug—eNSP】华为eNsp路由器设备启动一直是0解决方案!
    麒麟系统开发笔记(十二):在国产麒麟系统上编译GDAL库、搭建基础开发环境和基础Demo
    八道指针笔试题
    try - catch 语句真的会影响性能吗?
    vulhub redis-4-unacc
    ASP.NET Core 6框架揭秘实例演示[35]:利用Session保留语境
    【图论】图文详解Tarjan算法有向图缩点
  • 原文地址:https://www.cnblogs.com/zhiyiYo/p/17472784.html