本文是对共享内存实现多进程低延迟队列 10us_Sweet_Oranges的博客-CSDN博客的部分修正。
起因
之前的博客写过通过“inotify +file”的形式来实现多进程队列(跨进程共享)的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是“当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。”
为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。
生产者
由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中(类似A/B缓存这个长度为2的最小环形队列一样)。
同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中(类似我们的A/B缓冲区,前面加了一个header,header中放元信息,而A/B缓冲区放树)。消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费(我们的A/B缓冲区不存在重新调整大小的情况,因为已经将数据源单帧数据大小写到配置文件中了)。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当新数据到达后,唤醒消费者。我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
-
- #include
-
- DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
- DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
- DEFINE_string(shm_file, "test", "shm file path");
- DEFINE_string(shm_key, "", "shm key");
-
- using namespace std;
-
- class Producer
- {
- public:
- Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
- {
- shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
- }
-
- bool Init(const std::string &key)
- {
- fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
- if (fd_ < 0)
- {
- printf("1. open inotify path failed\n");
- return false;
- }
- else
- {
- printf("1. open inotify path successed\n");
- }
- // 打开共享内存
- shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
- if (shm_fd_ < 0)
- {
- printf("2. shm_open failed\n");
- return false;
- }
- else
- {
- printf("2. open shm path successed\n");
- }
- uint64_t size = shm_size_ + sizeof(SHM_Data);
- printf("size = %ld\n", size);
-
- if (ftruncate(shm_fd_, size) == -1)
- {
- printf("3. ftruncate failed\n");
- return false;
- }
- else
- {
- printf("3. ftruncate successed\n");
- }
-
- shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
- if (shm_data_ == MAP_FAILED)
- {
- printf("4. mmap failed\n");
- return false;
- }
- else
- {
- printf("4. mmap successed\n");
- }
- shm_data_->total = 0;
- shm_data_->size = shm_size_;
- memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
- memcpy(shm_data_->key, key.c_str(), key.size());
- return true;
- }
- void Write(const char *line)
- {
- for (int i = 0; line[i] != '\0'; i++)
- {
- if (current_offset_ >= shm_size_)
- {
- current_offset_ = 0;
- }
- shm_data_->buffer[current_offset_++] = line[i];
- }
- if (current_offset_ >= shm_size_)
- {
- current_offset_ = 0;
- }
- shm_data_->buffer[current_offset_++] = '\0';
- shm_data_->total++;
-
- struct timeval tv;
- struct timezone tz;
-
- write(fd_, "8", 1);
-
- gettimeofday(&tv, &tz);
- //std::cout << "second : \t" << tv.tv_sec << std::endl; //秒
- std::cout <
" second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒 - }
-
- private:
- struct SHM_Data
- {
- uint64_t total; // 记录消息总数
- char inotify_name[512]; // inotify 文件名
- char key[64]; // 当前数据标识
- uint64_t size; // 环形缓冲区大小
- char buffer[]; // 环形缓冲区
- };
- SHM_Data *shm_data_ = nullptr; // 共享内存
- int fd_;
- int shm_fd_;
- uint64_t shm_size_ = 0;
- uint64_t buffer_size_ = 0;
- uint64_t total_read = 0;
- uint64_t current_offset_ = 0;
- int count = 0;//tmp
- std::string inotify_path_;
- std::string shm_path_;
- };
-
- int main(int argc, char *argv[])
- {
- char line[10] = "123456789";
- gflags::ParseCommandLineFlags(&argc, &argv, true);
-
- Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
- producer.Init(FLAGS_shm_key);//FLAGS开头的这些指的是用户命令行输入的,如果不输入,默认就是空字符串
- for(int i =0;i<200;i++)//测试两百次
- {
- producer.Write(line);
- sleep(1);
- }
- // producer.Write(line);
- std::cout << "write finished" << std::endl;
- }
消费者实现就相对简单一些,读取配置结构体(header),执行mremap调整大小。 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
-
- DEFINE_string(shm_file, "test", "shm file path");
- DEFINE_bool(shm_nowait, false, "shm no wait mode");
-
- #define EVENT_SIZE (sizeof(struct inotify_event))
- #define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
- using namespace std;
-
- class Consumer
- {
-
- public:
- Consumer(int tag, const std::string &shm_path)
- : tag_(tag), shm_path_(shm_path)
- {
- line_size_ = 1024;
- inotify_buffer_ = new char[BUF_LEN];
- line_ = new char[line_size_];
- }
- ~Consumer()
- {
- delete[] line_;
- delete[] inotify_buffer_;
- }
- bool Init(const std::string &key)
- {
- shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
- if (shm_fd_ < 0)
- {
- printf("1. shm_open failed\n");
- return false;
- }
- else
- {
- printf("1. shm_open successed\n");
- }
-
- SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
- if (shm_info_ == MAP_FAILED)
- {
- printf("2. mmap info failed\n");
- return false;
- }
- else
- {
- printf("2. mmap info successed\n");
- }
- printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
- if (strcasecmp(shm_info_->key, key.c_str()) != 0)
- {
- printf("3. key not match \n");
- return false;
- }
- else
- {
- printf("3. key matched \n");
- }
- // 开始监听文件变化
- inotify_fd_ = inotify_init();
- if (inotify_fd_ < 0)
- {
- printf("4. inotify_init failed\n");
- return false;
- }
- else
- {
- printf("4. inotify_init successed\n");
- }
- shm_size_ = shm_info_->size;
- uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
- printf("realsize = %ld\n", real_size);
-
- inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
-
- shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
- if (shm_data_ == MAP_FAILED)
- {
- printf("5. mmap data failed\n");
- return false;
- }
- else
- {
- printf("5. mmap data successed\n");
- }
- return true;
- }
-
- void Loop()
- {
- while (true)
- {
- while (total_read < shm_data_->total)
- {
- // printf("5---------------\n");
- for (int i = 0; i < line_size_; i++)
- {
- if (current_offset_ >= shm_size_)
- {
- current_offset_ = 0;
- }
- line_[i] = shm_data_->buffer[current_offset_++];
- if (line_[i] == '\0')
- {
- break;
- }
- }
- total_read++;
- printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
- }
- // printf("6.-------\n");
- if (!FLAGS_shm_nowait)
- {
- //printf("7.-------\n");
- read(inotify_fd_, inotify_buffer_, BUF_LEN);
- struct timeval tv;
- struct timezone tz;
-
- gettimeofday(&tv, &tz);
- // std::cout<< "second : \t" << tv.tv_sec << std::endl; //秒
- std::cout <
" second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒 - }
- }
- }
-
- private:
- struct SHM_Data
- {
- uint64_t total; // 记录消息总数
- char inotify_name[512]; // inotify 文件名
- char key[64]; // 当前数据标识
- uint64_t size; // 环形缓冲区大小
- char buffer[]; // 环形缓冲区
- };
- SHM_Data *shm_data_ = nullptr; // 共享对象指针
- uint64_t shm_size_ = 0; // 共享内存大小
- uint64_t line_size_ = 0; // 每条数据最大值
- uint64_t total_read = 0; // 当前读取总记录数
- uint64_t current_offset_ = 0; // 当前读取的偏移量
-
- int count = 0;//tmp
-
- std::string shm_path_;
-
- int inotify_fd_;
- int shm_fd_;
- int tag_;
-
- char *line_;
- char *inotify_buffer_;
- };
- DEFINE_string(shm_key, "", "shm key");
- int main(int argc, char *argv[])
- {
- gflags::ParseCommandLineFlags(&argc, &argv, true);
- Consumer consumer(2, FLAGS_shm_file);
- if (!consumer.Init(FLAGS_shm_key))
- {
- return 1;
- }
- consumer.Loop();
- }
Loop()这部分要改成回调机制。
编译
因为这里使用了gflag,所以需要先编译安装gflag。过程参考linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装
对于本文的两个程序,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译。
结果如下:

可以发现,这种机制下,生产者生产完数据后,消费者能在100us内感知到,这个效率还是非常高的。

使用top也看不出来这两个进程对CPU的明显消耗。
本文的消费者目前采用的是while循环来持续监听inotify,这个机制还是要改造成事件机制比较好。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050