线程池

为了减少切换线程的开支,提前创建好多个线程。

  • 空间换时间,浪费服务器的硬件资源,换取运行效率.
  • 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源.
  • 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配.
  • 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源.

threadpoolsimple.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

//任务
typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;

//线程池:主要包含任务队列、线程数组、空的锁、非空锁
typedef struct _ThreadPool
{
int max_job_num;//最大任务个数
int job_num;//实际任务个数
PoolTask *tasks;//任务队列数组
int job_push;//入队位置
int job_pop;// 出队位置

int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件
pthread_cond_t not_empty_task;//任务队列不为空的条件

}ThreadPool;

//生产者(添加任务函数)、消费者(执行任务的线程函数)
void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数

#endif

libevent的样例helloworld.c

有客户端连接到本机的9995端口时,自动发送“Hello, World!”

event_base_new

evconnlistener_new_bind

evsignal_new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
This example program provides a trivial server program that listens for TCP
connections on port 9995. When they arrive, it writes a short message to
each client connection, and closes each connection once it is flushed.

Where possible, it exits cleanly in response to a SIGINT (ctrl-c).
*/


#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef _WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif

#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>

static const char MESSAGE[] = "Hello, World!\n";

static const int PORT = 9995;

static void listener_cb(struct evconnlistener *, evutil_socket_t,
struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);

int
main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;

struct sockaddr_in sin = {0};
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);
#endif

base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}

sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);

listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin,
sizeof(sin));

if (!listener) {
fprintf(stderr, "Could not create a listener!\n");
return 1;
}

signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);

if (!signal_event || event_add(signal_event, NULL)<0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}

event_base_dispatch(base);

evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);

printf("done\n");
return 0;
}

static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data)
{
struct event_base *base = user_data;
struct bufferevent *bev;

bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
fprintf(stderr, "Error constructing bufferevent!");
event_base_loopbreak(base);
return;
}
bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
bufferevent_enable(bev, EV_WRITE);
bufferevent_disable(bev, EV_READ);

bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}

static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
struct evbuffer *output = bufferevent_get_output(bev);
if (evbuffer_get_length(output) == 0) {
printf("flushed answer\n");
bufferevent_free(bev);
}
}

static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
if (events & BEV_EVENT_EOF) {
printf("Connection closed.\n");
} else if (events & BEV_EVENT_ERROR) {
printf("Got an error on the connection: %s\n",
strerror(errno));/*XXX win32*/
}
/* None of the other events can happen here, since we haven't enabled
* timeouts */
bufferevent_free(bev);
}

static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = user_data;
struct timeval delay = { 2, 0 };

printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");

event_base_loopexit(base, &delay);
}