• 日志库的设计与模块


    组件

    线程池

    总体认知

    1. 为什么要用一个线程池: 减少线程的创建, 异步的执行任务更解耦, 例如日志需要磁盘落地, 通过异步方式让日志给线程池, 大大的提升性能, 对于一些耗时的任务用线程池去做, 主线程关于主要任务。 此外我们的cpu现在都是多个核的, 一个线程在一个核中运行, 你如果是单线程, 一个核运行, 其他的核都是等待状态, cpu性能完全没有榨干, 最高到100%(6个线程最多达到600%)。 因此多线程对程序性能是绝对提升的。

    2. 线程一般创建多少个, 需要绑定么: 一般没必要绑定, 除非你的任务是计算密集类型的。

    3. 线程池和连接池的区别: 线程池是争抢任务的, 但是连接池只是负责给你资源, 二者不一样, 线程池你还要给任务过去。

    基本内容

    1. 功能: 实现一个线程池, 提供并发任务。
    //h
    
    
    #pragma once
    
    #include 
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using ThreadPoolTask = std::function<void()>;
    
    class ThreadPool {
     public:
      static ThreadPool* GetInstance();
      virtual void PostTask(ThreadPoolTask task, std::string name = "") = 0;
    };
    
    struct Task {
      ThreadPoolTask task;
      std::string name;
    };
    
    class TaskQueue : public std::list<Task> {
     public:
      TaskQueue();
      ~TaskQueue();
      void PostTask(struct Task task);
      void WaitTask();
      void TakeTask(struct Task& task);
      void Lock();
      void Unlock();
    
     private:
      std::mutex tasksMutex;  // 这个怎么初始化? 这不可能是声明,
                              // 哪有声明类但是不初始化的。
      sem_t taskSem;
    };
    class ThreadPoolImpl : public ThreadPool {
     public:
      static ThreadPoolImpl* GetInstance();
      virtual void PostTask(ThreadPoolTask task, std::string name) override;
    
     private:
      ThreadPoolImpl();
      ~ThreadPoolImpl();
      static inline ThreadPoolImpl* instance = nullptr;
      int interruptFd = -1;
      void WorkThreadMain(int id);
      TaskQueue runnable;
      std::vector<std::unique_ptr<std::thread>> workThreads;  // thread work
      std::unique_ptr<std::thread> futureThread = nullptr;    // fucntion of thread,
                                                              // stop args
      std::atomic_bool running = {true};
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    
    
    /*
     * Date:2022-09-01
     * Author: Tian yi
     * funtion:  ThreadPoolImpl
     * Description :  使用ThreadPoolImpl包含多个线程, 这些线程绑定了自己的成员函数WorkThreadMain, 在
     * 成员函数中检查任务队列的信号量, 然后每个线程创建一个任务去任务队列取任务, 然后执行。| 外部main函数通过
     * postTask函数放任务过来给函数对象functional(被vector存储了)。
     * Example:   
     * Reference:
     */
    
    #pragma once
    
    #include "thread_pool.h"
    
    #include 
    #include 
    #include 
    #include 
    
    #include 
    #include 
    #include 
    
    ThreadPool* ThreadPool::GetInstance() { return ThreadPoolImpl::GetInstance(); }
    
    TaskQueue::TaskQueue() { sem_init(&taskSem, false, 0); }
    
    TaskQueue::~TaskQueue() { sem_destroy(&taskSem); }
    
    void TaskQueue::PostTask(struct Task task) {
      {
        std::lock_guard<std::mutex> lock(tasksMutex);
        push_back(task);
      }
      sem_post(&taskSem);
    }
    
    void TaskQueue::WaitTask() { sem_wait(&taskSem); }
    
    void TaskQueue::TakeTask(struct Task& task) {
      {
        std::lock_guard<std::mutex> lock(tasksMutex);
        task = front();
        pop_front();
      }
    }
    void TaskQueue::Lock() { tasksMutex.lock(); }
    
    void TaskQueue::Unlock() { tasksMutex.unlock(); }
    
    ThreadPoolImpl* ThreadPoolImpl::GetInstance() {
      if (instance == nullptr) {
        static std::mutex mutex;
        std::lock_guard<std::mutex> lock(mutex);
        if (instance == nullptr) {
          instance = new ThreadPoolImpl();
        }
      }
      return instance;
    }
    
    void ThreadPoolImpl::PostTask(ThreadPoolTask task, std::string name) {
      runnable.PostTask({task, name});
    }
    
    ThreadPoolImpl::ThreadPoolImpl() {
      // 绑定过来
      int cpuNumber = 2;
      for (int i = 0; i < cpuNumber; i++) {
        auto func = std::bind(&ThreadPoolImpl::WorkThreadMain, this, i);
        workThreads.push_back(std::make_unique<std::thread>(func));
      }
    }
    
    ThreadPoolImpl::~ThreadPoolImpl() {
      running = false;
      for (auto& workThread : workThreads) {
        workThread->join();
      }
    }
    
    void ThreadPoolImpl::WorkThreadMain(int id) {
      while (true) {
        runnable.WaitTask();
        if (running == false) {
          break;
        }
        Task task;
        runnable.TakeTask(task);
    
        std::cout << "Work(" << id << "): " << task.name << " started" << std::endl;
        task.task();
        std::cout << "Work(" << id << "): " << task.name << " end" << std::endl;
      }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    
    //main
    #include 
    #include 
    
    #include "thread_pool.h"
    
    int main() {
      auto tp = ThreadPoolImpl::GetInstance();
    
      for (int i = 0; i < 1000; i++) {
        std::stringstream ss;
        ss << "inc" << i;
        tp->PostTask(
            []() {
              volatile int cnt = 0;
              for (int i = 0; i < 100000; i++) {
                cnt += i;
                if (i / 3 == 0) {
                  cnt++;
                }
              }
            },
            ss.str());
      }
      return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. 线程池原理: 使用ThreadPoolImpl包含多个线程, 这些线程绑定了自己的成员函数WorkThreadMain, 在* 成员函数中检查任务队列的信号量, 然后每个线程创建一个任务去任务队列取任务, 然后执行。| 外部main函数通过 * postTask函数放任务过来给函数对象functional(被vector存储了)。
    2. 其实这部分线程池的实现有很多开源代码, 例如零声学院讲的是用c语言加linux file加函数指针实现的, 非常硬核。 tars线程池使用了异步线程future性能更优(参照https://zhuanlan.zhihu.com/p/412127997),完全不用等待,而本文只是使用了最简单的实时同步线程池, 而且使用了c++11封装了很多底层的linux硬核操作, 但是万变不离其宗, 整体思路都是差不多的, 后续我们会基于这个线程池讲解epoll框架服务器的实现, 其资源类加锁和封装资源和多线程处理的思想后面都会用到。
    3. 如果用c语言实现线程池, 就会发现各种指针满天飞。
    4. 一般很多任务都是固定的, 因此往任务池放的任务都是一样的函数地址, 但是参数不一样。如果任务不一样任务池就绑定其他的函数。

    工作级别内容

    工作认知

    1. 工作中我们基本用不到这个线程池组件, 这个部分基本都是封装到了我们使用的网络库或者中间件中, 用来处理别人的请求,转发到对应的任务中, 例如muduo库就是将任务转到了用户的函数, brpc就是通过多线程池将任务转发到了rpc任务中,web框架也是如此, 我们看任何网络相关的开源框架, 除了nginx是用多进程, 大部分涉及到大规模请求的中间件都会有线程池, 不过有了线程池要考虑锁的消耗了, 因此也有中间件例如redis用单线程去跑。

    自己踩过的坑

    1. 原子变量的定义方式不对 | 成员类声明就是默认初始化 |

    内存池

    总体认知

    1. 为什么要用内存池: 对于一个程序,我们能够管理的就是堆, 也就是new这些。 对于每个用户的请求数据, 我们不可能在栈上创建, 因为可能还会有其他线程的函数去访问,因此需要堆数据, 让其他线程也能够访问到这个数据, 不然频繁的创建保存用户数据其他执行耗时任务的线程拿不到用户的数据。但是这种分配堆空间容易产生内容碎片,如果运行了几个月之后出现一次coredump(因为碎片太多, 无法分分配到空间), 基本上就是内存问题, 很难复现当时的问题, 之能重启。 因此要用内存池去管理这些。

    2. 内存分配时候要注意什么: 一定要用置空, 免得拿到了数据别人析构时候没有析构干净,你拿到了的空间有一些脏数据。

    3. 使用内存池要注意什么, 都有哪些有名的线程池,怎么使用: 直接用开源的内存池, 这种关键组件别自己造; 一般用google的tmalloc, 直接使用一个宏定义是让new指定到这个代码中的new,delete就行。其实代码完全不用改。

    4. 内存池的原理:通过链表管理这些地址, 我们使用4k,16k。。。。128k等级别的分配内存池, 每个级别都有自己的链表绑定已经构造的内存, 如果不用了就置为0。 但是需要注意小内存的链表释放很难, 因为会分布到各个位置, 最好就是弄个大块内存切片分小内存。 如果要手动实现需要牵扯很多代码块。

    5. 为什么可以设计一个面向连接的内存池:对于tmalloc这种一开始就创建好的内存池,对于小块的内存释放也是非常难操作, 我们完全可以设计一个用户socket级别的内存池,每次一个用户来搞个内存池, 用完释放, 就可以不用产生内存碎片了。

    6. 内存池要考虑线程安全: 需要啊, 你创建内存的函数智能有一个线程进来。 因此线程池类内部有锁(绑定了资源)。

    基本内容

    这部分太难了, 基本就是实现了一个面向用户连接的内存池,用户断开请求就释放了, 不会产生内存碎片,都是大的内存, 对小内存管理的非常好。

    关于全局的内存池tmalloc如何使用:https://juejin.cn/post/6979067456803307551 。 好像如果你的内存没有使用libpthread.so就会出现一些问题。

    工作级别内容

    工作认知

    1. 工作中我们基本用不到这个线程池组件, 这个部分基本都是封装到了我们使用的网络库或者中间件中, 用来处理别人的请求,转发到对应的任务中, 例如muduo库就是将任务转到了用户的函数, brpc就是通过多线程池将任务转发到了rpc任务中,web框架也是如此, 我们看任何网络相关的开源框架, 除了nginx是用多进程, 大部分涉及到大规模请求的中间件都会有线程池, 不过有了线程池要考虑锁的消耗了, 因此也有中间件例如redis用单线程去跑。

    自己踩过的坑

    1. 原子变量的定义方式不对 | 成员类声明就是默认初始化 |

    异步请求池

    总体认知

    1. 为什么请求需要异步: 因为对于一个请求我们都是串行的执行, 对于很多耗时的任务完全可以做个线程通知, 例如我们买东西的快递, 同步阻塞就是啥也不干一直等, 同步非阻塞就是先去干其他的, 快递来了你看看自己当下要不要去取。 异步非阻塞线程就是你开个线程去监听fd如果有相应就把快递自动放到你家中, 用协程就是速度更快。

    2. 同步, 异步, 阻塞,线程, 协程的区别与联系:

    • 其实讲解可以参照这个https://blog.51cto.com/u_4139404/3544871;https://zhuanlan.zhihu.com/p/113089074,
      https://www.zhihu.com/question/449741417/answer/1784208279。 我现在主要的理解就是异步是一种编程方式, 就是你写代码时候把需要长时间等待的部分单独开个线程或者协程用来处理,自己的主线程去做其他的事情, 或者你可以用epoll来做异步。 例如在reactor中主线程是同步非阻塞的将有fd相应的数据放到任务队列, 而proactor是异步的去做放的工作, 主线程一直快速的进行epoll读取, 读取出来的内容做存放用异步来做。
    1. 异步请求线程池怎么做呢 : 对于创建一个epoll,将请求的fd都加到epoll, 开启一个线程while去判断epoll是否有相应, 如果有的话取出fd进行数据读取, 读完关闭这个线程, 算是异步结束。
    2. 异步的内容如果主线程需要用到怎么办, 什么时候需要考虑异步: 这种你就不要用异步的, 既然用到, 肯定要同步主线程等待, 异步任务最好就是他执行的任务和你主线程解耦了, 如果不解耦, 数据内容你要用到的话, 坑定就不能用异步了, 最多用多线程去做, 而且你还要去等待才行。 因此我们说异步虽然强大, 但是也有其需求的场景。 例如go语言用到异步协程去处理每个用户的请求就非常的方便, 其每个用户都是有自己数据地方, 而且主线程没必要去管协程的结果,我主线程只负责fd。在实际场景中其实很多我们业务代码需要等待多个第三方数据的部分就可以用异步, 开启多线程去执行, 然后主线程等待数据, 当然你如果用futrue操作发送过来的数据更为方便, 没有用直接开多线程也挺好的。 如果你的第三方任务都是web请求的话, 那就可以额外开一个线程搞成epoll方式的, 速度也很快,和你直接开多个线程异步拿差不多。
    3. C++中promise和futrue这些底层是什么, 它是怎么支持异步的: 主要参照这个https://cloud.tencent.com/developer/article/1584075吧, 其实底层就是用thread实现的, 但是thread这种异步的方式想要传送数据不是很方便, 而且是同步开启异步执行的, 因此有这些异步编程的接口, 方便多线程的数据传过来。 *(注意then是比较常用的, 在异步编程中的责任链。)

    基本内容

    
    
    
    #include 
    #include 
    #include 
    #include 
    
    #include 
    #include 
    
    #include 
    #include 
    #include 
    
    #include 
    #include 
    #include 
    
    #include 
    
    
    #define DNS_SVR				"114.114.114.114"
    
    
    #define DNS_HOST			0x01
    #define DNS_CNAME			0x05
    
    
    
    struct dns_header {
    	unsigned short id;
    	unsigned short flags;
    	unsigned short qdcount;
    	unsigned short ancount;
    	unsigned short nscount;
    	unsigned short arcount;
    };
    
    struct dns_question {
    	int length;
    	unsigned short qtype;
    	unsigned short qclass;
    	char *qname;
    };
    
    struct dns_item {
    	char *domain;
    	char *ip;
    };
    
    
    
    
    int dns_client_commit(const char *domain) {
    
    	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    	if (sockfd < 0) {
    		perror("create socket failed\n");
    		exit(-1);
    	}
    
    	printf("url:%s\n", domain);
    
    	struct sockaddr_in dest;
    	bzero(&dest, sizeof(dest));
    	dest.sin_family = AF_INET;
    	dest.sin_port = htons(53);
    	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
    	
    	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
    	printf("connect :%d\n", ret);
    
    	struct dns_header header = {0};
    	dns_create_header(&header);
    
    	struct dns_question question = {0};
    	dns_create_question(&question, domain);
    
    	char request[1024] = {0};
    	int req_len = dns_build_request(&header, &question, request);
    	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));
    
    	char buffer[1024] = {0};
    	struct sockaddr_in addr;
    	size_t addr_len = sizeof(struct sockaddr_in);
    		
    	int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
    		
    	printf("recvfrom n : %d\n", n);
    	struct dns_item *domains = NULL;
    	dns_parse_response(buffer, &domains);
    
    	return 0;
    }
    
    char *domain[] = {
    	"www.ntytcp.com",
    	"bojing.wang",
    	"www.baidu.com",
    	"tieba.baidu.com",
    	"news.baidu.com",
    	"zhidao.baidu.com",
    	"music.baidu.com",
    	"image.baidu.com",
    	"v.baidu.com",
    	"map.baidu.com",
    	"baijiahao.baidu.com",
    	"xueshu.baidu.com",
    	"cloud.baidu.com",
    	"www.163.com",
    	"open.163.com",
    	"auto.163.com",
    	"gov.163.com",
    	"money.163.com",
    	"sports.163.com",
    	"tech.163.com",
    	"edu.163.com",
    	"www.taobao.com",
    	"q.taobao.com",
    	"sf.taobao.com",
    	"yun.taobao.com",
    	"baoxian.taobao.com",
    	"www.tmall.com",
    	"suning.tmall.com",
    	"www.tencent.com",
    	"www.qq.com",
    	"www.aliyun.com",
    	"www.ctrip.com",
    	"hotels.ctrip.com",
    	"hotels.ctrip.com",
    	"vacations.ctrip.com",
    	"flights.ctrip.com",
    	"trains.ctrip.com",
    	"bus.ctrip.com",
    	"car.ctrip.com",
    	"piao.ctrip.com",
    	"tuan.ctrip.com",
    	"you.ctrip.com",
    	"g.ctrip.com",
    	"lipin.ctrip.com",
    	"ct.ctrip.com"
    };
    
    struct async_context {
    
    	int epfd;
    	pthread_t thid;
    
    };
    
    
    typedef void (*async_result_cb)(void *arg, int count);
    
    
    struct epoll_arg {
    	async_result_cb cb;
    	int fd;
    };
    
    #define ASYNC_CLIENT_NUM	1024
    
    void dns_async_free_domain(struct dns_item *domains, int count) {
    
    	int i = 0;
    	for (i = 0;i < count;i ++) {
    		free(domains[i].domain);
    		free(domains[i].ip);
    	}
    	free(domains);
    }
    
    
    // 
    void *dns_async_callback(void *arg) {
    
    	/*
    	 * while (1) {
     	 *    epoll_wait();
     	 *    recv();
     	 *      parser();
     	 *      fd --> epoll delete
    	 * }
    	 */
    
    	struct async_context *ctx = (struct async_context *)arg;
    	
     	while (1) {
    
    		struct epoll_event events[ASYNC_CLIENT_NUM] = {0};
    		
    		// yield --> label --> resume
    		int nready = epoll_wait(ctx->epfd, events, ASYNC_CLIENT_NUM, 0);
    		if (nready < 0) continue;
    
    		int i = 0;
    		for (i = 0;i < nready;i ++) {
    
    			struct epoll_arg *data = events[i].data.ptr;
    			int sockfd = data->fd;
    
    			char buffer[1024] = {0};
    			struct sockaddr_in addr;
    			size_t addr_len = sizeof(struct sockaddr_in);
    				
    			int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
    				
    			printf("recvfrom n : %d\n", n);
    			// 为什么要定义成为结构体呢, 因为返回的数据可能有很多类型, 每种类型的处理不一样。 
    			struct dns_item *domains = NULL;
    			int count = dns_parse_response(buffer, &domains);
    
    			data->cb(domains, count);
    
    			//
    			epoll_ctl(ctx->epfd, EPOLL_CTL_DEL, sockfd, NULL);
    
    			close(sockfd);
    			
    
    			dns_async_free_domain(domains);
    			free(data);
    
    		}
    
    	}
    
    
    }
    
    
    int  dns_async_context_init(struct async_context  *ctx) {
    
    	if (ctx == NULL) return -1;
    
    	//1 epoll_create
    
    	int epfd = epoll_create(1);
    	if (epfd < 0) return -1;
    
    	ctx->epfd = epfd;
    	
    
    	//1 pthread_create
    
    	int ret = pthread_create(&ctx->thid, NULL, dns_async_callback, ctx);
    	if (ret) {
    		close(epfd);
    		return -1;
    	}
    
    	return 0;
    }
    
    
    int dns_async_context_destroy() {
    
    	//1 close(epfd)
    	
    
    	//1 pthread_cancel(thid)
    	
    
    	free();
    }
    
    
    //  多次提交过去
    int dns_async_client_commit(struct async_context *ctx, async_result_cb cb) {
    	//1 socket
    	int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    	if (sockfd < 0) {
    		perror("create socket failed\n");
    		exit(-1);
    	}
    
    	printf("url:%s\n", domain);
    
    	struct sockaddr_in dest;
    	bzero(&dest, sizeof(dest));
    	dest.sin_family = AF_INET;
    	dest.sin_port = htons(53);
    	dest.sin_addr.s_addr = inet_addr(DNS_SVR);
    	//1 connect server
    	
    	int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
    	printf("connect :%d\n", ret);
    
    	//1 encode protocol
    	struct dns_header header = {0};
    	dns_create_header(&header);
    
    	struct dns_question question = {0};
    	dns_create_question(&question, domain);
    
    	char request[1024] = {0};
    	int req_len = dns_build_request(&header, &question, request);
    
    	//1 send , 发送过去绑定epoll, 有响应epoll就会搞过来
    	int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));
    #if 0
    	char buffer[1024] = {0};
    	struct sockaddr_in addr;
    	size_t addr_len = sizeof(struct sockaddr_in);
    		
    	int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
    		
    	printf("recvfrom n : %d\n", n);
    	struct dns_item *domains = NULL;
    	dns_parse_response(buffer, &domains);
    #else
    
    	struct epoll_arg  *eparg = (struct epoll_arg  *)calloc(1, sizeof(struct epoll_arg));
    	if (eparg == NULL) return -1;
    
    	eparg->fd = sockfd;
    	eparg->cb = cb;
    		
    
    	struct epoll_event ev;
    	ev.data.ptr = eparg;
    	ev.events = EPOLLIN;
    	epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev);
    
    #endif
    	return 0;
    }
    
    
    
    
    int main(int argc, char *argv[]) {
    
    	int count = sizeof(domain) / sizeof(domain[0]);
    	int i = 0;
    #if 0
    	for (i = 0;i < count;i ++) {
    		dns_client_commit(domain[i]);
    	}
    #else
    
    	for (i = 0;i < 50;i ++) {
    		dns_async_client_commit(ctx, domain[i]);
    		yield();
    	}
    
    
    #endif
    
    	getchar();
    
    #if 0
    	struct async_context  * ctx = dns_async_context_init();
    	dns_async_context_destroy(ctx);
    	
    	
    	struct async_context  * ctx = malloc();
    	dns_async_context_init(ctx);
    
    	dns_async_context_destroy(ctx);
    
    	free(ctx);
    #endif
    
    }
    
    
    
    
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    1. 我们可以大致梳理一下上面的代码, 主要就是dns_async_context_init先创建一个上下文,开启一个线程去读取上下文中的fd,。主线程dns_client_commit去创建一个epoll绑定fd并且各种打包发送dns请求。 然后从线程就从上下文通信的结构体重拿到epoll的fd在循环遍历的读取内容, 将内容放到对应的结构体中。

    2. 这个代码和协程有什么联系: 上面的代码如果改造成为协程方式, 就是将每次commit和epollwait搞到一起, 通过协程(底层yiled原理, 跳转到一个函数中运行完再结束)跳转到callback里面的wait去执行, 这就相当于每次提交都要wait, 而我们现在的代码是一个线程提交一个线程处理,这个处理还要单独调用一个线程, 相比较协程的函数栈调用浪费的资源有点多。

    3. 如果你只写到一个main函数处理wait会有什么问题: 首先速度比较慢, 此外你的处理是线性的, 你这样就做不了其他任务了, 非常不解耦。 最后现实业务中别人提交的地方是不确定的, 可能发生在任意的地方, 因此对于这种随机的任务必须开单线程去处理, 不管是别人的请求还是本地的修改。

    4. 异步为什么需要上下文: 对于异步之间的请求内容同步, 一般都是需要上下文的, 如果上下文有读写冲突记得还要加锁。 上下文就是主线程想要给从线程传递信息用的, 而从线程一般会接收这个信息用来做自己的工作。

    5. 异步池和线程池的区别: 异步池是解耦你代码中需要等待的业务而且出现的位置非常随机, 线程池是解耦你创建线程的消耗。 异步请求池中的线程可以来自线程池。 而线程池一般和任务队列一起用, 就是你将各种任务放到一个任务队列, 线程池排队去执行, 其实这也可以叫做异步处理池, 但是由于执行的任务可能会不一样, 而且有排队按照顺序处理的规定, 所以叫了一个消息队列。

    工作级别内容

    工作认知

    1. 工作中的项目基本上都会用到异步。 除了哪些并发web请求的地方需要用到异步线程池, 在业务内其实完全可以用异步请求池(没有web那么大的并发量但是需要等待时候过长, 需要等待)去做, 传入上下文过去给这个异步线程池处理,处理完我的上下文数据有所更新自己也能用。 例如我们处理一堆业务时候就可以异步的将上下文传入进去,开线程去并行执行,本地等待并行处理完或者只处理一些简单的。

    自己踩过的坑

    1. 暂无

    资源连接池

    总体认知

    1. 什么是连接池, 和线程池有什么区别: 一些redis或者mysql的连接, 完全没必要每次请求都要创建连接请求, 完全可以搞个池子, 类似线程池, 每个任务完全没不要每次创建线程去执行, 同样的思想, 一个管理的是多个线程, 一个管理的是多个连接的fd。 线程池中的任务需要去连接池中拿到资源的连接。 连接池是被动的, 线程池是主动的执行任务。
    2. 不同连接池有什么区别: 基本没啥区别, 就是换一下fd的类型, 就是先安装客户端的包, 引用头文件绑定其内容。
    3. 连接池

    基本内容

    1. 下面我们将构建一个连接池去连接mysql的。
    
    //mysql.conf
    
    # 数据库连接池配置文件
    
    # ip地址、端口号、用户名、密码
    ip=172.168.1.2
    port=3306
    user=zhangsan
    password=123456
    
    # 连接池初始连接量
    init_size=10
    
    # 连接池最大连接量
    max_size=1024
    
    # 最大空闲时间默认单位(s)
    max_freeTime=60
    
    # 最大连接超时时间(ms)
    connect_timeout=100
    
    dbname=student
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    
    //connect.h
    
    #pragma once
    
    #include 
    
    #include 
    #include 
    using namespace std;
    /*
     * 实现MySQL数据库操作
     */
    class Connect {
     public:
      Connect();
      ~Connect();
    
      //连接数据库
      bool connect(string ip, unsigned short port, string user, string password,
                   string dbname);
    
      //更新操作
      bool update(string sql);
    
      //查询操作
      MYSQL_RES *query(string sql);
    
      //刷新连接时间
      void refresh_aliveTime() { alive_time_ = clock(); }
    
      //返回存活的时间
      clock_t get_aliveTime() { return alive_time_; }
    
     private:
      MYSQL *conn_;  //表示和mysql的连接
    
      clock_t alive_time_;  //记录进入空闲状态后的存活时间
    };
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    
    //connent.cpp
    
    #pragma once
    
    #include 
    
    #include 
    #include 
    using namespace std;
    /*
     * 实现MySQL数据库操作
     */
    class Connect {
     public:
      Connect();
      ~Connect();
    
      //连接数据库
      bool connect(string ip, unsigned short port, string user, string password,
                   string dbname);
    
      //更新操作
      bool update(string sql);
    
      //查询操作
      MYSQL_RES *query(string sql);
    
      //刷新连接时间
      void refresh_aliveTime() { alive_time_ = clock(); }
    
      //返回存活的时间
      clock_t get_aliveTime() { return alive_time_; }
    
     private:
      MYSQL *conn_;  //表示和mysql的连接
    
      clock_t alive_time_;  //记录进入空闲状态后的存活时间
    };
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    
    //connect_pool.h
    
    
    #pragma once
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    #include "connect.hpp"
    
    using namespace std;
    
    //单例模式
    class Connect_pool {
     public:
      static Connect_pool *get_instance();
    
      //给外部提供的接口,返回可用的空闲连接
      shared_ptr<Connect> get_connect();
    
     private:
      Connect_pool();
    
      //加载配置文件
      bool load_configure();
    
      // 运行在独立的线程,专门负责生产新的连接
      void produce_connection();
    
      //释放空闲连接
      void scan_connection();
    
     private:
      string ip_;
      unsigned short port_;
      string user_;
      string password_;
      string dbname_;
    
      int init_size_;           //连接池初始连接量
      int max_size_;            //连接的最大连接量
      int max_freeTime;         //连接池的最大空闲时间
      int connection_timeout_;  //连接池获取连接的超时时间
    
      queue<Connect *> connect_queue_;  //储存mysql连接的队列
      mutex queue_mutex_;               //维护线程安全
      atomic_int connect_cnt_;  //记录连接所创建的connect连接的总数量
    
      condition_variable condition_;  //设置条件变量,负责生产线程的唤醒和休眠
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    
    //connect_pool.cpp
    
    #include "connect_pool.h"
    
    #include "public.h"
    
    ConnectPool *ConnectPool::get_instance() {
      static ConnectPool pool;
      return &pool;
    }
    
    shared_ptr<Connect> ConnectPool ::get_connect() {
      unique_lock<mutex> lock(queue_mutex_);
      while (connect_queue.empty()) {
        if (cv_status::timeout ==
            condition_.wait_for(lock, chrono::milliseconds(connection_timeout_))) {
          if (coonect_queue_.empty()) {
            LOG("get  connection  timeout...");
          }
        }
      }
      // 这个共享指针拿到得值如果不用了, 重写删除器, 不能让它走connect的析构,
      // 直接插入到队列中。
      shared_ptr<Connect> ret(connect_queue.front(), [&](Connect *ptr) {
        unique_lock<mutex> lock(queue_mutex_);
        connect_queue_.push(ptr);
      });
      connect_queue_.pop();
      if (connect_queue_.empty()) {
        condition_.notify_all;
      }
      return ret;
    }
    
    ConnectPool::ConnectPool() {
      if (!load_configure()) {
        LOG("confgiure file error");
        return;
      }
      for (int i = 0; i < init_size_; i++) {
        Connect *p = new Connect();
        if (p->connect(ip_, port_, user_, password_, dbname_) == false) {
          LOG("connection error!");
        }
        connect_queue_.push(p);
        connect_cont_++;
      }
      thread connect_producer(bind(&ConnectPool::produce_connection, this));
      connect_producer.detach();
    
      thread scaner(bind(&Connect_pool::scan_connection), this);
      saner.detach();
    }
    
    bool ConnectPool::load_configure() {
      FILE *fp = fopen("/mysql.conf", "r");
      if (fp == nullptr) {
        LOG("mysql conf file open error");
        return false;
      }
      while (!feof(fp)) {
        char line[2014] = {0};
        fgets(line, 2014, fp);
        string str = line;
        int index = str.find('=', 0);
        if (index == -1) {
          continue;
        }
        int endidx = str.find("\n", index);
        string key = str.substr(0, index);
        string value = str.substr(index + 1, endidx - index - 1);
        if (key == "ip") {
          ip_ = value;
        } else if (key == "port") {
          port_ = atoi(value.c_str());
        } else if (key == "user") {
          user_ = value;
        } else if (key == "password") {
          password_ = value;
        } else if (key == init_size) {
          init_size_ atoi(value.c_str());
        } else if (key == "max_freeTime") {
          max_freeTime = atoi(value.c_str());
        } else if (key == "connect_timeout") {
          connection_timeout_ = atoi(value.c_str());
        } else if (key == "dbname") {
          dbname_ = value;
        } else {
          LOG("this key is not in data");
        }
        return true;
      }
      // 空了代表init不够用了, 就新建。
      void ConnectPool::produce_connection() {
        while (1) {
          unique_lock<mutex> lock(queue_mutex_);
          while (!connect_queue_.empty()) {
            connection_wait(lock);
          }
          if (connect_cnt_ < max_size_) {
            Connect *p = new Connect();
            p->connect(ip_, port_, usr_, password_ dbname_);
            connect_queue_.push(p);
            connect_cnt_++;
          }
          condition_.notify_all();
        }
      }
      // 定时看看你新建的连接用不用啊, 不用我就删掉了
      void ConnectPool::scan_connection() {
        while (1) {
          this_thread::sleep_for(chrono::seconds(max_freeTime));
    
          unique_lock(mutex) lock(queue_mutex_);
          while (connect_cnt_ > init_size_) {
            Connect *p = connect_queue_.front();
            if (p->get_aliveTime() > max_freeTime * 1000) {
              connect_queue_.pop();
              delete p;
              connect_cnt--;
            }
          }
        }
      }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    1. 上面的代码就是连接池,

    工作级别内容

    工作认知

    1. 工作

    自己踩过的坑

    1. 暂无

    开源中间件

    日志

    基本内容

    基本认知

    1. 为什么需要日志:
    • 对于线上的服务, 我们很难排查出来问题,出现问题难以复现。 因此 需要日志, 打日志的意识你可以现在感觉不到, 但是如果你线上问题找不出来, 就非常头疼了。
    1. 常用的日志都有什么功能
    • 设置日志等级: 开启deubg级别, 对于线上的日志, 不要开启debug日志的打印, 就显示一些关键数据打印日志, 当我们自己测试找问题的时候就开启debug日志, 这样可以减轻线上的日志打印压力。
    • 能够设置前面的参数, 例如日期这些。

    log4cpp

    1. 缓存区写入日志和内核区写入日志的区别?有什么问题么?
      在这里插入图片描述

    可以看到当你的缓存区比较大的时候,用缓存区写入的更多, 但是当你缓存区小时候,直接操作内核write写入性能更优。 这些同步的写入速度太慢了, 很多日志库支持异步的。

    log4cpp的基础概念
    在这里插入图片描述

    • 对于一个信息内容是message, 我们设置好它的目录 ,然后设置好输出的append, 第三个是append设置好layout。 这里重点说一个category是可以自动配置的, 我们可以搞成树状, 在实际项目中我们每个模块都有自己的category, 自己的category只打印自己的append和layout, 但是如果你被开启了继承, 就可以将多个模块的日志全部打印到root目录下, 统一管理, 但是每个模块只打印自己的模块日志。
    1. google日志怎么用呢?配置文件怎么看?
    • 大家一般都是用配置文件用的, 但是用main去配置也是类似, 不过不能热更新加载, 不方便。
    • 在这里插入图片描述
      -上图就是配置每个模块的日志格式和输出方式, 并且配置好了继承关系。
    1. 一般日志要求一秒打印多少?异步和同步差别有多大?
    • 一般是一秒一百万。
      在这里插入图片描述
    • 上图下面是异步的方式, 可有看出区别非常大, 异步可以写100倍的同步日志。 同步一般只用在客户端里面。
    1. gdb怎么调试
    • 我们使用gdb进入到log动态库里面看看怎么运行的, 首先设置到main函数断点, 然后设置 b Category:callAppenders进入到调用appenders函数中(注意这一步需要进入到动态库,用y),然后看看堆栈调用了哪些,然后一步步分析代码。
    1. log4cpp提供了异步吗?
    • 好像没有,需要自己实现。 但是有一个队列, 需要你自己搞个额外的线程去写入, 你一边开线程落地, 人家往队列中放, 可以看到直接就是快百万了。
      在这里插入图片描述

    muduo库支持异步

    1. 可以看到mudo库是log4cpp的两倍性能。

    在这里插入图片描述

    1. muduo日志库的思想: 主要就是对于日志弄多个缓存buf, 开启落地线程每次读取一个buf落地
      (move级别的),写入完全不受影响, 拿空的buf去写, 写入多的时候还会开启多个buf, 这样的多buf不用等待和单线程保证落地日志有顺序的落地,并且提供了切片功能和宏定义调用。

    剩下的参照:https://www.coonote.com/note/log4cpp-log-design-scheme.html, 我们后续需要再补充。

    工作级别内容

    工作认知

    1. 工作中日志是我们经常要用到的, 对于线上的任务, 如果没有日志, 我们根本找不到当时的输入数据是啥, 即使有coredump问题, 输入不知道也是很难排查问题, 而且无法监控到每一步的执行环节。 因此日志一定要打全, 不然你排查问题会想死。 此外日志会有等级, 我们线上的日志一般默认是INFO, 不开启debug, 但是我们自己调试的时候, 可以把一些调试级别日志打到日常执行的流程中, 看看问题, 线上的服务也可以打, 反正不执行影响不了性能。但是一般都是在出现问题的地方去打这些日志的。
    2. 很多公司成熟的日志都是通过宏定义封装了好几层,用的时候很好用, 接收了很多控制开关, 这点需要注意。

    自己踩过的坑

    1. 日志一定要打全。

    网络库

    基本内容

    基本认知

    1. 为什么需要网络库:

    muduo

    libenvent

    工作级别内容

    工作认知

    自己踩过的坑

    1. 日志一定要打全。
  • 相关阅读:
    电子行业MES管理系统有哪些特殊功能
    mybatis中mapper.xml热加载
    zemax埃尔弗目镜
    SSM框架-Spring(三)
    ui属性系统(1)VSProperty
    Spring Boot 2(一):【重磅】Spring Boot 2.0权威发布
    VIM指令
    Android上传私有插件到私有MAVEN-PUBLISH
    JavaEE初阶:网络编程套接字
    web前端期末大作业 HTML+CSS+JavaScript仿唯品会购物商城网页设计实例 企业网站制作
  • 原文地址:https://blog.csdn.net/liupeng19970119/article/details/126151184