/* An item in the connection queue. */typedefstructconn_queue_itemCQ_ITEM;structconn_queue_item{intsfd;enumconn_statesinit_state;intevent_flags;intread_buffer_size;enumnetwork_transporttransport;CQ_ITEM*next;};
CQ是一个管理CQ_ITEM的单向链表:
thread.c
1234567
/* A connection queue. */typedefstructconn_queueCQ;structconn_queue{CQ_ITEM*head;CQ_ITEM*tail;pthread_mutex_tlock;};
typedefstruct{pthread_tthread_id;/* unique ID of this thread */structevent_base*base;/* libevent handle this thread uses */structeventnotify_event;/* listen event for notify pipe */intnotify_receive_fd;/* receiving end of notify pipe */intnotify_send_fd;/* sending end of notify pipe */structthread_statsstats;/* Stats generated by this thread */structconn_queue*new_conn_queue;/* CQ队列 */cache_t*suffix_cache;/* suffix cache */uint8_titem_lock_type;/* use fine-grained or global item lock */}LIBEVENT_THREAD;
2.2主流程
在memcached.c的main函数中展示了客户端请求处理的主流程:
(1).对主线程的libevent做了初始化
12
/* initialize main thread libevent instance */main_base=event_init();
(2).初始化所有的线程(包括Master和Worker线程),并启动
12
/* start up worker threads if MT mode */thread_init(settings.num_threads,main_base);
/* * Initializes the thread subsystem, creating various worker threads. * * nthreads Number of worker event handler threads to spawn * main_base Event base for main thread */voidthread_init(intnthreads,structevent_base*main_base){...//省略若干代码//threads的声明在thread.c头部,用于保存所有的线程threads=calloc(nthreads,sizeof(LIBEVENT_THREAD));if(!threads){perror("Can't allocate thread descriptors");exit(1);}dispatcher_thread.base=main_base;dispatcher_thread.thread_id=pthread_self();for(i=0;i<nthreads;i++){intfds[2];if(pipe(fds)){//创建管道perror("Can't create notify pipe");exit(1);}threads[i].notify_receive_fd=fds[0];//读端threads[i].notify_send_fd=fds[1];//写端//创建所有workers线程的libevent实例setup_thread(&threads[i]);/* Reserve three fds for the libevent base, and two for the pipe */stats.reserved_fds+=5;}//创建线程/* Create threads after we've done all the libevent setup. */for(i=0;i<nthreads;i++){create_worker(worker_libevent,&threads[i]);}//等待所有线程启动起来之后,这个函数再返回/* Wait for all the threads to set themselves up before returning. */pthread_mutex_lock(&init_lock);wait_for_thread_registration(nthreads);pthread_mutex_unlock(&init_lock);}
/* * Set up a thread's information. */staticvoidsetup_thread(LIBEVENT_THREAD*me){me->base=event_init();if(!me->base){fprintf(stderr,"Can't allocate event base\n");exit(1);}//注意这里只有notify_receive_fd,即读端口/* Listen for notifications from other threads */event_set(&me->notify_event,me->notify_receive_fd,EV_READ|EV_PERSIST,thread_libevent_process,me);event_base_set(me->base,&me->notify_event);if(event_add(&me->notify_event,0)==-1){fprintf(stderr,"Can't monitor libevent notify pipe\n");exit(1);}me->new_conn_queue=malloc(sizeof(structconn_queue));if(me->new_conn_queue==NULL){perror("Failed to allocate memory for connection queue");exit(EXIT_FAILURE);}cq_init(me->new_conn_queue);...}
/* * Processes an incoming "handle a new connection" item. This is called when * input arrives on the libevent wakeup pipe. */staticvoidthread_libevent_process(intfd,shortwhich,void*arg){LIBEVENT_THREAD*me=arg;CQ_ITEM*item;charbuf[1];//从管道中读数据if(read(fd,buf,1)!=1)if(settings.verbose>0)fprintf(stderr,"Can't read from libevent pipe\n");switch(buf[0]){case'c'://c表示有新的连接请求被主线程分配到当前Worker线程item=cq_pop(me->new_conn_queue);if(NULL!=item){conn*c=conn_new(item->sfd,item->init_state,item->event_flags,item->read_buffer_size,item->transport,me->base);if(c==NULL){if(IS_UDP(item->transport)){fprintf(stderr,"Can't listen for events on UDP socket\n");exit(1);}else{if(settings.verbose>0){fprintf(stderr,"Can't listen for events on fd %d\n",item->sfd);}close(item->sfd);}}else{c->thread=me;}cqi_free(item);}break;/* we were told to flip the lock type and report in */case'l':...case'g':...}}
/* * Worker thread: main event loop */staticvoid*worker_libevent(void*arg){LIBEVENT_THREAD*me=arg;/* Any per-thread setup can happen here; thread_init() will block until * all threads have finished initializing. *//* set an indexable thread-specific memory item for the lock type. * this could be unnecessary if we pass the conn *c struct through * all item_lock calls... */me->item_lock_type=ITEM_LOCK_GRANULAR;pthread_setspecific(item_lock_type_key,&me->item_lock_type);register_thread_initialized();event_base_loop(me->base,0);returnNULL;}
/** * Create a socket and bind it to a specific port number * @param interface the interface to bind to * @param port the port number to bind to * @param transport the transport protocol (TCP / UDP) * @param portnumber_file A filepointer to write the port numbers to * when they are successfully added to the list of ports we * listen on. */staticintserver_socket(constchar*interface,intport,enumnetwork_transporttransport,FILE*portnumber_file){...//省略若干代码//主机名到地址解析,结果存在ai中,为addrinfo的链表error=getaddrinfo(interface,port_buf,&hints,&ai);if(error!=0){if(error!=EAI_SYSTEM)fprintf(stderr,"getaddrinfo(): %s\n",gai_strerror(error));elseperror("getaddrinfo()");return1;}for(next=ai;next;next=next->ai_next){conn*listen_conn_add;if((sfd=new_socket(next))==-1){//创建socket/* getaddrinfo can return "junk" addresses, * we make sure at least one works before erroring. */if(errno==EMFILE){/* ...unless we're out of fds */perror("server_socket");exit(EX_OSERR);}continue;}//IPV4地址,设置socket选项setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,(void*)&flags,sizeof(flags));...//省略若干代码//socket和地址绑定if(bind(sfd,next->ai_addr,next->ai_addrlen)==-1){if(errno!=EADDRINUSE){perror("bind()");close(sfd);freeaddrinfo(ai);return1;}close(sfd);continue;}else{success++;...//省略若干代码}if(IS_UDP(transport)){...//省略若干代码}else{if(!(listen_conn_add=conn_new(sfd,conn_listening,EV_READ|EV_PERSIST,1,transport,main_base))){fprintf(stderr,"failed to create listening connection\n");exit(EXIT_FAILURE);}listen_conn_add->next=listen_conn;listen_conn=listen_conn_add;}}freeaddrinfo(ai);/* Return zero iff we detected no errors in starting up connections */returnsuccess==0;}
/* * Dispatches a new connection to another thread. This is only ever called * from the main thread, either during initialization (for UDP) or because * of an incoming connection. */voiddispatch_conn_new(intsfd,enumconn_statesinit_state,intevent_flags,intread_buffer_size,enumnetwork_transporttransport){CQ_ITEM*item=cqi_new();//新申请一个CQ_ITEMcharbuf[1];if(item==NULL){close(sfd);/* given that malloc failed this may also fail, but let's try */fprintf(stderr,"Failed to allocate memory for connection object\n");return;}//分发给Worker线程inttid=(last_thread+1)%settings.num_threads;LIBEVENT_THREAD*thread=threads+tid;last_thread=tid;item->sfd=sfd;item->init_state=init_state;//注意这里的状态为conn_new_cmditem->event_flags=event_flags;item->read_buffer_size=read_buffer_size;item->transport=transport;//把新申请的CQ_ITEM放到被分配的Worker线程的队列中cq_push(thread->new_conn_queue,item);MEMCACHED_CONN_DISPATCH(sfd,thread->thread_id);buf[0]='c';//向worker thread线程的管道写入一字节的数据if(write(thread->notify_send_fd,buf,1)!=1){perror("Writing to thread notify pipe");}}
/* * Processes an incoming "handle a new connection" item. This is called when * input arrives on the libevent wakeup pipe. */staticvoidthread_libevent_process(intfd,shortwhich,void*arg){LIBEVENT_THREAD*me=arg;CQ_ITEM*item;charbuf[1];//从管道中读数据if(read(fd,buf,1)!=1)if(settings.verbose>0)fprintf(stderr,"Can't read from libevent pipe\n");switch(buf[0]){case'c'://c表示有新的连接请求被主线程分配到当前Worker线程//从当前Worker线程的连接请求队列中弹出一个请求//此对象即先前main thread线程推入new_conn_queue队列的对象item=cq_pop(me->new_conn_queue);if(NULL!=item){//根据这个CQ_ITEM对象,创建并初始化conn对象//该对象负责客户端与该worker thread线程之间的通信conn*c=conn_new(item->sfd,item->init_state,item->event_flags,item->read_buffer_size,item->transport,me->base);if(c==NULL){if(IS_UDP(item->transport)){fprintf(stderr,"Can't listen for events on UDP socket\n");exit(1);}else{if(settings.verbose>0){fprintf(stderr,"Can't listen for events on fd %d\n",item->sfd);}close(item->sfd);}}else{c->thread=me;}cqi_free(item);}break;/* we were told to flip the lock type and report in */case'l':...case'g':...}}