加入星計劃,您可以享受以下權(quán)益:

  • 創(chuàng)作內(nèi)容快速變現(xiàn)
  • 行業(yè)影響力擴散
  • 作品版權(quán)保護
  • 300W+ 專業(yè)用戶
  • 1.5W+ 優(yōu)質(zhì)創(chuàng)作者
  • 5000+ 長期合作伙伴
立即加入
  • 正文
    • 1. 多進程并發(fā)服務(wù)器
    • 2. 多進程并發(fā)服務(wù)器代碼實現(xiàn)
    • 3. 多線程并發(fā)服務(wù)器
    • 4. 多線程并發(fā)服務(wù)器代碼實現(xiàn)
    • 5. 擴展:Socket API封裝
  • 相關(guān)推薦
  • 電子產(chǎn)業(yè)圖譜
申請入駐 產(chǎn)業(yè)圖譜

TCP并發(fā)服務(wù)器(多進程與多線程)

10/10 14:40
531
閱讀需 29 分鐘
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點資訊討論

1. 多進程并發(fā)服務(wù)器

我們在上一節(jié)寫的TCP服務(wù)器只能處理單連接,在代碼實現(xiàn)時,多進程并發(fā)服務(wù)器與非并發(fā)服務(wù)器在創(chuàng)建監(jiān)聽套接字、綁定、監(jiān)聽這幾個步驟是一樣的,但是在接收連接請求的時候,多進程并發(fā)服務(wù)器是這樣實現(xiàn)的:父進程負責(zé)接受連接請求,一旦連接成功,將會創(chuàng)建一個子進程與客戶端通信。示意圖如下:

(1)什么是并發(fā)

單核CPU → 多進程/線程并發(fā) → 時間片輪轉(zhuǎn)

并發(fā) → 某一個時間片/點所能處理的任務(wù)數(shù)

服務(wù)器并發(fā):服務(wù)器在某個時間點/片所能處理的連接數(shù)所能接收的client連接越多,并發(fā)量越大

(2)多進程并發(fā)服務(wù)器需要注意的幾個要點

使用多進程的方式來解決服務(wù)器處理多連接的問題,需要注意下面幾點:

共享:讀時共享、寫時復(fù)制。有血緣關(guān)系的進程間將會共享

文件描述符

內(nèi)存映射區(qū)mmap

父進程扮演什么角色?

等待接受客戶端連接accept()

有連接的時候通過fork()創(chuàng)建一個子進程。父進程只負責(zé)等待客戶端連接,即通過accept()阻塞等待連接請求,一旦有連接請求,馬上通過fork()創(chuàng)建一個子進程,子進程通過共享父進程的文件描述符來實現(xiàn)和client通信。

將用于通信的文件描述符關(guān)閉。accept()接受連接請求后會返回一個用于通信的文件描述符,而父進程的職責(zé)是等待連接并fork()創(chuàng)建用于通信的子進程,所以對于父進程來說,用于通信的文件描述符是沒有用處的,關(guān)閉該文件描述符來節(jié)省開銷。我們知道,文件描述符是有上限的,最多1024個(0-1023),如果不關(guān)閉的話,每次fork()一個子進程都要浪費一個文件描述符,如果進程多了,可能文件描述符就不夠用了。

子進程扮演什么角色?

通信。通過共享的父進程accept()返回的文件描述符來與客戶端通信。

將用于監(jiān)聽的文件描述符關(guān)閉。同樣是為了節(jié)省資源,子進程被fork()出來后也會擁有一個用于監(jiān)聽的文件描述符(因為子進程是對父進程的拷貝),但是子進程的作用是與客戶端通信,所以用于監(jiān)聽的文件描述符對子進程而言并無用處,關(guān)閉以節(jié)省資源。

創(chuàng)建的子進程個數(shù)有限制嗎?

硬件限制

文件描述符默認上限1024

子進程資源回收

wait/waitpid

使用信號回收

signal

sigaction

捕捉信號SIGCHLD

(3)讀時共享寫時復(fù)制詳解

首先看圖

如果父子進程都只是讀數(shù)據(jù),那么他們都通過虛擬地址去訪問1號物理地址的內(nèi)容,如果此時父進程修改了數(shù)據(jù)a=8,那么父進程會先復(fù)制一份數(shù)據(jù)到2號內(nèi)存,然后修改2號內(nèi)存的數(shù)據(jù),父進程再讀的時候就去2號內(nèi)存讀,而子進程依然去1號內(nèi)存讀。如果子進程也要修改這個全局變量,那么子進程也會拷貝一份數(shù)據(jù)到內(nèi)存3,然后修改內(nèi)存3的數(shù)據(jù),子進程訪問數(shù)據(jù)時會訪問內(nèi)存3的數(shù)據(jù)。(多個子進程就會拷貝多份)

2. 多進程并發(fā)服務(wù)器代碼實現(xiàn)

#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <ctype.h>#include <signal.h>#include <sys/wait.h>#include <errno.h>
// 進程回收函數(shù)void recyle(int num){    pid_t pid;    while( (pid = waitpid(-1, NULL, WNOHANG)) > 0 )    {        printf("child died , pid = %dn", pid);    }}
int main(int argc, const char* argv[]){    if(argc < 2)    {        printf("eg: ./a.out portn");        exit(1);    }    struct sockaddr_in serv_addr;    socklen_t serv_len = sizeof(serv_addr);    int port = atoi(argv[1]);
    // 創(chuàng)建套接字    int lfd = socket(AF_INET, SOCK_STREAM, 0);    // 初始化服務(wù)器 sockaddr_in     memset(&serv_addr, 0, serv_len);    serv_addr.sin_family = AF_INET;                   // 地址族     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 監(jiān)聽本機所有的IP    serv_addr.sin_port = htons(port);            // 設(shè)置端口     // 綁定IP和端口    bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
    // 設(shè)置同時監(jiān)聽的最大個數(shù)    listen(lfd, 36);    printf("Start accept ......n");
    // 使用信號回收子進程pcb //這個子進程回收機制會被子進程復(fù)制    struct sigaction act;    act.sa_handler = recyle;    act.sa_flags = 0;    sigemptyset(&act.sa_mask);    sigaction(SIGCHLD, &act, NULL);
    struct sockaddr_in client_addr;    socklen_t cli_len = sizeof(client_addr);    while(1)    {        // 父進程接收連接請求        // accept阻塞的時候被信號中斷, 處理信號對應(yīng)的操作之后(比如子進程終止,收到信號后去回收子進程)        // 回來之后不阻塞了, 直接返回-1, 這時候 errno==EINTR        int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);        //解決方法就是,在一個循環(huán)中判斷,如果accept阻塞過程中被信號打斷        //也就是返回值-1且errno == EINTR,那么再一次調(diào)用accept        //這樣accept會再次回到阻塞狀態(tài),并且返回值不是-1,也就不會進入循環(huán)        //等到再次被信號打斷的時候才會再次進入循環(huán)        /*這里的cfd雖然只定義了一個,但是在每個子進程中都會有一個拷貝,并且修改一個子進程的cfd不會影響其它子進程*/        while(cfd == -1 && errno == EINTR)        {            cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);        }        printf("connect sucessfuln");        // 創(chuàng)建子進程        pid_t pid = fork();        if(pid == 0)        {            close(lfd);            // child process            // 通信            char ip[64];            while(1)            {                // client ip port                printf("client IP: %s, port: %dn",                        inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),                       ntohs(client_addr.sin_port));                char buf[1024];                int len = read(cfd, buf, sizeof(buf));                if(len == -1)                {                    perror("read error");                    exit(1);                }                else if(len == 0)                {                    printf("客戶端斷開了連接n");                    close(cfd);                    break;                }                else                {                    printf("recv buf: %sn", buf);                    write(cfd, buf, len);                }            }            // 干掉子進程            return 0;
        }        else if(pid > 0)        {            // parent process            close(cfd);        }    }
    close(lfd);    return 0;}

3. 多線程并發(fā)服務(wù)器

多線程并發(fā)服務(wù)器示意圖如下:

在多進程模型中,fork得到的子進程會復(fù)制父進程的文件描述符cfd等信息,每個進程的cfd都是自己的,操作互不影響。但是線程不同,現(xiàn)在只有主線程的cfd,多個線程間的信息是共享的,假如說傳遞給每個子線程的cfd都是同一個,那么線程1修改該文件描述符指向的內(nèi)容會影響到線程2的通信,因為它們共享這一個文件描述符。所以這里需要建立一個文件描述符數(shù)組,每個子線程對應(yīng)數(shù)組中的一個文件描述符。

另外連接主線程的client是哪一個,也就是說哪個client對應(yīng)和哪個子線程通信,這也需要把和子線程通信的client的ip和port傳給和該client通信的子線程,這樣子線程才能知道通信的客戶端的ip和port。

于是我們需要創(chuàng)建一個結(jié)構(gòu)體數(shù)組,每個子線程對應(yīng)結(jié)構(gòu)體數(shù)組中的一個成員,而結(jié)構(gòu)體數(shù)組中的每個成員將作為參數(shù)傳遞給子進程的回調(diào)函數(shù)。

歸根到底就是因為,進程是獨立的,線程是共享的。

線程共享下面的資源:

全局數(shù)據(jù)區(qū)

堆區(qū)

一塊有效內(nèi)存的地址,比如說把線程1的一塊內(nèi)存的地址傳給線程2,那么線程2也可以操作這塊內(nèi)存。

4. 多線程并發(fā)服務(wù)器代碼實現(xiàn)

#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <ctype.h>#include <pthread.h>
// 自定義數(shù)據(jù)結(jié)構(gòu) //把線程處理函數(shù)所需要的信息封裝進來typedef struct SockInfo{    int fd; // 文件描述符    struct sockaddr_in addr; //ip地址結(jié)構(gòu)體    pthread_t id; //線程id}SockInfo;
// 子線程處理函數(shù)void* worker(void* arg){    char ip[64];    char buf[1024];    SockInfo* info = (SockInfo*)arg;    // 通信    while(1)    {        printf("Client IP: %s, port: %dn",               inet_ntop(AF_INET, &info->addr.sin_addr.s_addr, ip, sizeof(ip)),               ntohs(info->addr.sin_port));        int len = read(info->fd, buf, sizeof(buf));        if(len == -1)        {            perror("read error");            pthread_exit(NULL); //只退出子線程        //exit(1); //exit會把主線程也一塊退出        }        else if(len == 0)        {            printf("客戶端已經(jīng)斷開了連接n");            close(info->fd);            break;        }        else        {            printf("recv buf: %sn", buf);            write(info->fd, buf, len);        }    }    return NULL;}
int main(int argc, const char* argv[]){    if(argc < 2)    {        printf("eg: ./a.out portn");        exit(1);    }    struct sockaddr_in serv_addr;    socklen_t serv_len = sizeof(serv_addr);    int port = atoi(argv[1]);
    // 創(chuàng)建套接字    int lfd = socket(AF_INET, SOCK_STREAM, 0);    // 初始化服務(wù)器 sockaddr_in     memset(&serv_addr, 0, serv_len);    serv_addr.sin_family = AF_INET;                   // 地址族     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 監(jiān)聽本機所有的IP    serv_addr.sin_port = htons(port);            // 設(shè)置端口     // 綁定IP和端口    bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
    // 設(shè)置同時監(jiān)聽的最大個數(shù)    listen(lfd, 36);    printf("Start accept ......n");
    int i = 0;    SockInfo info[256]; //每個線程對應(yīng)數(shù)組的一個元素,最多256個線程    // 規(guī)定 fd == -1  說明這是一個無效文件描述符,也就是說這個文件描述符是空閑的,沒被占用    for(i=0; i<sizeof(info)/sizeof(info[0]); ++i)    {        info[i].fd = -1; //所有文件描述符全部初始化為-1    }
    socklen_t cli_len = sizeof(struct sockaddr_in);    while(1)    {        // 選一個沒有被使用的, 最小的數(shù)組元素        //因為有可能我們使用的文件描述符對應(yīng)數(shù)組下標(biāo)i已經(jīng)累加到了100,但是前面        //99個都已經(jīng)被釋放了(斷開連接了),我們最好選用一個當(dāng)前空閑的數(shù)組下標(biāo)最小        //的文件描述符,以合理利用資源        for(i=0; i<256; ++i)        {            if(info[i].fd == -1)            {                break; //這樣就能把數(shù)組下標(biāo)最小的fd找出來,并確保i指向它,直接break出去            }        }        if(i == 256) //整個數(shù)組都被用完了,直接break出while循環(huán)        {            break;        }        // 主線程 - 等待接受連接請求        info[i].fd = accept(lfd, (struct sockaddr*)&info[i].addr, &cli_len); //第二個參數(shù)是傳出參數(shù),        //傳出客戶端ip信息(struct sockaddr*)類型
        // 創(chuàng)建子線程 - 通信        pthread_create(&info[i].id, NULL, worker, &info[i]);        // 設(shè)置線程分離 //這樣子線程終止的時候會自動釋放,就不需要主線程去釋放了        pthread_detach(info[i].id);    }
    close(lfd);
    // 只退出主線程 //對子線程無影響,子線程可以繼續(xù)通信    pthread_exit(NULL);    return 0;}

5. 擴展:Socket API封裝

#include <stdlib.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <sys/socket.h>
void perr_exit(const char *s){        perror(s);        exit(-1);}
//也可以在vim下按2K跳轉(zhuǎn)到man文檔中的accept函數(shù),因為man文檔跳轉(zhuǎn)不區(qū)分大小寫int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr){        int n;
again:        if ((n = accept(fd, sa, salenptr)) < 0)     {        //ECONNABORTED 發(fā)生在重傳(一定次數(shù))失敗后,強制關(guān)閉套接字        //EINTR 進程被信號中斷 //如果accept函數(shù)在阻塞時被信號打斷,處理完信號           //返回時就不會在阻塞了,而是直接返回-1        if ((errno == ECONNABORTED) || (errno == EINTR))        {        goto again; //如果accept阻塞時被信號打斷了,需要在執(zhí)行一次accept繼續(xù)阻塞        }        else        {        perr_exit("accept error");        }        }        return n;}
int Bind(int fd, const struct sockaddr *sa, socklen_t salen){    int n;
        if ((n = bind(fd, sa, salen)) < 0)    {        perr_exit("bind error");    }
    return n;}
int Connect(int fd, const struct sockaddr *sa, socklen_t salen){    int n;    n = connect(fd, sa, salen);        if (n < 0)     {        perr_exit("connect error");    }
    return n;}
int Listen(int fd, int backlog){    int n;
        if ((n = listen(fd, backlog)) < 0)    {        perr_exit("listen error");    }
    return n;}
int Socket(int family, int type, int protocol){        int n;
        if ((n = socket(family, type, protocol)) < 0)    {        perr_exit("socket error");    }
        return n;}
ssize_t Read(int fd, void *ptr, size_t nbytes){        ssize_t n;
again:        if ( (n = read(fd, ptr, nbytes)) == -1)     {        if (errno == EINTR)                goto again; //如果read被信號中斷了,應(yīng)該讓它繼續(xù)去read等待讀數(shù)據(jù) (read阻塞時)        else                return -1;        }
        return n;}
ssize_t Write(int fd, const void *ptr, size_t nbytes){        ssize_t n;
again:        if ((n = write(fd, ptr, nbytes)) == -1)     {        if (errno == EINTR)                goto again;        else                return -1;        }        return n;}
int Close(int fd){    int n;        if ((n = close(fd)) == -1)                perr_exit("close error");
    return n;}
/*參三: 應(yīng)該讀取的字節(jié)數(shù)*/     //一直讀到n字節(jié)數(shù)才會返回,否則阻塞等待                     //socket 4096  readn(cfd, buf, 4096)   nleft = 4096-1500ssize_t Readn(int fd, void *vptr, size_t n){        size_t  nleft;              //usigned int 剩余未讀取的字節(jié)數(shù)        ssize_t nread;              //int 實際讀到的字節(jié)數(shù)        char   *ptr;
        ptr = vptr;        nleft = n;                  //n 未讀取字節(jié)數(shù)
        while (nleft > 0)     {        if ((nread = read(fd, ptr, nleft)) < 0)         {        if (errno == EINTR)            {        nread = 0;            }        else            {        return -1;            }        }         else if (nread == 0)        {        break;        }
        nleft -= nread;   //nleft = nleft - nread         ptr += nread;        }        return n - nleft;}
ssize_t Writen(int fd, const void *vptr, size_t n){        size_t nleft;        ssize_t nwritten;        const char *ptr;
        ptr = vptr;        nleft = n;        while (nleft > 0)     {        if ( (nwritten = write(fd, ptr, nleft)) <= 0)         {        if (nwritten < 0 && errno == EINTR)                nwritten = 0;        else                return -1;        }        nleft -= nwritten;        ptr += nwritten;        }        return n;}
static ssize_t my_read(int fd, char *ptr) //靜態(tài)函數(shù)保證了讀完第一個100字節(jié)才去讀下一個100字節(jié),而不是每次調(diào)用都讀100字節(jié){        static int read_cnt; //改變量存在靜態(tài)數(shù)據(jù)區(qū),下次調(diào)用my_read函數(shù)的時候,read_cnt會保留上次的值        static char *read_ptr;        static char read_buf[100];                //因為這里的變量都是static的,所以并非每次調(diào)用my_read都會讀100字節(jié),而是讀完100字節(jié)再去讀下一個100字節(jié)        if (read_cnt <= 0) { again:        if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0)    //"hellon"        {        if (errno == EINTR)                goto again;        return -1;        }         else if (read_cnt == 0)        return 0;
        read_ptr = read_buf;        }        read_cnt--; //在上次調(diào)用結(jié)束的值基礎(chǔ)上--,保證了讀完100字節(jié)再去讀下一個100字節(jié)        *ptr = *read_ptr++;
        return 1;}
/*readline --- fgets*/    //傳出參數(shù) vptrssize_t Readline(int fd, void *vptr, size_t maxlen){        ssize_t n, rc;        char    c, *ptr;        ptr = vptr;
        for (n = 1; n < maxlen; n++)     {        if ((rc = my_read(fd, &c)) == 1)    //ptr[] = hellon        {        *ptr++ = c;        if (c == 'n') //先讀100個字節(jié),依次遍歷,遇到 'n' 說明一行讀完了                break;        }         else if (rc == 0)         {        *ptr = 0;        return n-1;        }         else        return -1;        }        *ptr = 0;
        return n;}

 

相關(guān)推薦

電子產(chǎn)業(yè)圖譜

Linux、C、C++、Python、Matlab,機器人運動控制、多機器人協(xié)作,智能優(yōu)化算法,貝葉斯濾波與卡爾曼濾波估計、多傳感器信息融合,機器學(xué)習(xí),人工智能。