Libevent事件驱动框架应用

Libevent介绍:是一个用 C 语言编写的轻量级、开源、高性能的事件通知库

它的核心设计目标是简化事件驱动编程,让开发者能够轻松构建可扩展的高性能网络应用。

通过封装不同操作系统底层的高效 I/O 多路复用机制(如 Linux 的
epoll
、BSD 的
kqueue
、Windows 的
IOCP
等),Libevent 提供了一套统一的异步事件处理 API,使得开发者只需关注业务逻辑,无需为平台差异性编写繁琐的底层代码。

应对C10K级别并发挑战的经典方案:基于Libevent构建高并发异步网络架构

痛点:传统多线程阻塞I/O模型​(如“一个连接一个线程”)在连接数激增时,会因线程上下文切换、内存占用(每个线程的栈空间)和同步锁竞争导致性能急剧下降。

核心:从阻塞多线程到时间驱动

它的事件驱动模型主要采用了三种范式:

1.事件循环(Event Loop):核心是一个单线程循环(event_base_dispatch),它负责收集所有就绪的I/O时间(如套接字可读,可写),然后集中分派给对应的回调函数处理。避免了为每个链接创建线程的开销。



#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <event2/event.h>
#include <event2/util.h>
 
// 定时器回调函数
void timer_callback(evutil_socket_t fd, short event, void *arg)
{
    printf("定时器触发: 处理定时任务
");
}
 
// 信号处理回调
void signal_callback(evutil_socket_t fd, short event, void *arg)
{
    struct event_base *base = (struct event_base*)arg;
    printf("
收到中断信号,程序将在2秒后退出...
");
    
    struct timeval delay = {2, 0};
    event_base_loopexit(base, &delay);
}
 
int main()
{
    // 初始化libevent
    struct event_base *base = event_base_new();
    if (!base) {
        fprintf(stderr, "创建event_base失败
");
        return 1;
    }
    
    // 创建定时器事件(每1秒触发一次)
    struct timeval tv = {1, 0};
    struct event *timer_event = event_new(base, -1, EV_PERSIST, timer_callback, NULL);
    event_add(timer_event, &tv);
    
    // 创建信号事件(处理Ctrl+C)
    struct event *signal_event = evsignal_new(base, SIGINT, signal_callback, base);
    event_add(signal_event, NULL);
    
    printf("程序运行中,按Ctrl+C退出...
");
    
    // 启动事件循环
    event_base_dispatch(base);
    
    // 清理资源
    event_free(timer_event);
    event_free(signal_event);
    event_base_free(base);
    
    printf("程序正常退出
");
    return 0;
}

2.非阻塞I/O(Non-blocking I/O):所有的网络操作设置为非阻塞模式。操作不会阻塞时间循环线程,若数据未就绪则立即返回,时间循环会继续监控,直到数据准备好才会触发回调。

3.I/O多路复用(I/O Mulitiplexing):Libevent封装了不同操作系统的高性能I/O多路复用机制(如Linux的
epoll
、BSD的
kqueue
),这使得单个线程可以高效地监视成千上万个文件描述符(套接字)的状态变化。

通过单线程事件循环+非阻塞I/O+I/O多路复用,Libevent实现的异步网络模型能够用极少线程处理大量连接,从而显著减少上下文切换和内存开销,这正是其能高效模拟支撑3000+并发连接的理论基础

Libevent实现高并发的关键技术

理解其核心思想后,来看看Libevent提供的一些关键组件和API,它们是构建高并发应用的基石:

事件基座 (event_base)​​:这是Libevent事件循环的核心实例,通过
event_base_new()
创建。所有事件都在其调度之下。

连接监听器 (evconnlistener)​​:用于简化TCP服务器监听套接字的创建、绑定和监听过程。通过
evconnlistener_new_bind()
创建,并设置连接到达时的回调函数。

缓冲事件 (bufferevent)​​:这是Libevent的一个高级抽象,非常实用。它在基础I/O事件之上自动管理了输入输出缓冲区。

使用
bufferevent_socket_new()
为连接创建bufferevent。

通过
bufferevent_setcb()
设置读、写和事件回调函数。当数据到达输入缓冲区时,读回调被触发;当输出缓冲区数据全部写出后,写回调(若有设置)被触发。

使用
bufferevent_enable()
启用特定事件监听。

通过
bufferevent_write()
发送数据(数据会先存入输出缓冲区,由Libevent自动写出),通过
bufferevent_read()
从输入缓冲区读取数据。这大大简化了网络编程中常见的缓冲管理。

超越单线程:主从Reactor与线程池

虽然单线程事件驱动模型能高效处理I/O,但若业务逻辑计算密集型任务很重,会阻塞事件循环。为此,生产环境常采用更高级的架构:

主从Reactor模型​:使用一个主线程​(通常运行主Reactor)专门通过
evconnlistener
接受新连接。接收到新连接后,通过负载均衡策略(如Round-Robin)将其分派到一组工作线程​(每个运行一个子Reactor,即独立的event_base)上进行数据读写和业务处理。这有效利用了多核CPU,并防止新连接请求阻塞业务处理。



#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/bufferevent.h>
#include <event2/listener.h>
#include <pthread.h>
 
#define THREAD_POOL_SIZE 4
#define MAX_BUFFER_SIZE 4096
 
// 工作线程结构
struct worker_thread {
    pthread_t thread_id;
    struct event_base *base;
    struct event *notify_event;
    int notify_receive_fd;
    int notify_send_fd;
};
 
// 全局线程池
struct worker_thread *thread_pool = NULL;
int current_thread = 0;
 
// 新连接回调函数
void accept_connection_cb(struct evconnlistener *listener, 
                         evutil_socket_t fd, 
                         struct sockaddr *address, 
                         int socklen, void *ctx)
{
    // 轮询分配连接到工作线程
    struct worker_thread *worker = &thread_pool[current_thread];
    current_thread = (current_thread + 1) % THREAD_POOL_SIZE;
    
    // 将连接描述符传递给工作线程(通过管道或socketpair)
    send(worker->notify_send_fd, &fd, sizeof(fd), 0);
}
 
// 读取数据回调
void read_callback(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input = bufferevent_get_input(bev);
    size_t len = evbuffer_get_length(input);
    
    if (len > 0) {
        char buffer[MAX_BUFFER_SIZE];
        size_t n = bufferevent_read(bev, buffer, sizeof(buffer)-1);
        buffer[n] = '';
        
        printf("收到数据: %s", buffer);
        
        // 回显数据
        bufferevent_write(bev, buffer, n);
    }
}
 
// 事件回调
void event_callback(struct bufferevent *bev, short events, void *ctx)
{
    if (events & BEV_EVENT_EOF) {
        printf("连接关闭
");
    } else if (events & BEV_EVENT_ERROR) {
        printf("连接错误
");
    }
    
    bufferevent_free(bev);
}
 
// 工作线程函数
void* worker_thread_func(void *arg)
{
    struct worker_thread *worker = (struct worker_thread*)arg;
    
    printf("工作线程启动
");
    event_base_dispatch(worker->base);
    
    return NULL;
}
 
int main()
{
    // 初始化线程支持
#ifdef WIN32
    evthread_use_windows_threads();
#else
    evthread_use_pthreads();
#endif
 
    // 创建工作线程池[5](@ref)
    thread_pool = malloc(THREAD_POOL_SIZE * sizeof(struct worker_thread));
    
    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        struct worker_thread *worker = &thread_pool[i];
        worker->base = event_base_new();
        
        // 创建线程间通信管道[1](@ref)
        int fds[2];
        if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
            perror("创建socketpair失败");
            return 1;
        }
        
        worker->notify_receive_fd = fds[0];
        worker->notify_send_fd = fds[1];
        
        // 设置管道事件监听
        worker->notify_event = event_new(worker->base, fds[0], 
                                       EV_READ | EV_PERSIST, 
                                       accept_connection_cb, worker);
        event_add(worker->notify_event, NULL);
        
        // 启动工作线程
        pthread_create(&worker->thread_id, NULL, worker_thread_func, worker);
    }
    
    // 主线程创建监听器[1](@ref)
    struct event_base *base = event_base_new();
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(8080);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    struct evconnlistener *listener = evconnlistener_new_bind(
        base, accept_connection_cb, NULL,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin, sizeof(sin));
    
    if (!listener) {
        fprintf(stderr, "创建监听器失败
");
        return 1;
    }
    
    printf("服务器启动,监听端口 8080...
");
    event_base_dispatch(base);
    
    // 清理资源
    evconnlistener_free(listener);
    event_base_free(base);
    
    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        event_base_loopexit(thread_pool[i].base, NULL);
        pthread_join(thread_pool[i].thread_id, NULL);
        event_base_free(thread_pool[i].base);
    }
    
    free(thread_pool);
    return 0;
}

业务线程池​:对于计算密集型的业务逻辑,可以在工作线程内使用线程池。当bufferevent的读回调触发后,将解码后的业务对象放入任务队列,由专门的业务线程池处理,处理完毕后再通过事件循环线程将响应数据写回客户端。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容