• muduo库的高性能日志库(五)——AsyncLogging文件


    概述

    这一部分就是muduo库之所以十分高效的原因,将前端与后端联系起来,实现了多生产者单消费者的异步网络日志库。

    在多线程服务器程序当中,异步日志(非阻塞日志)是必须的,因为如果在网络IO线程或业务线程中直接往磁盘写数据的话,写操作偶尔可能阻塞长达数秒之久。这可能导致请求方超时,或耽误发送心跳消息,在分布式系统过更可能发生连锁效应。因此在正常的实现业务处理流程中应该彻底避免磁盘IO,这在one loop per thread模型的非阻塞服务端程序中尤为重要

    问题

    1. 在AsyncLogging中的CountDownLatch,有什么用?是否必要呢?

    设计结构

    使用双缓冲技术,准备两块缓冲A和B,缓冲A接收日志消息,缓冲B将日志消息写入磁盘。当A写满,交换A和B,后端将B写入磁盘文件,前端则往A写的日志消息,如此反复。

      //Buffer使用c++11的std::unique_ptr进行管理,自动管理声明周期,其具备移动语义,
      //提高了缓冲区交换的效率。mutex_用于保护后面4个成员。buffer_存放供后端写入的buffer(目前最多16个)。
      typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; //缓冲区约4MB(4000*1000;)
      typedef std::vector<std::unique_ptr<Buffer>> BufferVector;  //因为要采用多缓冲技术,用vector来管理多个缓冲区
      typedef BufferVector::value_type BufferPtr;   //指针
    
      const int flushInterval_;
      //atomic修饰变量是原子类型
      std::atomic<bool> running_;
      const string basename_; //日志文件名
      const off_t rollSize_;  //滚动大小
      muduo::Thread thread_;
      muduo::CountDownLatch latch_;
      muduo::MutexLock mutex_;
      muduo::Condition cond_ GUARDED_BY(mutex_);
      BufferPtr currentBuffer_ GUARDED_BY(mutex_);  //当前缓冲区
      BufferPtr nextBuffer_ GUARDED_BY(mutex_);   //预备缓冲区
      BufferVector buffers_ GUARDED_BY(mutex_);   //待写入文件的已填满缓冲,供后端使用
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    多生产者(产生日志消息)

    前端在生产一条日志消息短的时候会调用AsyncLogging::append
    在这个函数中,若当前缓冲区currentBuffer_最够大,就直接写入该缓冲区。
    currentBuffer_缓冲区剩余空间不够大,使用move将当前缓冲区移动到待写入缓冲区队列当中(Buffers_),则使用移动语义move将备用缓冲区移用为当前缓冲区,然后在进行消息追加。
    当前端写入速度过快,一下子把两个缓冲区都用完了,那就重新分配一块buffer,作为当前缓冲区。

    //日志发送者(多生产者)
    void AsyncLogging::append(const char* logline, int len)
    {
      muduo::MutexLockGuard lock(mutex_);
      //判断当前缓冲区是否空间足够
      if (currentBuffer_->avail() > len)
      {
        //直接追加到当前缓冲区当中
        currentBuffer_->append(logline, len);
      }
      //当前缓冲区空间不够
      else
      {
        //使用move将当前缓冲区移动到待写入文件队列当中
        buffers_.push_back(std::move(currentBuffer_));
        //查看备用缓冲区是否还在
        if (nextBuffer_)
        {
          //存在
          //当备用缓冲区移动给当前缓冲区
          currentBuffer_ = std::move(nextBuffer_);
        }
        else得在start结得在start结
        {
          //不存在,new一个新的buffer
          currentBuffer_.reset(new Buffer); // Rarely happens
        }
        currentBuffer_->append(logline, len);
        cond_.notify();   //通知消费者(写端),有一个待写入的已填满缓冲
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    单消费(消费日志消息)

    要提前准备两个缓冲区
    目的是缩短临界区大小,防止阻塞

    
    //单消费者
    void AsyncLogging::threadFunc()
    {
      assert(running_ == true);
      latch_.countDown();
      LogFile output(basename_, rollSize_, false); //采用非线程安全
      BufferPtr newBuffer1(new Buffer);
      BufferPtr newBuffer2(new Buffer);
      newBuffer1->bzero();
      newBuffer2->bzero();
      BufferVector buffersToWrite;
      buffersToWrite.reserve(16);
      while (running_)
      {
        assert(newBuffer1 && newBuffer1->length() == 0);
        assert(newBuffer2 && newBuffer2->length() == 0);
        assert(buffersToWrite.empty());
        //临界区
        {
          muduo::MutexLockGuard lock(mutex_);
          //暂无待写入内容
          if (buffers_.empty())  // unusual usage!
          {
            cond_.waitForSeconds(flushInterval_); //有超时时长
          }
          //将当前缓冲区放入待写入队列当中
          buffers_.push_back(std::move(currentBuffer_));
    
          //将新的缓冲移动给当前缓冲
          currentBuffer_ = std::move(newBuffer1);
    
          //交换缓冲区,可以缩短临界区。这样就可以在非临界区完成对待写入队列进行操作
          buffersToWrite.swap(buffers_);
    
          //保证前端一直有预备缓冲区
          if (!nextBuffer_)
          {
            nextBuffer_ = std::move(newBuffer2);
          }
        }
      
        //出临界区
        assert(!buffersToWrite.empty());
        //短时间写入大量日志,超出待写入缓冲区大小限制,日志堆积,为异常情况
        if (buffersToWrite.size() > 25)
        {
          char buf[256];
          snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
                   Timestamp::now().toFormattedString().c_str(),
                   buffersToWrite.size()-2);
          fputs(buf, stderr);
          output.append(buf, static_cast<int>(strlen(buf)));
          //可能是避免浪费,后面还要使用
          buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
        }
    
        for (const auto& buffer : buffersToWrite)
        {
          // FIXME: use unbuffered stdio FILE ? or use ::writev ?
          //数据写入
          output.append(buffer->data(), buffer->length());
        }
    
        if (buffersToWrite.size() > 2)
        {
          // drop non-bzero-ed buffers, avoid trashing
          //丢弃垃圾
          buffersToWrite.resize(2);
        }
    
    
       //重新填充newbuffer1和newbuffer2
        if (!newBuffer1)
        {
          assert(!buffersToWrite.empty());
          newBuffer1 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer1->reset();
        }
    
        if (!newBuffer2)
        {
          assert(!buffersToWrite.empty());
          newBuffer2 = std::move(buffersToWrite.back());
          buffersToWrite.pop_back();
          newBuffer2->reset();
        }
        //清空
        buffersToWrite.clear();
        output.flush();
      }
    
      //日志关闭再flush
      output.flush();
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    问题解决

    1.这里的CountDownLatch作用就是,让wait()之后的语句在countDown()之后执行,这里就是想在start()结束前进入threadFunc()函数。若不添加CountDownLatch,就可能会导致前端调用第一次的notify()唤醒丢失。也就是生产者notify()以后,消费者(异步线程ThreadFunc)未收到信号,而生产者已将消息产出。

    思考一下,这里的CountDownLatch是否必要呢,在我看来是不必要的原因如下

    因为每一次notify(),都会把前面收到的信息一起写入文件,极端情况,生产者只notify()一次就退出,那么在flushInterval_之后,之前消息也会被写入,就算不到flushInterval_那么也会在析构函数中进行一次notify()保证无消息漏掉。

    综上我认为CountDownLatch并不必要

  • 相关阅读:
    工业机器人多物料双姿态循环搬运工艺集成
    ucharts最详细教程(含踩坑记录)
    人工智能职业教育怎么搞?操作系统层级的解法来了
    厉害了!阿里内部都用的Spring+MyBatis源码手册,实战理论两不误
    Javase | StringBuffer、StringBuilder
    分布式全局唯一 ID生成器(百度UidGenerator)
    玩转DIY可视化打造专属小程序
    Day15-Python基础学习之PySpark
    vue3封装Axios库的 API 请求并使用拦截器来处理请求和响应
    矩阵求导简记
  • 原文地址:https://blog.csdn.net/m0_61705102/article/details/127952825