为什么要用一个线程池: 减少线程的创建, 异步的执行任务更解耦, 例如日志需要磁盘落地, 通过异步方式让日志给线程池, 大大的提升性能, 对于一些耗时的任务用线程池去做, 主线程关于主要任务。 此外我们的cpu现在都是多个核的, 一个线程在一个核中运行, 你如果是单线程, 一个核运行, 其他的核都是等待状态, cpu性能完全没有榨干, 最高到100%(6个线程最多达到600%)。 因此多线程对程序性能是绝对提升的。
线程一般创建多少个, 需要绑定么: 一般没必要绑定, 除非你的任务是计算密集类型的。
线程池和连接池的区别: 线程池是争抢任务的, 但是连接池只是负责给你资源, 二者不一样, 线程池你还要给任务过去。
//h
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using ThreadPoolTask = std::function<void()>;
class ThreadPool {
public:
static ThreadPool* GetInstance();
virtual void PostTask(ThreadPoolTask task, std::string name = "") = 0;
};
struct Task {
ThreadPoolTask task;
std::string name;
};
class TaskQueue : public std::list<Task> {
public:
TaskQueue();
~TaskQueue();
void PostTask(struct Task task);
void WaitTask();
void TakeTask(struct Task& task);
void Lock();
void Unlock();
private:
std::mutex tasksMutex; // 这个怎么初始化? 这不可能是声明,
// 哪有声明类但是不初始化的。
sem_t taskSem;
};
class ThreadPoolImpl : public ThreadPool {
public:
static ThreadPoolImpl* GetInstance();
virtual void PostTask(ThreadPoolTask task, std::string name) override;
private:
ThreadPoolImpl();
~ThreadPoolImpl();
static inline ThreadPoolImpl* instance = nullptr;
int interruptFd = -1;
void WorkThreadMain(int id);
TaskQueue runnable;
std::vector<std::unique_ptr<std::thread>> workThreads; // thread work
std::unique_ptr<std::thread> futureThread = nullptr; // fucntion of thread,
// stop args
std::atomic_bool running = {true};
};
/*
* Date:2022-09-01
* Author: Tian yi
* funtion: ThreadPoolImpl
* Description : 使用ThreadPoolImpl包含多个线程, 这些线程绑定了自己的成员函数WorkThreadMain, 在
* 成员函数中检查任务队列的信号量, 然后每个线程创建一个任务去任务队列取任务, 然后执行。| 外部main函数通过
* postTask函数放任务过来给函数对象functional(被vector存储了)。
* Example:
* Reference:
*/
#pragma once
#include "thread_pool.h"
#include
#include
#include
#include
#include
#include
#include
ThreadPool* ThreadPool::GetInstance() { return ThreadPoolImpl::GetInstance(); }
TaskQueue::TaskQueue() { sem_init(&taskSem, false, 0); }
TaskQueue::~TaskQueue() { sem_destroy(&taskSem); }
void TaskQueue::PostTask(struct Task task) {
{
std::lock_guard<std::mutex> lock(tasksMutex);
push_back(task);
}
sem_post(&taskSem);
}
void TaskQueue::WaitTask() { sem_wait(&taskSem); }
void TaskQueue::TakeTask(struct Task& task) {
{
std::lock_guard<std::mutex> lock(tasksMutex);
task = front();
pop_front();
}
}
void TaskQueue::Lock() { tasksMutex.lock(); }
void TaskQueue::Unlock() { tasksMutex.unlock(); }
ThreadPoolImpl* ThreadPoolImpl::GetInstance() {
if (instance == nullptr) {
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
if (instance == nullptr) {
instance = new ThreadPoolImpl();
}
}
return instance;
}
void ThreadPoolImpl::PostTask(ThreadPoolTask task, std::string name) {
runnable.PostTask({task, name});
}
ThreadPoolImpl::ThreadPoolImpl() {
// 绑定过来
int cpuNumber = 2;
for (int i = 0; i < cpuNumber; i++) {
auto func = std::bind(&ThreadPoolImpl::WorkThreadMain, this, i);
workThreads.push_back(std::make_unique<std::thread>(func));
}
}
ThreadPoolImpl::~ThreadPoolImpl() {
running = false;
for (auto& workThread : workThreads) {
workThread->join();
}
}
void ThreadPoolImpl::WorkThreadMain(int id) {
while (true) {
runnable.WaitTask();
if (running == false) {
break;
}
Task task;
runnable.TakeTask(task);
std::cout << "Work(" << id << "): " << task.name << " started" << std::endl;
task.task();
std::cout << "Work(" << id << "): " << task.name << " end" << std::endl;
}
}
//main
#include
#include
#include "thread_pool.h"
int main() {
auto tp = ThreadPoolImpl::GetInstance();
for (int i = 0; i < 1000; i++) {
std::stringstream ss;
ss << "inc" << i;
tp->PostTask(
[]() {
volatile int cnt = 0;
for (int i = 0; i < 100000; i++) {
cnt += i;
if (i / 3 == 0) {
cnt++;
}
}
},
ss.str());
}
return 0;
}
为什么要用内存池: 对于一个程序,我们能够管理的就是堆, 也就是new这些。 对于每个用户的请求数据, 我们不可能在栈上创建, 因为可能还会有其他线程的函数去访问,因此需要堆数据, 让其他线程也能够访问到这个数据, 不然频繁的创建保存用户数据其他执行耗时任务的线程拿不到用户的数据。但是这种分配堆空间容易产生内容碎片,如果运行了几个月之后出现一次coredump(因为碎片太多, 无法分分配到空间), 基本上就是内存问题, 很难复现当时的问题, 之能重启。 因此要用内存池去管理这些。
内存分配时候要注意什么: 一定要用置空, 免得拿到了数据别人析构时候没有析构干净,你拿到了的空间有一些脏数据。
使用内存池要注意什么, 都有哪些有名的线程池,怎么使用: 直接用开源的内存池, 这种关键组件别自己造; 一般用google的tmalloc, 直接使用一个宏定义是让new指定到这个代码中的new,delete就行。其实代码完全不用改。
内存池的原理:通过链表管理这些地址, 我们使用4k,16k。。。。128k等级别的分配内存池, 每个级别都有自己的链表绑定已经构造的内存, 如果不用了就置为0。 但是需要注意小内存的链表释放很难, 因为会分布到各个位置, 最好就是弄个大块内存切片分小内存。 如果要手动实现需要牵扯很多代码块。
为什么可以设计一个面向连接的内存池:对于tmalloc这种一开始就创建好的内存池,对于小块的内存释放也是非常难操作, 我们完全可以设计一个用户socket级别的内存池,每次一个用户来搞个内存池, 用完释放, 就可以不用产生内存碎片了。
内存池要考虑线程安全: 需要啊, 你创建内存的函数智能有一个线程进来。 因此线程池类内部有锁(绑定了资源)。
这部分太难了, 基本就是实现了一个面向用户连接的内存池,用户断开请求就释放了, 不会产生内存碎片,都是大的内存, 对小内存管理的非常好。
关于全局的内存池tmalloc如何使用:https://juejin.cn/post/6979067456803307551 。 好像如果你的内存没有使用libpthread.so就会出现一些问题。
为什么请求需要异步: 因为对于一个请求我们都是串行的执行, 对于很多耗时的任务完全可以做个线程通知, 例如我们买东西的快递, 同步阻塞就是啥也不干一直等, 同步非阻塞就是先去干其他的, 快递来了你看看自己当下要不要去取。 异步非阻塞线程就是你开个线程去监听fd如果有相应就把快递自动放到你家中, 用协程就是速度更快。
同步, 异步, 阻塞,线程, 协程的区别与联系:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define DNS_SVR "114.114.114.114"
#define DNS_HOST 0x01
#define DNS_CNAME 0x05
struct dns_header {
unsigned short id;
unsigned short flags;
unsigned short qdcount;
unsigned short ancount;
unsigned short nscount;
unsigned short arcount;
};
struct dns_question {
int length;
unsigned short qtype;
unsigned short qclass;
char *qname;
};
struct dns_item {
char *domain;
char *ip;
};
int dns_client_commit(const char *domain) {
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("create socket failed\n");
exit(-1);
}
printf("url:%s\n", domain);
struct sockaddr_in dest;
bzero(&dest, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(53);
dest.sin_addr.s_addr = inet_addr(DNS_SVR);
int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
printf("connect :%d\n", ret);
struct dns_header header = {0};
dns_create_header(&header);
struct dns_question question = {0};
dns_create_question(&question, domain);
char request[1024] = {0};
int req_len = dns_build_request(&header, &question, request);
int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));
char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);
int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
printf("recvfrom n : %d\n", n);
struct dns_item *domains = NULL;
dns_parse_response(buffer, &domains);
return 0;
}
char *domain[] = {
"www.ntytcp.com",
"bojing.wang",
"www.baidu.com",
"tieba.baidu.com",
"news.baidu.com",
"zhidao.baidu.com",
"music.baidu.com",
"image.baidu.com",
"v.baidu.com",
"map.baidu.com",
"baijiahao.baidu.com",
"xueshu.baidu.com",
"cloud.baidu.com",
"www.163.com",
"open.163.com",
"auto.163.com",
"gov.163.com",
"money.163.com",
"sports.163.com",
"tech.163.com",
"edu.163.com",
"www.taobao.com",
"q.taobao.com",
"sf.taobao.com",
"yun.taobao.com",
"baoxian.taobao.com",
"www.tmall.com",
"suning.tmall.com",
"www.tencent.com",
"www.qq.com",
"www.aliyun.com",
"www.ctrip.com",
"hotels.ctrip.com",
"hotels.ctrip.com",
"vacations.ctrip.com",
"flights.ctrip.com",
"trains.ctrip.com",
"bus.ctrip.com",
"car.ctrip.com",
"piao.ctrip.com",
"tuan.ctrip.com",
"you.ctrip.com",
"g.ctrip.com",
"lipin.ctrip.com",
"ct.ctrip.com"
};
struct async_context {
int epfd;
pthread_t thid;
};
typedef void (*async_result_cb)(void *arg, int count);
struct epoll_arg {
async_result_cb cb;
int fd;
};
#define ASYNC_CLIENT_NUM 1024
void dns_async_free_domain(struct dns_item *domains, int count) {
int i = 0;
for (i = 0;i < count;i ++) {
free(domains[i].domain);
free(domains[i].ip);
}
free(domains);
}
//
void *dns_async_callback(void *arg) {
/*
* while (1) {
* epoll_wait();
* recv();
* parser();
* fd --> epoll delete
* }
*/
struct async_context *ctx = (struct async_context *)arg;
while (1) {
struct epoll_event events[ASYNC_CLIENT_NUM] = {0};
// yield --> label --> resume
int nready = epoll_wait(ctx->epfd, events, ASYNC_CLIENT_NUM, 0);
if (nready < 0) continue;
int i = 0;
for (i = 0;i < nready;i ++) {
struct epoll_arg *data = events[i].data.ptr;
int sockfd = data->fd;
char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);
int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
printf("recvfrom n : %d\n", n);
// 为什么要定义成为结构体呢, 因为返回的数据可能有很多类型, 每种类型的处理不一样。
struct dns_item *domains = NULL;
int count = dns_parse_response(buffer, &domains);
data->cb(domains, count);
//
epoll_ctl(ctx->epfd, EPOLL_CTL_DEL, sockfd, NULL);
close(sockfd);
dns_async_free_domain(domains);
free(data);
}
}
}
int dns_async_context_init(struct async_context *ctx) {
if (ctx == NULL) return -1;
//1 epoll_create
int epfd = epoll_create(1);
if (epfd < 0) return -1;
ctx->epfd = epfd;
//1 pthread_create
int ret = pthread_create(&ctx->thid, NULL, dns_async_callback, ctx);
if (ret) {
close(epfd);
return -1;
}
return 0;
}
int dns_async_context_destroy() {
//1 close(epfd)
//1 pthread_cancel(thid)
free();
}
// 多次提交过去
int dns_async_client_commit(struct async_context *ctx, async_result_cb cb) {
//1 socket
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("create socket failed\n");
exit(-1);
}
printf("url:%s\n", domain);
struct sockaddr_in dest;
bzero(&dest, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(53);
dest.sin_addr.s_addr = inet_addr(DNS_SVR);
//1 connect server
int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));
printf("connect :%d\n", ret);
//1 encode protocol
struct dns_header header = {0};
dns_create_header(&header);
struct dns_question question = {0};
dns_create_question(&question, domain);
char request[1024] = {0};
int req_len = dns_build_request(&header, &question, request);
//1 send , 发送过去绑定epoll, 有响应epoll就会搞过来
int slen = sendto(sockfd, request, req_len, 0, (struct sockaddr*)&dest, sizeof(struct sockaddr));
#if 0
char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);
int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
printf("recvfrom n : %d\n", n);
struct dns_item *domains = NULL;
dns_parse_response(buffer, &domains);
#else
struct epoll_arg *eparg = (struct epoll_arg *)calloc(1, sizeof(struct epoll_arg));
if (eparg == NULL) return -1;
eparg->fd = sockfd;
eparg->cb = cb;
struct epoll_event ev;
ev.data.ptr = eparg;
ev.events = EPOLLIN;
epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, sockfd, &ev);
#endif
return 0;
}
int main(int argc, char *argv[]) {
int count = sizeof(domain) / sizeof(domain[0]);
int i = 0;
#if 0
for (i = 0;i < count;i ++) {
dns_client_commit(domain[i]);
}
#else
for (i = 0;i < 50;i ++) {
dns_async_client_commit(ctx, domain[i]);
yield();
}
#endif
getchar();
#if 0
struct async_context * ctx = dns_async_context_init();
dns_async_context_destroy(ctx);
struct async_context * ctx = malloc();
dns_async_context_init(ctx);
dns_async_context_destroy(ctx);
free(ctx);
#endif
}
我们可以大致梳理一下上面的代码, 主要就是dns_async_context_init先创建一个上下文,开启一个线程去读取上下文中的fd,。主线程dns_client_commit去创建一个epoll绑定fd并且各种打包发送dns请求。 然后从线程就从上下文通信的结构体重拿到epoll的fd在循环遍历的读取内容, 将内容放到对应的结构体中。
这个代码和协程有什么联系: 上面的代码如果改造成为协程方式, 就是将每次commit和epollwait搞到一起, 通过协程(底层yiled原理, 跳转到一个函数中运行完再结束)跳转到callback里面的wait去执行, 这就相当于每次提交都要wait, 而我们现在的代码是一个线程提交一个线程处理,这个处理还要单独调用一个线程, 相比较协程的函数栈调用浪费的资源有点多。
如果你只写到一个main函数处理wait会有什么问题: 首先速度比较慢, 此外你的处理是线性的, 你这样就做不了其他任务了, 非常不解耦。 最后现实业务中别人提交的地方是不确定的, 可能发生在任意的地方, 因此对于这种随机的任务必须开单线程去处理, 不管是别人的请求还是本地的修改。
异步为什么需要上下文: 对于异步之间的请求内容同步, 一般都是需要上下文的, 如果上下文有读写冲突记得还要加锁。 上下文就是主线程想要给从线程传递信息用的, 而从线程一般会接收这个信息用来做自己的工作。
异步池和线程池的区别: 异步池是解耦你代码中需要等待的业务而且出现的位置非常随机, 线程池是解耦你创建线程的消耗。 异步请求池中的线程可以来自线程池。 而线程池一般和任务队列一起用, 就是你将各种任务放到一个任务队列, 线程池排队去执行, 其实这也可以叫做异步处理池, 但是由于执行的任务可能会不一样, 而且有排队按照顺序处理的规定, 所以叫了一个消息队列。
//mysql.conf
# 数据库连接池配置文件
# ip地址、端口号、用户名、密码
ip=172.168.1.2
port=3306
user=zhangsan
password=123456
# 连接池初始连接量
init_size=10
# 连接池最大连接量
max_size=1024
# 最大空闲时间默认单位(s)
max_freeTime=60
# 最大连接超时时间(ms)
connect_timeout=100
dbname=student
//connect.h
#pragma once
#include
#include
#include
using namespace std;
/*
* 实现MySQL数据库操作
*/
class Connect {
public:
Connect();
~Connect();
//连接数据库
bool connect(string ip, unsigned short port, string user, string password,
string dbname);
//更新操作
bool update(string sql);
//查询操作
MYSQL_RES *query(string sql);
//刷新连接时间
void refresh_aliveTime() { alive_time_ = clock(); }
//返回存活的时间
clock_t get_aliveTime() { return alive_time_; }
private:
MYSQL *conn_; //表示和mysql的连接
clock_t alive_time_; //记录进入空闲状态后的存活时间
};
//connent.cpp
#pragma once
#include
#include
#include
using namespace std;
/*
* 实现MySQL数据库操作
*/
class Connect {
public:
Connect();
~Connect();
//连接数据库
bool connect(string ip, unsigned short port, string user, string password,
string dbname);
//更新操作
bool update(string sql);
//查询操作
MYSQL_RES *query(string sql);
//刷新连接时间
void refresh_aliveTime() { alive_time_ = clock(); }
//返回存活的时间
clock_t get_aliveTime() { return alive_time_; }
private:
MYSQL *conn_; //表示和mysql的连接
clock_t alive_time_; //记录进入空闲状态后的存活时间
};
//connect_pool.h
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include "connect.hpp"
using namespace std;
//单例模式
class Connect_pool {
public:
static Connect_pool *get_instance();
//给外部提供的接口,返回可用的空闲连接
shared_ptr<Connect> get_connect();
private:
Connect_pool();
//加载配置文件
bool load_configure();
// 运行在独立的线程,专门负责生产新的连接
void produce_connection();
//释放空闲连接
void scan_connection();
private:
string ip_;
unsigned short port_;
string user_;
string password_;
string dbname_;
int init_size_; //连接池初始连接量
int max_size_; //连接的最大连接量
int max_freeTime; //连接池的最大空闲时间
int connection_timeout_; //连接池获取连接的超时时间
queue<Connect *> connect_queue_; //储存mysql连接的队列
mutex queue_mutex_; //维护线程安全
atomic_int connect_cnt_; //记录连接所创建的connect连接的总数量
condition_variable condition_; //设置条件变量,负责生产线程的唤醒和休眠
};
//connect_pool.cpp
#include "connect_pool.h"
#include "public.h"
ConnectPool *ConnectPool::get_instance() {
static ConnectPool pool;
return &pool;
}
shared_ptr<Connect> ConnectPool ::get_connect() {
unique_lock<mutex> lock(queue_mutex_);
while (connect_queue.empty()) {
if (cv_status::timeout ==
condition_.wait_for(lock, chrono::milliseconds(connection_timeout_))) {
if (coonect_queue_.empty()) {
LOG("get connection timeout...");
}
}
}
// 这个共享指针拿到得值如果不用了, 重写删除器, 不能让它走connect的析构,
// 直接插入到队列中。
shared_ptr<Connect> ret(connect_queue.front(), [&](Connect *ptr) {
unique_lock<mutex> lock(queue_mutex_);
connect_queue_.push(ptr);
});
connect_queue_.pop();
if (connect_queue_.empty()) {
condition_.notify_all;
}
return ret;
}
ConnectPool::ConnectPool() {
if (!load_configure()) {
LOG("confgiure file error");
return;
}
for (int i = 0; i < init_size_; i++) {
Connect *p = new Connect();
if (p->connect(ip_, port_, user_, password_, dbname_) == false) {
LOG("connection error!");
}
connect_queue_.push(p);
connect_cont_++;
}
thread connect_producer(bind(&ConnectPool::produce_connection, this));
connect_producer.detach();
thread scaner(bind(&Connect_pool::scan_connection), this);
saner.detach();
}
bool ConnectPool::load_configure() {
FILE *fp = fopen("/mysql.conf", "r");
if (fp == nullptr) {
LOG("mysql conf file open error");
return false;
}
while (!feof(fp)) {
char line[2014] = {0};
fgets(line, 2014, fp);
string str = line;
int index = str.find('=', 0);
if (index == -1) {
continue;
}
int endidx = str.find("\n", index);
string key = str.substr(0, index);
string value = str.substr(index + 1, endidx - index - 1);
if (key == "ip") {
ip_ = value;
} else if (key == "port") {
port_ = atoi(value.c_str());
} else if (key == "user") {
user_ = value;
} else if (key == "password") {
password_ = value;
} else if (key == init_size) {
init_size_ atoi(value.c_str());
} else if (key == "max_freeTime") {
max_freeTime = atoi(value.c_str());
} else if (key == "connect_timeout") {
connection_timeout_ = atoi(value.c_str());
} else if (key == "dbname") {
dbname_ = value;
} else {
LOG("this key is not in data");
}
return true;
}
// 空了代表init不够用了, 就新建。
void ConnectPool::produce_connection() {
while (1) {
unique_lock<mutex> lock(queue_mutex_);
while (!connect_queue_.empty()) {
connection_wait(lock);
}
if (connect_cnt_ < max_size_) {
Connect *p = new Connect();
p->connect(ip_, port_, usr_, password_ dbname_);
connect_queue_.push(p);
connect_cnt_++;
}
condition_.notify_all();
}
}
// 定时看看你新建的连接用不用啊, 不用我就删掉了
void ConnectPool::scan_connection() {
while (1) {
this_thread::sleep_for(chrono::seconds(max_freeTime));
unique_lock(mutex) lock(queue_mutex_);
while (connect_cnt_ > init_size_) {
Connect *p = connect_queue_.front();
if (p->get_aliveTime() > max_freeTime * 1000) {
connect_queue_.pop();
delete p;
connect_cnt--;
}
}
}
}
}

可以看到当你的缓存区比较大的时候,用缓存区写入的更多, 但是当你缓存区小时候,直接操作内核write写入性能更优。 这些同步的写入速度太慢了, 很多日志库支持异步的。
log4cpp的基础概念





剩下的参照:https://www.coonote.com/note/log4cpp-log-design-scheme.html, 我们后续需要再补充。