xiaobaoqiu Blog

Think More, Code Less

The Memcached Network Model

The goal of this article is to study the source code behind Memcached's network model.

Memcached uses a very typical Master-Worker model, implemented with multiple threads rather than multiple processes. The main thread (Master) accepts connections and distributes them evenly to the worker threads (Workers), which handle the business logic.

The core shared data structure is a connection queue: the main thread pushes each accepted connection onto the queue of the worker thread it has chosen and notifies that worker, which then pops the request off its queue and processes it.

1. A brief introduction to libevent

Memcached uses libevent for its event loop; on Linux, libevent defaults to epoll as the I/O multiplexing mechanism. A thread typically uses libevent in the following steps:

(1). The thread creates an event_base object via event_init(). An event_base manages all of the I/O events registered with it. In a multi-threaded program an event_base must not be shared between threads: one event_base belongs to exactly one thread.
(2). The thread then uses event_add() to register the I/O events it cares about on particular file descriptors with the event_base, specifying the callback (event handler) to invoke when an event fires. A server usually listens for readable events on its sockets. For example, a server thread registers the EV_READ event for socket sock1 and designates event_handler1() as the callback for that event. libevent wraps each I/O event in a struct event object, and event types are flagged with constants such as EV_READ/EV_WRITE.
(3). After registering its events, the thread calls event_base_loop() to enter the monitoring loop. Internally the loop calls epoll (or another multiplexing function) and blocks until one of the registered events occurs; the thread then invokes the corresponding callback. For example, when sock1 becomes readable, i.e. its kernel buffer already contains data, the blocked thread wakes up and calls event_handler1() to handle the event.
(4). After handling the event, the thread blocks again and keeps monitoring until the next event occurs.
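As an illustration only (this is not Memcached code), a minimal sketch of this pattern using the same libevent 1.x-style API that Memcached calls might look like the following; sock1 and event_handler1 are the hypothetical names from the steps above.

#include <event.h>      /* libevent 1.x-style API, the same calls Memcached uses */
#include <stdio.h>
#include <unistd.h>

/* Step (2): callback invoked when sock1 becomes readable. */
static void event_handler1(int fd, short which, void *arg) {
    char buf[1024];
    ssize_t n = read(fd, buf, sizeof(buf));   /* drain the readable data */
    if (n > 0)
        fprintf(stderr, "read %zd bytes from fd %d\n", n, fd);
}

static void run_event_loop(int sock1) {
    /* Step (1): one event_base per thread. */
    struct event_base *base = event_init();

    /* Step (2): register a persistent EV_READ event for sock1. */
    struct event ev;
    event_set(&ev, sock1, EV_READ | EV_PERSIST, event_handler1, NULL);
    event_base_set(base, &ev);
    event_add(&ev, NULL);

    /* Steps (3)/(4): block in the loop; callbacks fire as events occur. */
    event_base_loop(base, 0);
}

Memcached's worker threads follow exactly this pattern: setup_thread() registers the pipe read event and worker_libevent() calls event_base_loop(), as shown below.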

2. The Memcached network model

(Figure: overall structure of the Memcached master/worker network model.)

2.1 Main data structures

First comes CQ_ITEM. A CQ_ITEM is essentially a wrapper around the already-established connection fd returned by the main thread's accept():

thread.c
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};

CQ is a queue of CQ_ITEMs managed as a singly-linked list:

thread.c
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};
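For reference, operations on this queue follow the usual locked linked-list pattern. The sketch below is a simplified approximation of the cq_init/cq_push/cq_pop helpers in thread.c (the function names come from thread.c; the bodies here are illustrative, not the exact source):

#include <pthread.h>
#include <stddef.h>    /* uses the CQ / CQ_ITEM definitions shown above */

/* Initialize an empty queue. */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append an item at the tail (called by the main thread). */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Pop an item from the head (called by a worker thread); NULL if empty. */
static CQ_ITEM *cq_pop(CQ *cq) {
    pthread_mutex_lock(&cq->lock);
    CQ_ITEM *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}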

LIBEVENT_THREAD is Memcached's wrapper around a thread. Each worker thread has its own CQ queue, a notification pipe, and its own libevent instance (event_base):

thread.c
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

2.2 Main flow

The main() function in memcached.c shows the overall flow of handling client requests:

(1). Initialize the main thread's libevent instance

/* initialize main thread libevent instance */
 main_base = event_init();

(2). Initialize all the threads (Master and Workers) and start them

/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);

Here settings.num_threads is the number of worker threads; the default is 4 (it can be changed with the -t command-line option):

settings.num_threads = 4;         /* N workers */

The core of thread_init (thread.c) is briefly analyzed below:

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {

    ...//some code omitted

    //'threads' is declared at the top of thread.c; it holds all the worker thread descriptors
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {    //create the notification pipe
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];  //read end
        threads[i].notify_send_fd = fds[1];     //write end

        //set up this worker thread's libevent instance
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    //create the worker threads
    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    //wait for all threads to start before this function returns
    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

thread_init first allocates the array of LIBEVENT_THREAD descriptors for the worker threads (the main thread is tracked separately in dispatcher_thread), and then creates a pipe for each worker thread. This pipe is what the main thread uses to notify a worker that a new connection has arrived.

pipe() creates a pipe; its two ends are referred to by the descriptors fds[0] and fds[1]. Note that the roles of the two ends are fixed: fds[0] can only be used for reading and is called the read end, while fds[1] can only be used for writing and is called the write end.
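As a self-contained illustration of this notification mechanism (again, not Memcached code), the sketch below shows one thread waking another through a pipe in the same way the main thread will later wake a worker by writing a single byte:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int fds[2];  /* fds[0] = read end, fds[1] = write end */

/* Worker side: block until the main thread writes a notification byte. */
static void *worker(void *arg) {
    char buf[1];
    if (read(fds[0], buf, 1) == 1)
        printf("worker woken up by '%c'\n", buf[0]);
    return NULL;
}

int main(void) {
    pthread_t tid;
    if (pipe(fds)) { perror("pipe"); return 1; }
    pthread_create(&tid, NULL, worker, NULL);

    /* Main side: a single byte is enough to wake the worker. */
    write(fds[1], "c", 1);
    pthread_join(tid, NULL);
    return 0;
}

In Memcached the worker does not block in read() directly; it blocks in its libevent loop and the pipe's readable event triggers the callback, as described next.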

setup_thread mainly creates each worker thread's libevent instance (the main thread's libevent instance was already created in main()). The code of setup_thread() is as follows:

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    //note: only notify_receive_fd, the read end of the pipe, is registered here
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    ...
}

Here each worker thread registers an I/O event for its notify_receive_fd descriptor, which is the receiving (read) end of the pipe connecting the worker thread to the main thread. By registering this event, the worker thread can listen for the data (i.e. notifications) that the main thread sends it.

Note the thread_libevent_process argument passed to event_set: it makes the worker listen for readable events on the pipe it shares with the main thread and designates thread_libevent_process() as the handler for that event. In other words, whenever the read end of the pipe has data to read, thread_libevent_process() is triggered.

The code of thread_libevent_process is shown below; the most important case is when the byte read is 'c'. This code is analyzed in more detail later.

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //read the notification byte from the pipe
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':   //'c' means the main thread has dispatched a new connection to this worker thread
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
        ...
    case 'g':
        ...
    }
}

The create_worker call in thread_init is what actually starts the threads. Its code is as follows:

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

pthread_create creates a thread; its third argument is the start routine that the new thread runs, in this case worker_libevent, which calls event_base_loop to start that thread's libevent loop.

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}

The key thing to remember here is that, so far, each worker thread is only triggered when the read end of its own pipe has data to read, at which point it calls thread_libevent_process.

(3). The main thread calls

/* create the listening socket, bind it, and init */
server_sockets(settings.port, tcp_transport, portnumber_file)

After the worker threads have started, the main thread creates the listening socket and waits for client connection requests. This function wraps a series of operations: creating the listening socket, binding the address, setting non-blocking mode, and registering the libevent read event for the listening socket.

The socket is wrapped in a conn object, which represents the connection with a client; its definition is quite large (see memcached.h).

The port defaults to 11211:

settings.port = 11211;

server_sockets mainly calls the server_socket() function:

/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {

    ...//some code omitted

    //resolve the host name to addresses; the result is a linked list of addrinfo stored in ai
    error= getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
          fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
          perror("getaddrinfo()");
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {   //create the socket
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }
        //for IPv4 addresses, set socket options
        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        ...//some code omitted

        //bind the socket to the address
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
                ...//some code omitted
        }

        if (IS_UDP(transport)) {
            ...//some code omitted
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}

conn_new() is the key function here: it wraps a raw socket into a conn object, registers the I/O events associated with that conn object, and sets the connection's initial state. Note that the listening socket's conn object is initialized to the conn_listening state.

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {

    ...//some code omitted

    //set the fd and the initial state
    c->sfd = sfd;
    c->state = init_state;

    //register the I/O event for this connection
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    ...//
}

The handler for the I/O events of every conn object is event_handler(), which mainly just calls drive_machine():

void event_handler(const int fd, const short which, void *arg) {
    conn *c;
    ...
    drive_machine(c);
    ...
}

drive_machine() is solely responsible for handling the events related to a client connection:

static void drive_machine(conn *c) {
    ...
    assert(c != NULL);

    while (!stop) {
        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            ...

            if (settings.maxconns_fast &&
                ...
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
        case conn_waiting:
        ...
        case conn_read:
        ...
        case conn_parse_cmd:
        ...
        case conn_new_cmd:
        ...
        case conn_nread:
        ...
        case conn_swallow:
        ...
        case conn_write:
        ...
        case conn_mwrite:
        ...
        case conn_closing:
        ...
        case conn_closed:
    ...
}

drive_machine is where the conn object's state field comes into play: drive_machine() is one huge switch statement that selects a branch based on the connection's current state (the value of the state field). Because the listening socket's conn object was initialized to conn_listening, drive_machine() executes the case conn_listening branch, which accepts the client connection and hands it off to a worker thread via dispatch_conn_new().

The code of dispatch_conn_new is as follows (thread.c):

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();  //allocate a new CQ_ITEM
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    //pick the worker thread to dispatch to (round-robin)
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;  //note: this state is conn_new_cmd
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    //push the new CQ_ITEM onto the chosen worker thread's queue
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //write one byte into the worker thread's notification pipe
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

Writing a single byte to the worker thread's pipe triggers a readable event on the pipe's read end, i.e. on the notify_receive_fd descriptor.

How does the main thread choose which worker thread handles a new connection? Very simply: it keeps a counter last_thread, increments it by one, and takes the result modulo the number of threads to get the worker's index, i.e. round-robin dispatch.

From the earlier analysis we know that when a readable event fires on a worker thread's pipe, thread_libevent_process is called to handle it. Let's look at it in detail:

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //read the notification byte from the pipe
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':   //'c' means the main thread has dispatched a new connection to this worker thread
    //pop one request from this worker's connection-request queue
    //this is the item the main thread pushed onto new_conn_queue earlier
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        //create and initialize a conn object from this CQ_ITEM
        //this object handles the communication between the client and this worker thread
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
        ...
    case 'g':
        ...
    }
}

At this point, the worker thread has established the connection with the client.

One noteworthy part of conn_new is that it registers the connection's event handler on the worker thread's event_base:

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    ...
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
    ...
}

As we can see, the worker thread also uses the event_handler function to process data arriving from the client, handling it according to the current state of the connection.

(4). The event loop

/* enter the event loop */
event_base_loop(main_base, 0);

At this point the main thread starts accepting external connection requests through libevent, and the whole startup process is complete.

3. Summary

Memcached uses the so-called half-sync/half-async pattern, which was first proposed by the author of ACE; see the original paper for details.

(Figure: the half-sync/half-async pattern.)

3.1 The half-sync/half-async pattern

The interaction between the modules is:

(1). The asynchronous module receives events that may arrive asynchronously (I/O, signals, etc.) and puts them into a queue;
(2). The synchronous module does essentially one thing: it keeps taking messages out of the queue and processing them;

The half-sync/half-async pattern exists to partition the server's responsibilities: operations that may block are placed in the synchronous module as much as possible, so that they do not hold up the asynchronous module's processing.

An example:

Suppose a server queries a database after it receives a client request, and this query may be slow. If accepting client connections and processing client requests (here, the database query) are handled in the same module, the responses for many connections are likely to become very slow.

With the half-sync/half-async pattern, one process uses multiplexed I/O (e.g. epoll/select) to listen for client connections; whenever a new connection request arrives, it is put onto a message queue implemented with some form of IPC. Meanwhile, N worker processes do nothing but repeatedly take messages off the queue and process them. This split separates accepting client requests from processing them into different modules that communicate only through IPC, keeping their impact on each other to a minimum. A toy sketch of this split follows.
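The sketch below is a toy version of this split, using pthreads, a mutex/condition-variable queue, and plain integers standing in for requests; it illustrates the generic pattern, not Memcached's actual implementation (Memcached uses one queue per worker plus a pipe notification, as shown earlier):

#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4
#define QSIZE    64

/* A tiny blocking queue: the async side pushes, the sync side pops. */
static int queue[QSIZE];
static int qhead, qtail, qcount;
static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  qcond = PTHREAD_COND_INITIALIZER;

static void queue_push(int req) {
    pthread_mutex_lock(&qlock);
    queue[qtail] = req;
    qtail = (qtail + 1) % QSIZE;
    qcount++;
    pthread_cond_signal(&qcond);      /* wake one waiting worker */
    pthread_mutex_unlock(&qlock);
}

static int queue_pop(void) {
    pthread_mutex_lock(&qlock);
    while (qcount == 0)
        pthread_cond_wait(&qcond, &qlock);
    int req = queue[qhead];
    qhead = (qhead + 1) % QSIZE;
    qcount--;
    pthread_mutex_unlock(&qlock);
    return req;
}

/* Synchronous module: workers block on the queue and may do slow work. */
static void *worker(void *arg) {
    for (;;) {
        int req = queue_pop();
        printf("worker handling request %d (may block, e.g. a DB query)\n", req);
    }
    return NULL;
}

int main(void) {
    pthread_t tids[NWORKERS];
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, NULL);

    /* Asynchronous module: in a real server this loop would sit in
     * epoll/select and push each accepted connection onto the queue. */
    for (int req = 0; req < 10; req++)
        queue_push(req);

    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);   /* never returns in this toy sketch */
    return 0;
}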

Advantages

(1). accept() runs only in the main loop, so there is no thundering-herd problem;
(2). The division of labor between the main thread and the worker threads is clear: the main thread is responsible only for accepting connections and dispatching I/O, while the worker threads handle the business logic;
(3). The worker threads do not interfere with each other, because each has its own independent connection queue;

Disadvantages

If the business logic is like that of a web server, where even a simple request has to go through this fairly heavyweight path (most importantly, work that a single thread could finish on its own has to be accepted by one thread and then handed to another for processing), then the overhead is clearly not worth it.