Libevent介绍:是一个用 C 语言编写的轻量级、开源、高性能的事件通知库。
它的核心设计目标是简化事件驱动编程,让开发者能够轻松构建可扩展的高性能网络应用。
通过封装不同操作系统底层的高效 I/O 多路复用机制(如 Linux 的 、BSD 的
epoll、Windows 的
kqueue等),Libevent 提供了一套统一的异步事件处理 API,使得开发者只需关注业务逻辑,无需为平台差异性编写繁琐的底层代码。
IOCP
应对C10K级别并发挑战的经典方案:基于Libevent构建高并发异步网络架构
痛点:传统多线程阻塞I/O模型(如“一个连接一个线程”)在连接数激增时,会因线程上下文切换、内存占用(每个线程的栈空间)和同步锁竞争导致性能急剧下降。
核心:从阻塞多线程到时间驱动
它的事件驱动模型主要采用了三种范式:
1.事件循环(Event Loop):核心是一个单线程循环(event_base_dispatch),它负责收集所有就绪的I/O时间(如套接字可读,可写),然后集中分派给对应的回调函数处理。避免了为每个链接创建线程的开销。
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <event2/event.h>
#include <event2/util.h>
// 定时器回调函数
void timer_callback(evutil_socket_t fd, short event, void *arg)
{
printf("定时器触发: 处理定时任务
");
}
// 信号处理回调
void signal_callback(evutil_socket_t fd, short event, void *arg)
{
struct event_base *base = (struct event_base*)arg;
printf("
收到中断信号,程序将在2秒后退出...
");
struct timeval delay = {2, 0};
event_base_loopexit(base, &delay);
}
int main()
{
// 初始化libevent
struct event_base *base = event_base_new();
if (!base) {
fprintf(stderr, "创建event_base失败
");
return 1;
}
// 创建定时器事件(每1秒触发一次)
struct timeval tv = {1, 0};
struct event *timer_event = event_new(base, -1, EV_PERSIST, timer_callback, NULL);
event_add(timer_event, &tv);
// 创建信号事件(处理Ctrl+C)
struct event *signal_event = evsignal_new(base, SIGINT, signal_callback, base);
event_add(signal_event, NULL);
printf("程序运行中,按Ctrl+C退出...
");
// 启动事件循环
event_base_dispatch(base);
// 清理资源
event_free(timer_event);
event_free(signal_event);
event_base_free(base);
printf("程序正常退出
");
return 0;
}
2.非阻塞I/O(Non-blocking I/O):所有的网络操作设置为非阻塞模式。操作不会阻塞时间循环线程,若数据未就绪则立即返回,时间循环会继续监控,直到数据准备好才会触发回调。
3.I/O多路复用(I/O Mulitiplexing):Libevent封装了不同操作系统的高性能I/O多路复用机制(如Linux的、BSD的
epoll),这使得单个线程可以高效地监视成千上万个文件描述符(套接字)的状态变化。
kqueue
通过单线程事件循环+非阻塞I/O+I/O多路复用,Libevent实现的异步网络模型能够用极少线程处理大量连接,从而显著减少上下文切换和内存开销,这正是其能高效模拟支撑3000+并发连接的理论基础
Libevent实现高并发的关键技术
理解其核心思想后,来看看Libevent提供的一些关键组件和API,它们是构建高并发应用的基石:
事件基座 (event_base):这是Libevent事件循环的核心实例,通过 创建。所有事件都在其调度之下。
event_base_new()
连接监听器 (evconnlistener):用于简化TCP服务器监听套接字的创建、绑定和监听过程。通过 创建,并设置连接到达时的回调函数。
evconnlistener_new_bind()
缓冲事件 (bufferevent):这是Libevent的一个高级抽象,非常实用。它在基础I/O事件之上自动管理了输入输出缓冲区。
使用 为连接创建bufferevent。
bufferevent_socket_new()
通过 设置读、写和事件回调函数。当数据到达输入缓冲区时,读回调被触发;当输出缓冲区数据全部写出后,写回调(若有设置)被触发。
bufferevent_setcb()
使用 启用特定事件监听。
bufferevent_enable()
通过 发送数据(数据会先存入输出缓冲区,由Libevent自动写出),通过
bufferevent_write()从输入缓冲区读取数据。这大大简化了网络编程中常见的缓冲管理。
bufferevent_read()
超越单线程:主从Reactor与线程池
虽然单线程事件驱动模型能高效处理I/O,但若业务逻辑计算密集型任务很重,会阻塞事件循环。为此,生产环境常采用更高级的架构:
主从Reactor模型:使用一个主线程(通常运行主Reactor)专门通过 接受新连接。接收到新连接后,通过负载均衡策略(如Round-Robin)将其分派到一组工作线程(每个运行一个子Reactor,即独立的event_base)上进行数据读写和业务处理。这有效利用了多核CPU,并防止新连接请求阻塞业务处理。
evconnlistener
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/bufferevent.h>
#include <event2/listener.h>
#include <pthread.h>
#define THREAD_POOL_SIZE 4
#define MAX_BUFFER_SIZE 4096
// 工作线程结构
struct worker_thread {
pthread_t thread_id;
struct event_base *base;
struct event *notify_event;
int notify_receive_fd;
int notify_send_fd;
};
// 全局线程池
struct worker_thread *thread_pool = NULL;
int current_thread = 0;
// 新连接回调函数
void accept_connection_cb(struct evconnlistener *listener,
evutil_socket_t fd,
struct sockaddr *address,
int socklen, void *ctx)
{
// 轮询分配连接到工作线程
struct worker_thread *worker = &thread_pool[current_thread];
current_thread = (current_thread + 1) % THREAD_POOL_SIZE;
// 将连接描述符传递给工作线程(通过管道或socketpair)
send(worker->notify_send_fd, &fd, sizeof(fd), 0);
}
// 读取数据回调
void read_callback(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
if (len > 0) {
char buffer[MAX_BUFFER_SIZE];
size_t n = bufferevent_read(bev, buffer, sizeof(buffer)-1);
buffer[n] = '';
printf("收到数据: %s", buffer);
// 回显数据
bufferevent_write(bev, buffer, n);
}
}
// 事件回调
void event_callback(struct bufferevent *bev, short events, void *ctx)
{
if (events & BEV_EVENT_EOF) {
printf("连接关闭
");
} else if (events & BEV_EVENT_ERROR) {
printf("连接错误
");
}
bufferevent_free(bev);
}
// 工作线程函数
void* worker_thread_func(void *arg)
{
struct worker_thread *worker = (struct worker_thread*)arg;
printf("工作线程启动
");
event_base_dispatch(worker->base);
return NULL;
}
int main()
{
// 初始化线程支持
#ifdef WIN32
evthread_use_windows_threads();
#else
evthread_use_pthreads();
#endif
// 创建工作线程池[5](@ref)
thread_pool = malloc(THREAD_POOL_SIZE * sizeof(struct worker_thread));
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
struct worker_thread *worker = &thread_pool[i];
worker->base = event_base_new();
// 创建线程间通信管道[1](@ref)
int fds[2];
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
perror("创建socketpair失败");
return 1;
}
worker->notify_receive_fd = fds[0];
worker->notify_send_fd = fds[1];
// 设置管道事件监听
worker->notify_event = event_new(worker->base, fds[0],
EV_READ | EV_PERSIST,
accept_connection_cb, worker);
event_add(worker->notify_event, NULL);
// 启动工作线程
pthread_create(&worker->thread_id, NULL, worker_thread_func, worker);
}
// 主线程创建监听器[1](@ref)
struct event_base *base = event_base_new();
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(8080);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
struct evconnlistener *listener = evconnlistener_new_bind(
base, accept_connection_cb, NULL,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin, sizeof(sin));
if (!listener) {
fprintf(stderr, "创建监听器失败
");
return 1;
}
printf("服务器启动,监听端口 8080...
");
event_base_dispatch(base);
// 清理资源
evconnlistener_free(listener);
event_base_free(base);
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
event_base_loopexit(thread_pool[i].base, NULL);
pthread_join(thread_pool[i].thread_id, NULL);
event_base_free(thread_pool[i].base);
}
free(thread_pool);
return 0;
}
业务线程池:对于计算密集型的业务逻辑,可以在工作线程内使用线程池。当bufferevent的读回调触发后,将解码后的业务对象放入任务队列,由专门的业务线程池处理,处理完毕后再通过事件循环线程将响应数据写回客户端。














暂无评论内容