Loading...
墨滴

herongwei

2021/03/28  阅读:51  主题:默认主题

阻塞I/O和线程模型:一种轻量模式

阻塞I/O和线程模型:一种轻量模式

为什么需要多线程

进程切换上下文的代价是比较高的,幸运的是,有一种轻量级的模型可以处理多用户连接请求,这就是线程模型。这一讲里,我们就来了解一下线程模型。

线程(thread)是运行在进程中的一个逻辑流,现代操作系统都允许在单进程中运行多个线程。

线程由操作系统内核管理。每个线程都有自己的上下文(content),包括一个可以唯一标识线程的 ID、栈、程序计数器、寄存器等。

在同一个进程中,所有的线程共享该进程的整个虚拟地址空间,包括代码、数据、堆、共享库等。

实际上,从 Linux 内核角度来讲,可以认为它并没有线程这个概念,无论是我们所说的线程,还是进程,对于 Linux 而言,都属于 task,因此无论是进程还是线程,都拥有唯一属于自己的 task_struct 。实际上我们所谓的线程,更像是 task 这个概念,有的进程有一个 task,就叫做单线程进程;有的进程有多个线程,就叫做多线程进程。

每一个进程一开始都会产生一个线程,一般称为主线程,主线程可以在产生子线程,这样的主线程-子线程可以叫做一个对等线程。

你可能会问,既然可以使用多进程来处理并发,为什么还要使用多线程模式呢?

简单来说,在同一个进程下,线程上下文切换的开销要比进程小得多。怎么理解线程的上下文呢?

我们的代码被 CPU 调度执行的时候,是需要一些数据支撑的,比如程序计数器告诉 CPU 代码执行到哪里了?寄存器里存了当前计算的一些中间值,内存里放置了一些当前用到的变量等。从一个计算场景切换到另外一个计算场景,程序计数器、寄存器等这些值重新载入新场景的值,就是线程的上下文切换。

POSIX 线程模型

POSIX 线程是现代 UNIX 系统提供的处理线程的标准接口。POSIX 定义的线程函数大约有 60 多个,这些函数可以帮助我们创建线程、回收线程。接下来我们先看一个简单的例子程序。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

int another_shared = 0;

void thread_run(void *arg) {
    int *calc = (int *) arg;
    printf("hello, world, tid == %d \n", pthread_self());
    for (int i = 0; i < 100; i++) {
        *calc += 1;
        another_shared += 1;
    }
}
int main(int argc, char **argv) {
    int calc;
    pthread_t tid1;
    pthread_t tid2;

    pthread_create(&tid1, NULL, thread_run, &calc);
    pthread_create(&tid2, NULL, thread_run, &calc);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("calculator is %d \n", calc); 
    printf("another_shared is %d \n", another_shared);
}

上面的程序中,主线程依次创建了两个子线程,然后等待这两个子线程处理完毕之后终止。每个子线程都在对两个共享变量进行计算,最后在主线程中打印出最后的计算结果。

程序的第 18 和 19 行分别调用了 pthread_create 创建了两个线程,每个线程的入口都是 thread_run 函数,这里我们使用了 calculator 这个全局变量,并且通过传地址指针的方式,将这个值传给了 thread_run 函数。

当调用 pthread_create 结束,子线程会立即执行,主线程在此后调用了 pthread_join 函数等待子线程结束。

运行这个程序,很幸运,计算的结果是正确的。

主要线程函数

创建线程

正如前面看到,通过调用 pthread_create 函数来创建一个线程。这个函数的原型如下:

int pthread_create(pthread_t *tid, const pthread_attr_t *attr,
           void *(*func)(void *), void *arg)
;

返回:若成功则为0,若出错则为正的Exxx值

每个线程都有一个线程 ID(tid)唯一来标识,其数据类型为 pthread_t,一般是 unsigned int。pthread_create 函数的第一个输出参数 tid 就是代表了线程 ID,如果创建线程成功,tid 就返回正确的线程 ID。

每个线程都会有很多属性,比如优先级、是否应该成为一个守护进程等,这些值可以通过 pthread_attr_t 来描述,一般我们不会特殊设置,可以直接指定这个参数为 NULL。

第三个参数为新线程的入口函数,该函数可以接收一个参数 arg,类型为指针,如果我们想给线程入口函数传多个值,那么需要把这些值包装成一个结构体,再把这个结构体的地址作为 pthread_create 的第四个参数,在线程入口函数内,再将该地址转为该结构体的指针对象。

简单来说,第三个参数:你想让线程来干什么?第四个参数:干了这件事之后结果保存到哪

在新线程的入口函数内,可以执行 pthread_self 函数返回线程 tid。

pthread_t pthread_self(void)

终止线程

终止一个线程最直接的方法是在父线程内调用以下函数:

void pthread_exit(void *status)

当调用这个函数之后,父线程会等待其他所有的子线程终止,之后父线程自己终止。

当然,如果一个子线程入口函数直接退出了,那么子线程也就自然终止了。所以,绝大多数的子线程执行体都是一个无限循环。

也可以通过调用 pthread_cancel 来主动终止一个子线程,和 pthread_exit 不同的是,它可以指定某个子线程终止。

int pthread_cancel(pthread_t tid)

回收已终止线程的资源

我们可以通过调用 pthread_join 回收已终止线程的资源:

int pthread_join(pthread_t tid, void ** thread_return)

当调用 pthread_join 时,主线程会阻塞,直到对应 tid 的子线程自然终止。和 pthread_cancel 不同的是,它不会强迫子线程终止。

分离线程

一个线程的重要属性是可结合的,或者是分离的。一个可结合的线程是能够被其他线程杀死和回收资源的;而一个分离的线程不能被其他线程杀死或回收资源。一般来说,默认的属性是可结合的。我们可以通过调用 pthread_detach 函数可以分离一个线程:

int pthread_detach(pthread_t tid)

在高并发的例子里,每个连接都由一个线程单独处理,在这种情况下,服务器程序并不需要对每个子线程进行终止,这样的话,每个子线程可以在入口函数开始的地方,把自己设置为分离的,这样就能在它终止后自动回收相关的线程资源了,就不需要调用 pthread_join 函数了。

每个连接一个线程处理

接下来,我们改造一下服务器端程序。我们的目标是这样:每次有新的连接到达后,创建一个新线程,而不是用新进程来处理它。

#include "lib/common.h"

extern void loop_echo(int);

void thread_run(void *arg) {
    pthread_detach(pthread_self());
    int fd = (int) arg;
    loop_echo(fd);
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    pthread_t tid;
    
    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            pthread_create(&tid, NULL, &thread_run, (void *) fd);
        }
    }

    return 0;
}

这个程序的第 18 行阻塞调用在 accept 上,一旦有新连接建立,阻塞调用返回,调用 pthread_create 创建一个子线程来处理这个连接。

描述连接最主要的是连接描述字,这里通过强制把描述字转换为 void * 指针的方式,完成了传值。这里可以简单总结一下,虽然传的是一个指针,但是这个指针里存放的并不是一个地址,而是连接描述符的数值。

新线程入口函数 thread_run 里,第 6 行使用了 pthread_detach 方法,将子线程转变为分离的,也就意味着子线程独自负责线程资源回收。第 7 行,强制将指针转变为描述符数据,和前面将描述字转换为 void * 指针对应,第 8 行调用 loop_echo 方法处理这个连接的数据读写。

loop_echo 的程序如下,在接收客户端的数据之后,再编码回送出去。

//在接收客户端的数据之后再编码回送出去
void loop_echo(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;

    while (1) {
        char ch;
        result = recv(fd, &ch, 10);

        //断开连接或者出错
        if (result == 0) {
            break;
        } else if (result == -1) {
            error(1, errno, "read error");
            break;
        }

        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used ++] = ch;
        }
        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

运行这个程序之后,开启多个 telnet 客户端,可以看到这个服务器程序可以处理多个并发连接并回送数据。单独一个连接退出也不会影响其他连接的数据收发。

$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaa
nnn
^]
telnet> quit
Connection closed.

构建线程池处理多个连接

上面的服务器端程序虽然可以正常工作,不过它有一个缺点,那就是如果并发连接过多,就会引起线程的频繁创建和销毁。虽然线程切换的上下文开销不大,但是线程创建和销毁的开销却是不小的。

能不能对这个程序进行一些优化呢?

我们可以使用预创建线程池的方式来进行优化。在服务器端启动时,可以先按照固定大小预创建出多个线程,当有新连接建立时,往连接字队列里放置这个新连接描述字,线程池里的线程负责从连接字队列里取出连接描述字进行处理。

这个程序的关键是连接字队列的设计,因为这里既有往这个队列里放置描述符的操作,也有从这个队列里取出描述符的操作。

对此,需要引入两个重要的概念,一个是锁 mutex,一个是条件变量 condition。锁很好理解,加锁的意思就是其他线程不能进入;条件变量则是在多个线程需要交互的情况下,用来线程间同步的原语。

#include "lib/common.h"

#define  THREAD_NUMBER      4
#define  BLOCK_QUEUE_SIZE   100

extern void loop_echo(int);

typedef struct {
    pthread_t thread_tid;/* thread ID */
    long thread_count;   /* # connections handled */
}Thread;

Thread *thread_array;

//定义一个队列
typedef struct {
    int max_number;     //队列里最大的描述子最大个数
    int *fd;        //数组指针
    int front;      //当前队列的头
    int rear;       //当前队列的尾
    pthread_mutex_t mutex;  //锁
    pthread_cond_t cond;    //条件变量
} block_queue;

//初始化队列
void block_queue_init(block_queue *blockQueue , int number) {
    blockQueue->max_number = number;
    blockQueue->fd = calloc(number, sizeof(int));
    blockQueue->front = blockQueue->rear = 0;
    pthread_mutex_init(&blockQueue->mutex, NULL);
    pthread_cond_init(&blockQueue->cond, NULL);
}

//往队列里放置一个描述字 fd
void block_queue_push(block_queue *blockQueue , int fd) {
    //先加锁 因为有多个线程需要读写队列
    pthread_mutex_lock(&blockQueue->mutex);
    //将描述字放到队列尾的位置
    blockQueue->fd[blockQueue->rear] = fd;
    if (++blockQueue->rear == blockQueue->max_number) {
        blockQueue->rear = 0;
    }//如果已经到最后,重置尾的位置

    printf("push fd %d\n", fd);
    //通知其他等待的线程 有新的连接等待处理
    pthread_cond_signal(&blockQueue->cond);
    //解锁
    pthread_mutex_unlock(&blockQueue->mutex);
}
//往队列里读出描述字进行处理
int block_queue_pop(block_queue *blockQueue) {
    //先加锁 因为有多个线程需要读写队列
    pthread_mutex_lock(&blockQueue->mutex);
    //判断队列里没有新的连接字可以处理,就一直条件等待,直到有新的连接字入队列
    while(blockQueue->front == blockQueue->rear) {
        pthread_cond_wait(&blockQueue->cond, &blockQueue->mutex);
    }
    int fd = blockQueue->fd[blockQueue->front];
    //如果已经到最后,重置头的位置
    if (++blockQueue->front == blockQueue->max_number) {
        blockQueue->front = 0;
    }
    printf("pop fd %d\n", fd);
    //解锁
    pthread_mutex_unlock(&blockQueue->mutex);
    //返回连接字
    return fd;
}

void thread_run(void *arg) {
    pthread_t tid = pthread_self();
    pthread_detach(tid);

    block_queue *blockQueue = (block_queue *) arg;
    while (1) {
        int fd = block_queue_pop(blockQueue);//在队列里取出描述符
        printf("get fd in thread, fd==%d, tid == %d", fd, tid);
        loop_echo(fd);
    }
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);

    block_queue blockQueue;
    block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);

    thread_array = calloc(THREAD_NUMBER, sizeof(Thread));
    int i;
    for (i = 0; i < THREAD_NUMBER; i++) {
        pthread_create(&(thread_array[i].thread_tid), NULL, &thread_run, (void *) &blockQueue);
    }

    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            block_queue_push(&blockQueue, fd);
        }
    }
    return 0;
}

这里有 block_queue 相关的定义和实现,并在关键的地方加了一些注释,有几个地方需要特别注意:

第一是记得对操作进行加锁和解锁,这里是通过 pthread_mutex_lock 和 pthread_mutex_unlock 来完成的。

第二是当工作线程没有描述字可用时,需要等待,第 43 行通过调用 pthread_cond_wait,所有的工作线程等待有新的描述字可达。

第 32 行,主线程通知工作线程有新的描述符需要服务。

有了描述字队列,主程序变得非常简洁。第 19-23 行预创建了多个线程,组成了一个线程池。28-32 行在新连接建立后,将连接描述字加入到队列中。7-9 行是工作线程的主循环,从描述字队列中取出描述字,对这个连接进行服务处理。同样的,运行这个程序之后,开启多个 telnet 客户端,可以看到这个服务器程序可以正常处理多个并发连接并回显。

终端执行 top -H -p pid(thread02的进程号)查看线程数,可看到,一个主线程,开启了四个子线程。

和前面的程序相比,线程创建和销毁的开销大大降低,但因为线程池大小固定,又因为使用了阻塞套接字,肯定会出现有连接得不到及时服务的场景。

仅仅使用阻塞 I/O 模型和线程是没有办法达到极致的高并发处理能力。下一篇我们来学习一下非阻塞多路 I/O 复用加上线程来处理。

总结

今天的实战我们使用了线程来构建服务器端程序。一种是每次动态创建线程,另一种是使用线程池提高效率。和进程相比,线程的语义更轻量,使用的场景也更多。线程是高性能网络服务器必须掌握的核心知识。希望大家能在实战中加深印象。

参考:极客时间《网络编程实战》

更多精彩

关注公众号【herongwei】

  • 后台回复【大礼包】即可获取白嫖后端技术知识+算法电子书~
  • 回复【刷题】送大家一本帮助我拿到 BAT 等一线大厂 offer 的算法笔记,是一位谷歌+阿里大神写的,对于算法薄弱或者需要提高的同学都十分受用,算法一定是计算机学习的重中之重

为了回馈读者,本公众号不定期会有送礼活动,敬请关注~

herongwei

2021/03/28  阅读:51  主题:默认主题

作者介绍

herongwei

公众号/知乎搜「herongwei」关注