• 网络编程(六)TCP并发服务器


    (一)概念

    循环服务器:同一时刻只能处理一个客户端的请求。

    并发服务器:可以同时处理多个客户端的请求 相互之间不影响。

    并发和并行的区别:
    并发:并发是指两个或多个事件在 同一时间间隔 发生,把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
    并行(多核CPU):并行是指两个或者多个事件在 同一时刻同时 发生,把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。

    (二)TCP并发服务器

    方式1:使用多线程实现
    方式2:使用多进程实现
    方式3:使用多路IO复用

    (三)使用多线程实现TCP并发服务器

    1. 思路

    主线程负责等待客户端连接(accept);
    一旦有客户端连接成功,就创建一个子线程,专门用来和该客户端通信

    2. 注意点

    1. 使用多线程一定要注意线程间的同步和互斥问题

    3. 代码实现

    server.c

    #include 
    
    sem_t sem;
    
    typedef struct _Msg{
        int acceptfd;
        struct sockaddr_in clientaddr;
    }msg_t;
    
    void *task_func(void *msg){
        pthread_detach(pthread_self());//标记为分离态
        msg_t client_msg=*(msg_t *)msg;
        sem_post(&sem);
        char buff[128]={0};
        int nbytes=0;
        while(1){
            printf("开始通信  acceptfd = %d\n", client_msg.acceptfd);
            if(-1 == (nbytes = recv(client_msg.acceptfd,buff,sizeof(buff),0))){
                //出错
                close(client_msg.acceptfd);
                pthread_exit(NULL);
            }else if(0 == nbytes){//recv接收到的是0
                //客户端断开
                close(client_msg.acceptfd);
                pthread_exit(NULL);
            }
            if(!strcmp(buff,"quit")){
                close(client_msg.acceptfd);
                pthread_exit(NULL);
            }
            //正常接收数据
            printf("线程[%ld]接收到数据[%s]\n",pthread_self(),buff);
            strcat(buff,"--zyx");
            if(-1 == send(client_msg.acceptfd,buff,sizeof(buff),0)){
                close(client_msg.acceptfd);
                pthread_exit(NULL);
            }
        }
    }
    
    int main(int argc, char const *argv[])
    {
        if(3 != argc){
            printf("Usage:%s IPv4 port\n",argv[0]);
            exit(-1);
        }
       
        //创建套接字
        int sockfd=0;
        if(-1 == (sockfd = socket(AF_INET,SOCK_STREAM,0))){
            ERR_LOG("sock error");
        }
        //填充结构体
        struct sockaddr_in serveraddr;
        serveraddr.sin_family=AF_INET;
        serveraddr.sin_addr.s_addr=inet_addr(argv[1]);
        serveraddr.sin_port=htons(atoi(argv[2]));
        socklen_t serveraddrlen = sizeof(serveraddr);
        //绑定结构体信息
        if (-1 == bind(sockfd,(struct sockaddr *)&serveraddr,serveraddrlen)){
            ERR_LOG("bind error");
        }
        //设置为监听状态
        if(-1 == listen(sockfd, 5))
            ERR_LOG("listen error");
    
        //客户端结构体保存网络信息
        struct sockaddr_in clientaddr;
        socklen_t clientaddrlen = sizeof(clientaddr);
        
        msg_t msg;
        int acceptfd = 0;
        pthread_t tid=0; 
        //无名信号量
        sem_init(&sem,0,1);
    
        while(1){
            printf("已就绪,等待连接..\n");
            //主线程等待连接
            if (-1 == (acceptfd = accept(sockfd,(struct sockaddr *)&clientaddr,&clientaddrlen))){
                ERR_LOG("accept error");
            }
            printf("有客户端连接\n");
            //如果主线程连接到了客户端,就创建子线程用来与客户端通信
            sem_wait(&sem);
            msg.acceptfd = acceptfd;
            msg.clientaddr = clientaddr;
            if(0 != pthread_create(&tid,NULL,task_func, &msg)){
                perror("pthread_create error");
            }
        }
        close(sockfd);
    
        return 0;
    }
    

    client.c

    #include 
    
    int main(int argc, char const *argv[])
    {
        if(3 != argc){
            printf("Usage:%s Ipv4 port\n",argv[0]);
            exit(-1);
        }
        //创建套接字
        int sockfd=0;
        if(-1 ==(sockfd = socket(AF_INET,SOCK_STREAM,0))){
            ERR_LOG("socket error");
        }
        //填充结构体
        struct sockaddr_in serveraddr;
        serveraddr.sin_family=AF_INET;
        serveraddr.sin_addr.s_addr=inet_addr(argv[1]);
        serveraddr.sin_port=htons(atoi(argv[2]));
        socklen_t serverlen = sizeof(serveraddr);
        //连接服务器
        if(-1 == connect(sockfd,(struct sockaddr *)&serveraddr,serverlen)){
            ERR_LOG("connect error");
        }
        printf("连接成功\n");
        char buff[128]={0};
        while(1){
            scanf("%s",buff);
            if(-1 == send(sockfd,buff,sizeof(buff),0)){
                ERR_LOG("send error");
            }
            printf("数据[%s]已发送\n",buff);
            if(!strcmp(buff,"quit")){
                break;
            }
            memset(buff,0,sizeof(buff));
            if(-1 == recv(sockfd,buff,sizeof(buff),0)){
                ERR_LOG("recv error");
            }
            printf("接收到数据[%s]\n",buff);
        }
        close(sockfd);
        return 0;
    }
    

    (四)使用多进程实现TCP并发服务器

    1. 思路

    父进程负责accept等待客户端连接;
    一旦有客户端连接成功,就创建一个子进程,专门用来和该客户端通信

    2. 注意点

    多进程要注意子进程资源的回收问题,在服务器程序中如果不及时回收子进程资源,子进程会成为僵尸进程,浪费系统资源

    3. 代码实现

    #include 
    
    //信号处理函数
    void sig_fun(int signum){
        if(SIGCHLD == signum){
            wait(NULL);
        }
    }
    
    int main(int argc, char const *argv[])
    {
        if(3 != argc){
            printf("Usage:%s IPv4 port\n",argv[0]);
            exit(-1);
        }
       
        //创建套接字
        int sockfd=0;
        if(-1 == (sockfd = socket(AF_INET,SOCK_STREAM,0))){
            ERR_LOG("sock error");
        }
        //填充结构体
        struct sockaddr_in serveraddr;
        serveraddr.sin_family=AF_INET;
        serveraddr.sin_addr.s_addr=inet_addr(argv[1]);
        serveraddr.sin_port=htons(atoi(argv[2]));
        socklen_t serveraddrlen = sizeof(serveraddr);
        //绑定结构体信息
        if (-1 == bind(sockfd,(struct sockaddr *)&serveraddr,serveraddrlen)){
            ERR_LOG("bind error");
        }
        //设置为监听状态
        if(-1 == listen(sockfd, 5))
            ERR_LOG("listen error");
    
        //客户端结构体保存网络信息
        struct sockaddr_in clientaddr;
        socklen_t clientaddrlen = sizeof(clientaddr);
        
        int acceptfd = 0;
        pid_t pid=0; 
        //捕捉SIGCHLD信号
        signal(SIGCHLD,sig_fun);
        while(1){
            printf("已就绪,等待连接..\n");
            //主线程等待连接
            if (-1 == (acceptfd = accept(sockfd,(struct sockaddr *)&clientaddr,&clientaddrlen))){
                ERR_LOG("accept error");
            }
            printf("有客户端连接\n");
            if(-1 == (pid=fork())){//出错
                ERR_LOG("fork error");
            }else if(0 == pid){//子进程
                //执行完fork之后,父进程和子进程都各自有一个acceptfd和一个sockfd
                close(sockfd);
                char buff[128]={0};
                int nbytes=0;
                while(1){
                    printf("开始通信\n");
                    if(-1 == (nbytes = recv(acceptfd,buff,sizeof(buff),0))){//出错
                        break;
                    }else if(0 == nbytes){//recv接收到的是0,客户端断开
                        break;
                    }
                    if(!strcmp(buff,"quit")){
                        break;
                    }
                    //正常接收数据
                    printf("进程[%d]接收到数据[%s]\n",getpid(),buff);
                    strcat(buff,"--zyx");
                    if(-1 == send(acceptfd,buff,sizeof(buff),0)){
                        break;
                    }
                }
                //子进程退出
                close(acceptfd);
                exit(0);
            }else if(0 < pid){//父进程
                close(acceptfd);//关闭父进程acceptfd,回收文件描述符资源
            }
        }
        close(sockfd);
        return 0;
    }
    

    4. 关于子进程结束后的资源回收问题

    方式1:父进程退出时,子进程都变成孤儿,被init回收资源 但是我们父进程的是服务器进程 不会退出
    方式2:使用wait阻塞的方式回收 --不推荐用 因为又多了一个阻塞的函数
    方式3:使用waitpid非阻塞方式回收 --不推荐用 因为需要轮询 占用CPU
    方式4:比较好的处理方式是 子进程退出时给父进程发信号 父进程接到信号后再去回收子进程资源(可以通过捕捉SIGCHLD SIGUSR1 SIGUSR2)

    (五)使用IO复用实现TCP并发服务器

    1. 思路

    TCP的服务器默认不支持并发,原因是两类阻塞的函数 accept 和 recv之间相互影响
    也就是说,本质上就是因为 sockfd 和 acceptfd 两类文件描述符的缓冲区中没有内容
    就会阻塞,而且多个阻塞之间相互影响。

    使用IO复用来监视文件描述符,当sockfd就绪,就说明有新的客户端连接;acceptfd就绪,就说明有已连接的客户端发送数据。

    2. 代码实现

    #include 
    
    int main(int argc, char const *argv[])
    {
        if(3 != argc){
            printf("Usage:%s IPv4 port\n",argv[0]);
            exit(-1);
        }
        //创建套接字
        int sockfd=0;
        if(-1 == (sockfd = socket(AF_INET,SOCK_STREAM,0))){
            ERR_LOG("sock error");
        }
        //填充结构体
        struct sockaddr_in serveraddr;
        serveraddr.sin_family=AF_INET;
        serveraddr.sin_addr.s_addr=inet_addr(argv[1]);
        serveraddr.sin_port=htons(atoi(argv[2]));
        socklen_t serveraddrlen = sizeof(serveraddr);
        //绑定结构体信息
        if (-1 == bind(sockfd,(struct sockaddr *)&serveraddr,serveraddrlen)){
            ERR_LOG("bind error");
        }
        //设置为监听状态
        if(-1 == listen(sockfd, 5))
            ERR_LOG("listen error");
        
        int acceptfd = 0;
        pid_t pid=0; 
        
        //创建集合
        fd_set readfds;     //母本
        FD_ZERO(&readfds);  //清空母本
        fd_set tempfds;     //备份
        FD_ZERO(&tempfds);  //清空备份
        int max_fd=0;       //缓存最大的文件描述符
        
        //首先将sockfd添加到监听队列中
        FD_SET(sockfd,&readfds);
        max_fd = max_fd > sockfd ? max_fd : sockfd; //更新最大的文件描述符
        
        int ret=0;          //缓存就绪文件描述符的个数
        int i=0;            //遍历变量
        int nbytes=0;       //缓存接收到的数据字节数
        char buff[128]={0}; //缓存数据
    
        while(1){
            //将母本复制给temp,对所有文件描述符进行监听
            tempfds = readfds;
            //开始监听,没有文件描述符准备就绪时就阻塞
            if(-1 == (ret = select(max_fd+1,&tempfds,NULL,NULL,NULL))){
                ERR_LOG("select error");
            }
            //有fd准备就绪,遍历处理
            for(i = 3; i < max_fd+1 && ret != 0; i++){
                //判断i是否就绪
                if(FD_ISSET(i, &tempfds)){
                    ret--;
                    //判断就绪的i是否是sockfd,如果是说明有客户端连接
                    if(sockfd == i){
                        if(-1 == (acceptfd = accept(i,NULL,NULL))){
                            ERR_LOG("accept error");
                        }
                        printf("客户端[%d]连接\n",acceptfd);
                        FD_SET(acceptfd, &readfds);//将新的acceptfd添加到集合中
                        max_fd = max_fd > acceptfd ? max_fd : acceptfd;//更新最大的文件描述符
                    }else{//说明有已经连接的客户端发来数据了
                        if(-1 == (nbytes = recv(i,buff,sizeof(buff),0))){
                            ERR_LOG("recv error");
                        }else if(0 == nbytes){ //说明有客户端断开连接
                            printf("客户端[%d]断开链接\n",i);
                            FD_CLR(i, &readfds);//将其移除队列
                            close(i);//关闭其文件描述符
                            continue;//跳过本次for循环
                        }
                        if(!strcmp(buff,"quit")){ //说明有客户端退出
                            printf("客户端[%d]退出\n",i);
                            FD_CLR(i, &readfds);//将其移除队列
                            close(i);//关闭其文件描述符
                            continue;//跳过本次for循环
                        }
                        //正常接收数据
                        printf("接收到数据:[%s]\n",buff);
                        strcat(buff,"--zyx");
                        if(-1 == send(i,buff,sizeof(buff),0)){
                            ERR_LOG("send error");
                        }
                    }
                }
            }
        }
        close(sockfd);
    }
    
  • 相关阅读:
    用java写一个kafka的生产者,往topic里面添加数据
    Hyperspectral Imagery Classification Based on Contrastive Learning
    面试算法5:单词长度的最大乘积
    【Java-LangChain:使用 ChatGPT API 搭建系统-1】简介
    UniApp 自定义条件编译详细使用流程
    前端面试题之Javascript篇
    Go-变量& 常量
    YGG 经理人杯总决赛已圆满结束,来看看这份文字版总结!
    Synchronized
    React向组件内部动态传入带内容的结构--props
  • 原文地址:https://blog.csdn.net/weixin_44254079/article/details/139732182