• 共享内存+inotify机制实现多进程低延迟数据共享


    本文是对共享内存实现多进程低延迟队列 10us_Sweet_Oranges的博客-CSDN博客的部分修正。

    起因
    之前的博客写过通过“inotify +file”的形式来实现多进程队列(跨进程共享)的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是“当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。”

    为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。

    生产者
    由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中(类似A/B缓存这个长度为2的最小环形队列一样)。

    同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中(类似我们的A/B缓冲区,前面加了一个header,header中放元信息,而A/B缓冲区放树)。消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费(我们的A/B缓冲区不存在重新调整大小的情况,因为已经将数据源单帧数据大小写到配置文件中了)。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当新数据到达后,唤醒消费者。我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
    14. DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
    15. DEFINE_string(shm_file, "test", "shm file path");
    16. DEFINE_string(shm_key, "", "shm key");
    17. using namespace std;
    18. class Producer
    19. {
    20. public:
    21. Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
    22. {
    23. shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
    24. }
    25. bool Init(const std::string &key)
    26. {
    27. fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
    28. if (fd_ < 0)
    29. {
    30. printf("1. open inotify path failed\n");
    31. return false;
    32. }
    33. else
    34. {
    35. printf("1. open inotify path successed\n");
    36. }
    37. // 打开共享内存
    38. shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
    39. if (shm_fd_ < 0)
    40. {
    41. printf("2. shm_open failed\n");
    42. return false;
    43. }
    44. else
    45. {
    46. printf("2. open shm path successed\n");
    47. }
    48. uint64_t size = shm_size_ + sizeof(SHM_Data);
    49. printf("size = %ld\n", size);
    50. if (ftruncate(shm_fd_, size) == -1)
    51. {
    52. printf("3. ftruncate failed\n");
    53. return false;
    54. }
    55. else
    56. {
    57. printf("3. ftruncate successed\n");
    58. }
    59. shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
    60. if (shm_data_ == MAP_FAILED)
    61. {
    62. printf("4. mmap failed\n");
    63. return false;
    64. }
    65. else
    66. {
    67. printf("4. mmap successed\n");
    68. }
    69. shm_data_->total = 0;
    70. shm_data_->size = shm_size_;
    71. memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
    72. memcpy(shm_data_->key, key.c_str(), key.size());
    73. return true;
    74. }
    75. void Write(const char *line)
    76. {
    77. for (int i = 0; line[i] != '\0'; i++)
    78. {
    79. if (current_offset_ >= shm_size_)
    80. {
    81. current_offset_ = 0;
    82. }
    83. shm_data_->buffer[current_offset_++] = line[i];
    84. }
    85. if (current_offset_ >= shm_size_)
    86. {
    87. current_offset_ = 0;
    88. }
    89. shm_data_->buffer[current_offset_++] = '\0';
    90. shm_data_->total++;
    91. struct timeval tv;
    92. struct timezone tz;
    93. write(fd_, "8", 1);
    94. gettimeofday(&tv, &tz);
    95. //std::cout << "second : \t" << tv.tv_sec << std::endl; //秒
    96. std::cout <" second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
    97. }
    98. private:
    99. struct SHM_Data
    100. {
    101. uint64_t total; // 记录消息总数
    102. char inotify_name[512]; // inotify 文件名
    103. char key[64]; // 当前数据标识
    104. uint64_t size; // 环形缓冲区大小
    105. char buffer[]; // 环形缓冲区
    106. };
    107. SHM_Data *shm_data_ = nullptr; // 共享内存
    108. int fd_;
    109. int shm_fd_;
    110. uint64_t shm_size_ = 0;
    111. uint64_t buffer_size_ = 0;
    112. uint64_t total_read = 0;
    113. uint64_t current_offset_ = 0;
    114. int count = 0;//tmp
    115. std::string inotify_path_;
    116. std::string shm_path_;
    117. };
    118. int main(int argc, char *argv[])
    119. {
    120. char line[10] = "123456789";
    121. gflags::ParseCommandLineFlags(&argc, &argv, true);
    122. Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
    123. producer.Init(FLAGS_shm_key);//FLAGS开头的这些指的是用户命令行输入的,如果不输入,默认就是空字符串
    124. for(int i =0;i<200;i++)//测试两百次
    125. {
    126. producer.Write(line);
    127. sleep(1);
    128. }
    129. // producer.Write(line);
    130. std::cout << "write finished" << std::endl;
    131. }

    消费者

    消费者实现就相对简单一些,读取配置结构体(header),执行mremap调整大小。 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. DEFINE_string(shm_file, "test", "shm file path");
    14. DEFINE_bool(shm_nowait, false, "shm no wait mode");
    15. #define EVENT_SIZE (sizeof(struct inotify_event))
    16. #define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
    17. using namespace std;
    18. class Consumer
    19. {
    20. public:
    21. Consumer(int tag, const std::string &shm_path)
    22. : tag_(tag), shm_path_(shm_path)
    23. {
    24. line_size_ = 1024;
    25. inotify_buffer_ = new char[BUF_LEN];
    26. line_ = new char[line_size_];
    27. }
    28. ~Consumer()
    29. {
    30. delete[] line_;
    31. delete[] inotify_buffer_;
    32. }
    33. bool Init(const std::string &key)
    34. {
    35. shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
    36. if (shm_fd_ < 0)
    37. {
    38. printf("1. shm_open failed\n");
    39. return false;
    40. }
    41. else
    42. {
    43. printf("1. shm_open successed\n");
    44. }
    45. SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
    46. if (shm_info_ == MAP_FAILED)
    47. {
    48. printf("2. mmap info failed\n");
    49. return false;
    50. }
    51. else
    52. {
    53. printf("2. mmap info successed\n");
    54. }
    55. printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
    56. if (strcasecmp(shm_info_->key, key.c_str()) != 0)
    57. {
    58. printf("3. key not match \n");
    59. return false;
    60. }
    61. else
    62. {
    63. printf("3. key matched \n");
    64. }
    65. // 开始监听文件变化
    66. inotify_fd_ = inotify_init();
    67. if (inotify_fd_ < 0)
    68. {
    69. printf("4. inotify_init failed\n");
    70. return false;
    71. }
    72. else
    73. {
    74. printf("4. inotify_init successed\n");
    75. }
    76. shm_size_ = shm_info_->size;
    77. uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
    78. printf("realsize = %ld\n", real_size);
    79. inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
    80. shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
    81. if (shm_data_ == MAP_FAILED)
    82. {
    83. printf("5. mmap data failed\n");
    84. return false;
    85. }
    86. else
    87. {
    88. printf("5. mmap data successed\n");
    89. }
    90. return true;
    91. }
    92. void Loop()
    93. {
    94. while (true)
    95. {
    96. while (total_read < shm_data_->total)
    97. {
    98. // printf("5---------------\n");
    99. for (int i = 0; i < line_size_; i++)
    100. {
    101. if (current_offset_ >= shm_size_)
    102. {
    103. current_offset_ = 0;
    104. }
    105. line_[i] = shm_data_->buffer[current_offset_++];
    106. if (line_[i] == '\0')
    107. {
    108. break;
    109. }
    110. }
    111. total_read++;
    112. printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
    113. }
    114. // printf("6.-------\n");
    115. if (!FLAGS_shm_nowait)
    116. {
    117. //printf("7.-------\n");
    118. read(inotify_fd_, inotify_buffer_, BUF_LEN);
    119. struct timeval tv;
    120. struct timezone tz;
    121. gettimeofday(&tv, &tz);
    122. // std::cout<< "second : \t" << tv.tv_sec << std::endl; //秒
    123. std::cout <" second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
    124. }
    125. }
    126. }
    127. private:
    128. struct SHM_Data
    129. {
    130. uint64_t total; // 记录消息总数
    131. char inotify_name[512]; // inotify 文件名
    132. char key[64]; // 当前数据标识
    133. uint64_t size; // 环形缓冲区大小
    134. char buffer[]; // 环形缓冲区
    135. };
    136. SHM_Data *shm_data_ = nullptr; // 共享对象指针
    137. uint64_t shm_size_ = 0; // 共享内存大小
    138. uint64_t line_size_ = 0; // 每条数据最大值
    139. uint64_t total_read = 0; // 当前读取总记录数
    140. uint64_t current_offset_ = 0; // 当前读取的偏移量
    141. int count = 0;//tmp
    142. std::string shm_path_;
    143. int inotify_fd_;
    144. int shm_fd_;
    145. int tag_;
    146. char *line_;
    147. char *inotify_buffer_;
    148. };
    149. DEFINE_string(shm_key, "", "shm key");
    150. int main(int argc, char *argv[])
    151. {
    152. gflags::ParseCommandLineFlags(&argc, &argv, true);
    153. Consumer consumer(2, FLAGS_shm_file);
    154. if (!consumer.Init(FLAGS_shm_key))
    155. {
    156. return 1;
    157. }
    158. consumer.Loop();
    159. }

    Loop()这部分要改成回调机制。

    编译

    因为这里使用了gflag,所以需要先编译安装gflag。过程参考linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装

    对于本文的两个程序,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译。

    结果如下:

    可以发现,这种机制下,生产者生产完数据后,消费者能在100us内感知到,这个效率还是非常高的。

     使用top也看不出来这两个进程对CPU的明显消耗。

    本文的消费者目前采用的是while循环来持续监听inotify,这个机制还是要改造成事件机制比较好。

    参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050

     

  • 相关阅读:
    Spring中InitializingBean接口的功能
    Flink学习笔记(一):Flink重要概念和原理
    导入Embassy库进行爬虫
    逻辑判断与正则表达式文本处理
    MongoDB 主从集群 1
    Spring Authorization Server授权服务器入门
    babel.config.js配置文件详解
    LLM - LLaMA-2 获取文本向量并计算 Cos 相似度
    el-input无法输入
    叠氮N3/巯基SH/生物素Biotin修饰CdSe/CdS硒化镉/硫化镉量子点
  • 原文地址:https://blog.csdn.net/jinking01/article/details/126123539