• Doris File Reading (Work in Progress)


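    Preface

    This article records my study of the backend (BE) file-reading flow in Doris 1.1.1. Only part of the flow is covered so far; the rest is still being filled in.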

    First, the Doris data storage structure


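    1. A tablet is the physical structure that is actually stored on the BE, and it has multiple replicas. See [1].
    2. A rowset is one version of a change applied to the data under a tablet, for example a data load, delete, or update. A tablet may hold multiple rowsets. Doris runs compaction to merge rowsets of adjacent versions, and for the Aggregate or Unique data model it performs the corresponding aggregation while merging.
    3. A segment is a data slice under a rowset. It exists as a file and contains the column data of that segment together with the related index structures.
    4. A column is the columnar storage unit inside a segment, and comes as either a data column or an index column.
    5. A page is a chunk of column data; the actual data of each column is stored in pages. Pages are likewise divided into data pages and index pages.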
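    The containment hierarchy in the list above can be pictured as nested structs. This is only an illustrative sketch: the struct and field names below are hypothetical and much simpler than the real Doris classes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified types; the real Doris classes carry far more state.
struct Page { std::size_t num_rows; };                                // chunk of one column
struct Column { std::string name; std::vector<Page> data_pages; };    // columnar unit
struct Segment { std::string file_name; std::vector<Column> columns; };  // one file
struct Rowset { int start_version; int end_version; std::vector<Segment> segments; };
struct Tablet { long tablet_id; std::vector<Rowset> rowsets; };

// Count the rows stored for one column across all rowsets and segments of a tablet.
std::size_t count_rows(const Tablet& tablet, const std::string& column_name) {
    std::size_t rows = 0;
    for (const Rowset& rs : tablet.rowsets) {
        for (const Segment& seg : rs.segments) {
            for (const Column& col : seg.columns) {
                if (col.name != column_name) continue;
                for (const Page& p : col.data_pages) rows += p.num_rows;
            }
        }
    }
    return rows;
}
```

    Walking from the tablet down to the pages, as count_rows does, follows the same order in which the read path descends: tablet, rowset, segment, column, page.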

        1. File-reading logic of OlapScanNode on the BE

      Source file: doris/be/src/exec/olap_scan_node.cpp

     File reading starts in the function Status OlapScanNode::start_scan_thread(RuntimeState* state):

    Status OlapScanNode::start_scan_thread(RuntimeState* state) {
        if (_scan_ranges.empty()) {
            _transfer_done = true;
            return Status::OK();
        }

        // ranges constructed from scan keys
        std::vector<std::unique_ptr<OlapScanRange>> cond_ranges;
        RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
        // if we can't get ranges from conditions, we give it a total range
        if (cond_ranges.empty()) {
            cond_ranges.emplace_back(new OlapScanRange());
        }

        bool need_split = true;
        // If we have ranges more than 64, there is no need to call
        // ShowHint to split ranges
        if (limit() != -1 || cond_ranges.size() > 64) {
            need_split = false;
        }

        int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
        std::unordered_set<std::string> disk_set;
        for (auto& scan_range : _scan_ranges) {
            auto tablet_id = scan_range->tablet_id;
            int32_t schema_hash = strtoul(scan_range->schema_hash.c_str(), nullptr, 10);
            std::string err;
            TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                    tablet_id, schema_hash, true, &err);
            if (tablet == nullptr) {
                std::stringstream ss;
                ss << "failed to get tablet: " << tablet_id << " with schema hash: " << schema_hash
                   << ", reason: " << err;
                LOG(WARNING) << ss.str();
                return Status::InternalError(ss.str());
            }

            std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
            std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
            if (need_split && !tablet->all_beta()) {
                auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
                                    _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
                                    &split_ranges, _runtime_profile.get());
                if (st.ok()) {
                    ranges = &split_ranges;
                }
            }

            // In order to avoid the problem of too many scanners caused by small tablets,
            // in addition to scanRange, we also need to consider the size of the tablet when
            // creating the scanner. One scanner is used for every 1Gb, and the final scanner_per_tablet
            // takes the minimum value calculated by scanrange and size.
            int size_based_scanners_per_tablet = 1;
            if (config::doris_scan_range_max_mb > 0) {
                size_based_scanners_per_tablet = std::max(
                        1, (int)tablet->tablet_footprint() / config::doris_scan_range_max_mb << 20);
            }
            int ranges_per_scanner =
                    std::max(1, (int)ranges->size() /
                                        std::min(scanners_per_tablet, size_based_scanners_per_tablet));
            int num_ranges = ranges->size();
            for (int i = 0; i < num_ranges;) {
                std::vector<OlapScanRange*> scanner_ranges;
                scanner_ranges.push_back((*ranges)[i].get());
                ++i;
                for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
                                (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
                     ++j, ++i) {
                    scanner_ranges.push_back((*ranges)[i].get());
                }
                OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation,
                                                       _need_agg_finalize, *scan_range);
                scanner->set_batch_size(_batch_size);
                // add scanner to pool before doing prepare.
                // so that scanner can be automatically deconstructed if prepare failed.
                _scanner_pool.add(scanner);
                RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
                                                 _bloom_filters_push_down));
                _olap_scanners.push_back(scanner);
                disk_set.insert(scanner->scan_disk());
            }
        }
        COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
        COUNTER_SET(_num_scanners, static_cast<int64_t>(_olap_scanners.size()));
        // PAIN_LOG(_olap_scanners.size());
        // init progress
        std::stringstream ss;
        ss << "ScanThread complete (node=" << id() << "):";
        _progress = ProgressUpdater(ss.str(), _olap_scanners.size(), 1);
        _transfer_thread = std::make_shared<std::thread>(&OlapScanNode::transfer_thread, this, state);
        return Status::OK();
    }
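    The way the function distributes key ranges over scanners can be isolated into a small standalone sketch. The Range struct and the two function names below are hypothetical stand-ins; only the arithmetic and the grouping loop follow the listing above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for OlapScanRange: only the flag the grouping loop inspects.
struct Range { bool end_include; };

// Per-tablet scanner budget as in the listing: 64 scanners shared by all scan ranges.
int scanner_budget_per_tablet(std::size_t num_scan_ranges) {
    assert(num_scan_ranges > 0);  // the real function returns early when there are none
    return std::max(1, 64 / static_cast<int>(num_scan_ranges));
}

// Split `ranges` into per-scanner groups: at most ranges_per_scanner ranges per group,
// and a group is cut early when the next range's end_include differs from the previous one.
std::vector<std::vector<const Range*>> group_ranges(const std::vector<Range>& ranges,
                                                    int scanners_per_tablet,
                                                    int size_based_scanners_per_tablet) {
    assert(scanners_per_tablet >= 1 && size_based_scanners_per_tablet >= 1);
    int num_ranges = static_cast<int>(ranges.size());
    int ranges_per_scanner = std::max(
            1, num_ranges / std::min(scanners_per_tablet, size_based_scanners_per_tablet));
    std::vector<std::vector<const Range*>> groups;
    for (int i = 0; i < num_ranges;) {
        std::vector<const Range*> group;
        group.push_back(&ranges[i]);
        ++i;
        for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
                        ranges[i].end_include == ranges[i - 1].end_include;
             ++j, ++i) {
            group.push_back(&ranges[i]);
        }
        groups.push_back(std::move(group));  // one scanner would be created per group
    }
    return groups;
}
```

    With ten ranges and an effective limit of two scanners, for example, each group receives five ranges, so two scanners would be created for that tablet.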

    In this function, the following part first reads the tablet metadata. Some tablet metadata is stored in RocksDB.

    TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
            tablet_id, schema_hash, true, &err);

    After that, the scanner that actually reads the data is created:

    OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation,
                                           _need_agg_finalize, *scan_range);
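    The comment in start_scan_thread notes that each scanner is added to the pool before prepare is called, so that it is destroyed automatically if prepare fails. A minimal sketch of that ownership pattern follows; Scanner, ScannerPool and create_scanners are hypothetical names, not the Doris ObjectPool API.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical scanner whose prepare() may fail; counts live instances for the demo.
struct Scanner {
    static int live;
    bool fail_prepare;
    explicit Scanner(bool fail) : fail_prepare(fail) { ++live; }
    ~Scanner() { --live; }
    bool prepare() const { return !fail_prepare; }
};
int Scanner::live = 0;

// Hypothetical pool: owns every object added to it and frees them on destruction.
class ScannerPool {
public:
    Scanner* add(Scanner* s) {
        _objs.emplace_back(s);
        return s;
    }

private:
    std::vector<std::unique_ptr<Scanner>> _objs;
};

// Returns false on the first prepare() failure, the way RETURN_IF_ERROR leaves early.
bool create_scanners(ScannerPool& pool, const std::vector<bool>& fail_flags,
                     std::vector<Scanner*>* ready) {
    for (bool fail : fail_flags) {
        Scanner* s = pool.add(new Scanner(fail));  // the pool owns it before prepare()
        if (!s->prepare()) return false;           // early return without a leak
        ready->push_back(s);
    }
    return true;
}
```

    Because ownership moves to the pool first, the early return needs no cleanup code: the failed scanner is released when the pool itself is destroyed.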

    The prepare function of OlapScanner is where the Rowset readers are created:

    Status OlapScanner::prepare(
            const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
            const std::vector<TCondition>& filters,
            const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
                    bloom_filters)

    Inside prepare, the following code is what actually creates the Rowset readers:

    OLAPStatus acquire_reader_st =
            _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers, _mem_tracker);

    This in turn calls into the Tablet class:

    OLAPStatus Tablet::capture_rs_readers(const std::vector<Version>& version_path,
                                          std::vector<RowsetReaderSharedPtr>* rs_readers,
                                          std::shared_ptr<MemTracker> parent_tracker)
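    Judging only from its signature, capture_rs_readers turns a list of versions into a list of rowset readers. The sketch below illustrates that idea under the assumption that rowsets are looked up by their version range; all types and the function name capture_readers are hypothetical and simplified, and the real implementation may differ.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical simplified types, not the Doris definitions.
using Version = std::pair<int, int>;  // [start, end] version range
struct Rowset { Version version; };
struct RowsetReader { std::shared_ptr<Rowset> rowset; };

// For each version in the path, find the matching rowset and create a reader for it.
// Returns false, leaving `readers` untouched, if any version has no rowset.
bool capture_readers(const std::map<Version, std::shared_ptr<Rowset>>& rowsets,
                     const std::vector<Version>& version_path,
                     std::vector<std::shared_ptr<RowsetReader>>* readers) {
    std::vector<std::shared_ptr<RowsetReader>> result;
    for (const Version& v : version_path) {
        auto it = rowsets.find(v);
        if (it == rowsets.end()) return false;
        result.push_back(std::make_shared<RowsetReader>(RowsetReader{it->second}));
    }
    readers->insert(readers->end(), result.begin(), result.end());
    return true;
}
```

    Collecting the readers in a local vector first means a missing version leaves the caller's list unchanged, which keeps the failure case easy to reason about.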

     2. Effect of the in-memory property on the BE

    This part explains how the Doris BE caches data for the in-memory property. The principle is that the BE uses an LRU cache when it reads columns. The read path starts in the following file:

    doris/be/src/olap/rowset/segment_v2/segment_iterator.cpp

    Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, RowBlockV2* block,
                                          size_t row_offset, size_t nrows) {
        for (auto cid : column_ids) {
            auto column_block = block->column_block(cid);
            ColumnBlockView dst(&column_block, row_offset);
            size_t rows_read = nrows;
            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, &dst));
            DCHECK_EQ(nrows, rows_read);
        }
        return Status::OK();
    }
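    The column-by-column batch read in _read_columns can be mimicked with plain vectors. This is a sketch under simplifying assumptions: ColumnIterator and Block here are hypothetical, hold int values only, and read from memory rather than decoding pages.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical column iterator over an in-memory column.
class ColumnIterator {
public:
    explicit ColumnIterator(std::vector<int> values) : _values(std::move(values)) {}

    // Reads up to *n values into dst; on return *n holds the number actually read.
    void next_batch(std::size_t* n, int* dst) {
        std::size_t avail = _values.size() - _pos;
        *n = std::min(*n, avail);
        std::copy_n(_values.begin() + _pos, *n, dst);
        _pos += *n;
    }

private:
    std::vector<int> _values;
    std::size_t _pos = 0;
};

// Row block stored column by column: block[cid][row].
using Block = std::vector<std::vector<int>>;

// Fill rows [row_offset, row_offset + nrows) of each requested column.
// Each block column must already be large enough to hold those rows.
bool read_columns(std::vector<ColumnIterator>& iters,
                  const std::vector<std::size_t>& column_ids, Block* block,
                  std::size_t row_offset, std::size_t nrows) {
    for (std::size_t cid : column_ids) {
        std::size_t rows_read = nrows;
        iters[cid].next_batch(&rows_read, (*block)[cid].data() + row_offset);
        if (rows_read != nrows) return false;  // the original uses DCHECK_EQ here
    }
    return true;
}
```

    Each column advances independently through its own iterator, which is why only the columns named in column_ids are touched.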

    Reading page data

    doris/be/src/olap/rowset/segment_v2/column_reader.cpp

    Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
                                   PageHandle* handle, Slice* page_body, PageFooterPB* footer,
                                   BlockCompressionCodec* codec) {
        iter_opts.sanity_check();
        PageReadOptions opts;
        opts.rblock = iter_opts.rblock;
        opts.page_pointer = pp;
        opts.codec = codec;
        opts.stats = iter_opts.stats;
        opts.verify_checksum = _opts.verify_checksum;
        opts.use_page_cache = iter_opts.use_page_cache;
        opts.kept_in_memory = _opts.kept_in_memory;
        opts.type = iter_opts.type;
        opts.encoding_info = _encoding_info;
        return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
    }
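    The options set in read_page are consumed by the page cache. To make the LRU behaviour concrete, here is a minimal sketch of an LRU cache keyed by (file path, page offset). LruPageCache is a hypothetical class, it counts entries instead of bytes, and it ignores the kept_in_memory flag; it is not the StoragePageCache implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <string>
#include <utility>

// Hypothetical minimal LRU cache for pages.
class LruPageCache {
public:
    using Key = std::pair<std::string, std::uint64_t>;  // (file path, page offset)

    explicit LruPageCache(std::size_t capacity) : _capacity(capacity) {}

    // On a hit, copies the page out, marks it most recently used and returns true.
    bool lookup(const Key& key, std::string* page) {
        auto it = _index.find(key);
        if (it == _index.end()) return false;
        _entries.splice(_entries.begin(), _entries, it->second);
        *page = it->second->second;
        return true;
    }

    // Inserts or replaces a page, evicting the least recently used entry when full.
    void insert(const Key& key, std::string page) {
        auto it = _index.find(key);
        if (it != _index.end()) {
            _entries.erase(it->second);
            _index.erase(it);
        }
        _entries.emplace_front(key, std::move(page));
        _index[key] = _entries.begin();
        if (_entries.size() > _capacity) {
            _index.erase(_entries.back().first);
            _entries.pop_back();
        }
    }

private:
    using Entry = std::pair<Key, std::string>;
    std::size_t _capacity;
    std::list<Entry> _entries;  // front = most recently used
    std::map<Key, std::list<Entry>::iterator> _index;
};
```

    A reader would first call lookup and fall back to disk on a miss, then call insert with the page it just read, so that a later read of the same offset is served from memory until the entry is evicted.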

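    Whether a page is served from memory is decided by the following condition in PageIO::read_and_decompress_page:

    if (opts.use_page_cache && cache->is_cache_available(opts.type) && cache->lookup(cache_key, &cache_handle, opts.type))

    That is, the page cache must be enabled, a cache for this page type must be available, and the page must not have been evicted from the LRU cache yet. The in-memory attribute itself arrives as opts.kept_in_memory and is passed to cache->insert when a page read from disk is added to the cache.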

    Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                            Slice* body, PageFooterPB* footer) {
        opts.sanity_check();
        opts.stats->total_pages_num++;

        auto cache = StoragePageCache::instance();
        PageCacheHandle cache_handle;
        StoragePageCache::CacheKey cache_key(opts.rblock->path_desc().filepath,
                                             opts.page_pointer.offset);
        if (opts.use_page_cache && cache->is_cache_available(opts.type) &&
            cache->lookup(cache_key, &cache_handle, opts.type)) {
            // we find page in cache, use it
            *handle = PageHandle(std::move(cache_handle));
            opts.stats->cached_pages_num++;
            // parse body and footer
            Slice page_slice = handle->data();
            uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
            std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
            if (!footer->ParseFromString(footer_buf)) {
                return Status::Corruption("Bad page: invalid footer");
            }
            *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
            return Status::OK();
        }

        // every page contains 4 bytes footer length and 4 bytes checksum
        const uint32_t page_size = opts.page_pointer.size;
        if (page_size < 8) {
            return Status::Corruption(strings::Substitute("Bad page: too small size ($0)", page_size));
        }

        // hold compressed page at first, reset to decompressed page later
        std::unique_ptr<char[]> page(new char[page_size]);
        Slice page_slice(page.get(), page_size);
        {
            SCOPED_RAW_TIMER(&opts.stats->io_ns);
            RETURN_IF_ERROR(opts.rblock->read(opts.page_pointer.offset, page_slice));
            opts.stats->compressed_bytes_read += page_size;
        }

        if (opts.verify_checksum) {
            uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
            uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
            if (expect != actual) {
                return Status::Corruption(strings::Substitute(
                        "Bad page: checksum mismatch (actual=$0 vs expect=$1)", actual, expect));
            }
        }

        // remove checksum suffix
        page_slice.size -= 4;
        // parse and set footer
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
            return Status::Corruption("Bad page: invalid footer");
        }

        uint32_t body_size = page_slice.size - 4 - footer_size;
        if (body_size != footer->uncompressed_size()) { // need decompress body
            if (opts.codec == nullptr) {
                return Status::Corruption("Bad page: page is compressed but codec is NO_COMPRESSION");
            }
            SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
            std::unique_ptr<char[]> decompressed_page(
                    new char[footer->uncompressed_size() + footer_size + 4]);

            // decompress page body
            Slice compressed_body(page_slice.data, body_size);
            Slice decompressed_body(decompressed_page.get(), footer->uncompressed_size());
            RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
            if (decompressed_body.size != footer->uncompressed_size()) {
                return Status::Corruption(strings::Substitute(
                        "Bad page: record uncompressed size=$0 vs real decompressed size=$1",
                        footer->uncompressed_size(), decompressed_body.size));
            }
            // append footer and footer size
            memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
                   footer_size + 4);
            // free memory of compressed page
            page = std::move(decompressed_page);
            page_slice = Slice(page.get(), footer->uncompressed_size() + footer_size + 4);
            opts.stats->uncompressed_bytes_read += page_slice.size;
        } else {
            opts.stats->uncompressed_bytes_read += body_size;
        }

        if (opts.encoding_info) {
            auto* pre_decoder = opts.encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4));
            }
        }

        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        if (opts.use_page_cache && cache->is_cache_available(opts.type)) {
            // insert this page into cache and return the cache handle
            cache->insert(cache_key, page_slice, &cache_handle, opts.type, opts.kept_in_memory);
            *handle = PageHandle(std::move(cache_handle));
        } else {
            *handle = PageHandle(page_slice);
        }
        page.release(); // memory now managed by handle
        return Status::OK();
    }
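    The function above relies on a fixed page trailer: the body is followed by the footer, a 4-byte little-endian footer length, and a 4-byte checksum over everything before it. The sketch below builds and parses such a page. It is only an illustration: toy_checksum is a plain byte sum standing in for crc32c, the footer is an opaque string instead of a protobuf message, compression is left out, and the bound check on footer_size is an addition that the listing does not have.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Decode a 32-bit little-endian integer from four bytes.
std::uint32_t decode_fixed32_le(const unsigned char* p) {
    return static_cast<std::uint32_t>(p[0]) | static_cast<std::uint32_t>(p[1]) << 8 |
           static_cast<std::uint32_t>(p[2]) << 16 | static_cast<std::uint32_t>(p[3]) << 24;
}

// Append a 32-bit integer in little-endian byte order.
void append_fixed32_le(std::string* out, std::uint32_t v) {
    for (int i = 0; i < 4; ++i) out->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}

// Placeholder checksum (NOT crc32c): a byte sum, used only to avoid dependencies.
std::uint32_t toy_checksum(const std::string& data, std::size_t len) {
    std::uint32_t sum = 0;
    for (std::size_t i = 0; i < len; ++i) sum += static_cast<unsigned char>(data[i]);
    return sum;
}

// Layout: body | footer | footer_size (4 bytes) | checksum (4 bytes).
std::string build_page(const std::string& body, const std::string& footer) {
    std::string page = body + footer;
    append_fixed32_le(&page, static_cast<std::uint32_t>(footer.size()));
    append_fixed32_le(&page, toy_checksum(page, page.size()));
    return page;
}

// Splits a page into body and footer; returns false on a size or checksum error.
bool parse_page(const std::string& page, std::string* body, std::string* footer) {
    if (page.size() < 8) return false;  // footer length + checksum are mandatory
    const unsigned char* data = reinterpret_cast<const unsigned char*>(page.data());
    std::size_t size = page.size() - 4;  // drop the checksum suffix
    if (decode_fixed32_le(data + size) != toy_checksum(page, size)) return false;
    std::uint32_t footer_size = decode_fixed32_le(data + size - 4);
    if (footer_size > size - 4) return false;  // extra guard, not in the listing
    *body = page.substr(0, size - 4 - footer_size);
    *footer = page.substr(size - 4 - footer_size, footer_size);
    return true;
}
```

    Reading the trailer from the end of the buffer is what allows the same parsing code to run on both a page fetched from disk and a page taken from the cache.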

    3. Rowset and segment

    SegmentLoader works with BetaRowsetSharedPtr, which is std::shared_ptr<BetaRowset>:

    OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
        // load segments
        RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
                _rowset, &_segment_cache_handle,
                read_context->reader_type == ReaderType::READER_QUERY));
        // ... (rest of the function omitted)
    }

    BetaRowset::create_reader() creates the BetaRowsetReader.
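    In the call above, the last argument to load_segments is true only for query readers. Assuming that this flag decides whether loaded segments are cached and reused (an assumption, not confirmed here), the idea can be sketched as follows. Segment, Rowset and SegmentLoader below are hypothetical simplified types, not the Doris classes.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct Segment { std::string path; };
using SegmentList = std::vector<std::shared_ptr<Segment>>;

// Hypothetical rowset: an id plus the paths of its segment files.
struct Rowset {
    std::string rowset_id;
    std::vector<std::string> segment_paths;

    // "Opens" every segment of this rowset (here it only wraps the path).
    SegmentList open_segments() const {
        SegmentList segs;
        for (const std::string& p : segment_paths) {
            segs.push_back(std::make_shared<Segment>(Segment{p}));
        }
        return segs;
    }
};

// Hypothetical loader with an unbounded cache keyed by rowset id.
class SegmentLoader {
public:
    // With use_cache, a previously loaded segment list is reused and new ones are stored.
    SegmentList load_segments(const Rowset& rs, bool use_cache) {
        if (use_cache) {
            auto it = _cache.find(rs.rowset_id);
            if (it != _cache.end()) {
                ++hits;
                return it->second;
            }
        }
        SegmentList segs = rs.open_segments();
        if (use_cache) _cache[rs.rowset_id] = segs;
        return segs;
    }

    int hits = 0;  // number of loads served from the cache

private:
    std::map<std::string, SegmentList> _cache;
};
```

    Under this assumption, repeated queries over the same rowset share the opened segments, while other reader types open the segment files again each time.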

    References:

    [1] https://blog.csdn.net/weixin_44012322/article/details/121163144

    https://www.jianshu.com/p/141ad958832d

    新一代列式存储格式Parquet_教练_我要踢球的博客-CSDN博客_parquet

    【Doris全面解析】Doris SQL 原理解析_ApacheDoris的博客-CSDN博客

    doris-查询原理_longlovefilm的博客-CSDN博客

    【Doris】Doris存储层设计介绍1——存储结构设计解析_九层之台起于累土的博客-CSDN博客_doris存储

    Doris Stream Load原理解析 - 墨天轮

  • Original article: https://blog.csdn.net/innersense/article/details/126824446