High-Performance Network Programming in C: I/O Multiplexing and Thread Pool Architecture in Practice

In network service development, C remains a top choice for building highly concurrent servers thanks to its raw performance. This article takes a close look at the core optimization techniques: epoll-based I/O multiplexing, zero-copy transfer, and thread pool architecture, with complete runnable code for each.


1. Deep Optimization with epoll I/O Multiplexing

(1) A High-Performance epoll Server Architecture

#define _GNU_SOURCE   /* for accept4() and SOCK_NONBLOCK */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>

#define MAX_EVENTS  1024
#define BUFFER_SIZE 4096
#define BACKLOG     1024

typedef struct connection {
    int fd;
    struct sockaddr_in addr;
    char read_buffer[BUFFER_SIZE];
    char write_buffer[BUFFER_SIZE];
    size_t read_len;
    size_t write_len;
    time_t last_active;
} connection_t;

typedef struct epoll_server {
    int epoll_fd;
    int listen_fd;
    connection_t **connections;   // fd-indexed connection table
    int max_connections;
    struct epoll_event *events;
} epoll_server_t;

// Put a file descriptor into non-blocking mode
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// Initialize the server
epoll_server_t* server_init(int port, int max_conn) {
    epoll_server_t *server = malloc(sizeof(epoll_server_t));
    if (!server) return NULL;
    server->listen_fd = -1;   // so the error path below can close() safely
    server->epoll_fd = -1;
    server->max_connections = max_conn;
    server->connections = calloc(max_conn, sizeof(connection_t*));
    server->events = malloc(sizeof(struct epoll_event) * MAX_EVENTS);
    if (!server->connections || !server->events) goto error;
    // Create the listening socket
    server->listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (server->listen_fd == -1) {
        perror("socket");
        goto error;
    }
    // Set SO_REUSEADDR for fast restarts
    int reuse = 1;
    if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR,
                   &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt");
        goto error;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);
    if (bind(server->listen_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("bind");
        goto error;
    }
    if (listen(server->listen_fd, BACKLOG) == -1) {
        perror("listen");
        goto error;
    }
    // Create the epoll instance
    server->epoll_fd = epoll_create1(0);
    if (server->epoll_fd == -1) {
        perror("epoll_create1");
        goto error;
    }
    // Register the listening socket with epoll
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;  // edge-triggered mode
    ev.data.fd = server->listen_fd;
    if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, server->listen_fd, &ev) == -1) {
        perror("epoll_ctl");
        goto error;
    }
    printf("Server initialized on port %d, max connections: %d\n",
           port, max_conn);
    return server;
error:
    if (server->listen_fd != -1) close(server->listen_fd);
    if (server->epoll_fd != -1) close(server->epoll_fd);
    free(server->events);
    free(server->connections);
    free(server);
    return NULL;
}
// Accept all pending connections (required in edge-triggered mode)
void handle_accept(epoll_server_t *server) {
    while (1) {
        struct sockaddr_in client_addr;
        socklen_t addr_len = sizeof(client_addr);
        int client_fd = accept4(server->listen_fd,
                                (struct sockaddr*)&client_addr,
                                &addr_len, SOCK_NONBLOCK);
        if (client_fd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;  // no more pending connections
            }
            perror("accept4");
            break;
        }
        // Reject descriptors that would not fit in the fd-indexed table
        if (client_fd >= server->max_connections) {
            close(client_fd);
            continue;
        }
        // Create the connection object
        connection_t *conn = malloc(sizeof(connection_t));
        if (!conn) {
            close(client_fd);
            continue;
        }
        memset(conn, 0, sizeof(connection_t));
        conn->fd = client_fd;
        memcpy(&conn->addr, &client_addr, sizeof(client_addr));
        conn->last_active = time(NULL);
        // Record it in the connection table
        server->connections[client_fd] = conn;
        // Register with epoll
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
        ev.data.ptr = conn;
        if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
            perror("epoll_ctl client");
            server->connections[client_fd] = NULL;
            free(conn);
            close(client_fd);
            continue;
        }
        printf("New connection: %s:%d, fd: %d\n",
               inet_ntoa(client_addr.sin_addr),
               ntohs(client_addr.sin_port),
               client_fd);
    }
}
// Close a connection and release all of its resources
void close_connection(epoll_server_t *server, connection_t *conn) {
    epoll_ctl(server->epoll_fd, EPOLL_CTL_DEL, conn->fd, NULL);
    if (conn->fd < server->max_connections) {
        server->connections[conn->fd] = NULL;
    }
    close(conn->fd);
    free(conn);
}

// Drain readable data; returns -1 if the connection was closed
int handle_read(epoll_server_t *server, connection_t *conn) {
    while (1) {
        ssize_t n = read(conn->fd,
                         conn->read_buffer + conn->read_len,
                         BUFFER_SIZE - conn->read_len);
        if (n > 0) {
            conn->read_len += n;
            conn->last_active = time(NULL);
            // Simple handling: echo the data back
            if (conn->read_len > 0) {
                memcpy(conn->write_buffer, conn->read_buffer, conn->read_len);
                conn->write_len = conn->read_len;
                conn->read_len = 0;
                // Switch the epoll registration to writable
                struct epoll_event ev;
                ev.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
                ev.data.ptr = conn;
                epoll_ctl(server->epoll_fd, EPOLL_CTL_MOD, conn->fd, &ev);
            }
        } else if (n == 0) {
            // Peer closed the connection
            printf("Connection closed by client\n");
            close_connection(server, conn);
            return -1;
        } else {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;  // all available data has been read
            }
            perror("read");
            close_connection(server, conn);
            return -1;
        }
    }
    return 0;
}
// Flush buffered outgoing data
void handle_write(epoll_server_t *server, connection_t *conn) {
    ssize_t n = write(conn->fd, conn->write_buffer, conn->write_len);
    if (n > 0) {
        // Shift any remaining data to the front of the buffer
        if ((size_t)n < conn->write_len) {
            memmove(conn->write_buffer,
                    conn->write_buffer + n,
                    conn->write_len - n);
        }
        conn->write_len -= n;
        conn->last_active = time(NULL);
        // Once everything is sent, switch back to watching for reads
        if (conn->write_len == 0) {
            struct epoll_event ev;
            ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
            ev.data.ptr = conn;
            epoll_ctl(server->epoll_fd, EPOLL_CTL_MOD, conn->fd, &ev);
        }
    } else if (n == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            perror("write");
        }
    }
}
// Main event loop
void server_run(epoll_server_t *server) {
    printf("Server started, waiting for events...\n");
    while (1) {
        int nfds = epoll_wait(server->epoll_fd, server->events, MAX_EVENTS, 1000);
        for (int i = 0; i < nfds; i++) {
            struct epoll_event *ev = &server->events[i];
            if (ev->data.fd == server->listen_fd) {
                // New connection(s) pending
                handle_accept(server);
            } else {
                connection_t *conn = (connection_t*)ev->data.ptr;
                if (ev->events & EPOLLRDHUP) {
                    // Peer closed its end
                    printf("Connection closed (EPOLLRDHUP)\n");
                    close_connection(server, conn);
                    continue;
                }
                if (ev->events & EPOLLIN) {
                    if (handle_read(server, conn) == -1) {
                        continue;  // connection was closed inside handle_read
                    }
                }
                if (ev->events & EPOLLOUT) {
                    handle_write(server, conn);
                }
            }
        }
    }
}
// Release all server resources
void server_cleanup(epoll_server_t *server) {
    if (!server) return;
    for (int i = 0; i < server->max_connections; i++) {
        if (server->connections[i]) {
            close(server->connections[i]->fd);
            free(server->connections[i]);
        }
    }
    close(server->listen_fd);
    close(server->epoll_fd);
    free(server->events);
    free(server->connections);
    free(server);
}

int main(void) {
    epoll_server_t *server = server_init(8080, 10000);
    if (!server) {
        return 1;
    }
    server_run(server);
    server_cleanup(server);
    return 0;
}
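
To exercise the echo path end to end, the short blocking client below can be pointed at the server. It is a test sketch added here for illustration, not part of the original listing; the port matches the main() above.

// echo_client.c -- minimal test client for the echo server
// Build: gcc echo_client.c -o echo_client
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) { perror("socket"); return 1; }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);   // matches server_init(8080, ...) above
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

    if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }

    const char *msg = "hello epoll\n";
    write(fd, msg, strlen(msg));                 // send one line

    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);  // read the echo back
    if (n > 0) {
        buf[n] = '\0';
        printf("echoed: %s", buf);
    }
    close(fd);
    return 0;
}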

2. A High-Performance Thread Pool Implementation

(1) Core Thread Pool Architecture

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>

#define MAX_THREADS 64
#define MAX_QUEUE   65536

typedef struct task {
    void (*function)(void *arg);
    void *arg;
    struct task *next;
} task_t;

typedef struct thread_pool {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    pthread_t *threads;
    task_t *queue_head;
    task_t *queue_tail;
    int thread_count;
    int queue_size;
    int queue_max;
    int shutdown;   // 0 = running, 1 = graceful shutdown, 2 = immediate shutdown
    int started;
} thread_pool_t;

// Forward declarations (both are referenced before their definitions below)
void* thread_pool_worker(void *thread_pool);
int thread_pool_destroy(thread_pool_t *pool, int graceful);
// Create a thread pool
thread_pool_t* thread_pool_create(int thread_count, int queue_size) {
    if (thread_count <= 0 || thread_count > MAX_THREADS ||
        queue_size <= 0 || queue_size > MAX_QUEUE) {
        return NULL;
    }
    thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));
    if (!pool) return NULL;
    pool->thread_count = 0;
    pool->queue_size = 0;
    pool->queue_max = queue_size;
    pool->queue_head = pool->queue_tail = NULL;
    pool->shutdown = pool->started = 0;
    // Initialize the lock and condition variable
    if (pthread_mutex_init(&pool->lock, NULL) != 0) {
        free(pool);
        return NULL;
    }
    if (pthread_cond_init(&pool->notify, NULL) != 0) {
        pthread_mutex_destroy(&pool->lock);
        free(pool);
        return NULL;
    }
    // Create the worker threads
    pool->threads = malloc(sizeof(pthread_t) * thread_count);
    if (!pool->threads) {
        pthread_mutex_destroy(&pool->lock);
        pthread_cond_destroy(&pool->notify);
        free(pool);
        return NULL;
    }
    for (int i = 0; i < thread_count; i++) {
        if (pthread_create(&pool->threads[i], NULL,
                           thread_pool_worker, (void*)pool) != 0) {
            thread_pool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
}
// Worker thread entry point
void* thread_pool_worker(void *thread_pool) {
    thread_pool_t *pool = (thread_pool_t*)thread_pool;
    task_t *task;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        // Wait for a task or a shutdown signal
        while (pool->queue_size == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->notify, &pool->lock);
        }
        // Exit at once on immediate shutdown; on graceful shutdown,
        // exit only after the queue has been drained
        if (pool->shutdown == 2 ||
            (pool->shutdown == 1 && pool->queue_size == 0)) {
            break;
        }
        // Take a task from the head of the queue
        task = pool->queue_head;
        if (task) {
            pool->queue_head = task->next;
            pool->queue_size--;
            if (pool->queue_head == NULL) {
                pool->queue_tail = NULL;
            }
        }
        pthread_mutex_unlock(&pool->lock);
        // Run the task outside the lock
        if (task) {
            task->function(task->arg);
            free(task);
        }
    }
    pool->started--;
    pthread_mutex_unlock(&pool->lock);  // the lock is still held after the break above
    return NULL;
}
// Submit a task to the pool
int thread_pool_add(thread_pool_t *pool, void (*function)(void *), void *arg) {
    if (!pool || !function) return -1;
    if (pthread_mutex_lock(&pool->lock) != 0) {
        return -1;
    }
    // Reject new tasks while shutting down or when the queue is full
    if (pool->shutdown || pool->queue_size >= pool->queue_max) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    // Create the new task
    task_t *task = malloc(sizeof(task_t));
    if (!task) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    task->function = function;
    task->arg = arg;
    task->next = NULL;
    // Append to the tail of the queue
    if (pool->queue_tail) {
        pool->queue_tail->next = task;
    } else {
        pool->queue_head = task;
    }
    pool->queue_tail = task;
    pool->queue_size++;
    // Wake one worker thread
    if (pthread_cond_signal(&pool->notify) != 0) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    pthread_mutex_unlock(&pool->lock);
    return 0;
}
// Destroy the pool; graceful=1 drains the queue first, graceful=0 does not
int thread_pool_destroy(thread_pool_t *pool, int graceful) {
    if (!pool) return -1;
    if (pthread_mutex_lock(&pool->lock) != 0) {
        return -1;
    }
    // Guard against double destruction
    if (pool->shutdown) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    pool->shutdown = graceful ? 1 : 2;
    // Wake all worker threads
    if (pthread_cond_broadcast(&pool->notify) != 0) {
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    pthread_mutex_unlock(&pool->lock);
    // Wait for every worker to exit
    for (int i = 0; i < pool->thread_count; i++) {
        if (pthread_join(pool->threads[i], NULL) != 0) {
            return -1;
        }
    }
    // Free any tasks left in the queue
    task_t *task;
    while (pool->queue_head) {
        task = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(task);
    }
    // Release remaining resources
    free(pool->threads);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->notify);
    free(pool);
    return 0;
}
// Example task function
void example_task(void *arg) {
    int *value = (int*)arg;
    printf("Processing task with value: %d, thread: %lu\n",
           *value, (unsigned long)pthread_self());
    usleep(100000);  // simulate a workload
    free(value);
}
// Simple throughput benchmark
void benchmark_thread_pool() {
    thread_pool_t *pool = thread_pool_create(8, 1024);
    if (!pool) {
        printf("Failed to create thread pool\n");
        return;
    }
    struct timeval start, end;
    gettimeofday(&start, NULL);
    // Submit 1000 tasks
    for (int i = 0; i < 1000; i++) {
        int *value = malloc(sizeof(int));
        *value = i;
        while (thread_pool_add(pool, example_task, value) != 0) {
            usleep(1000);  // back off while the queue is full
        }
    }
    // Wait until the queue drains (an unlocked, approximate check;
    // in-flight tasks finish during the graceful destroy below)
    while (pool->queue_size > 0) {
        usleep(1000);
    }
    gettimeofday(&end, NULL);
    long seconds = end.tv_sec - start.tv_sec;
    long microseconds = end.tv_usec - start.tv_usec;
    double elapsed = seconds + microseconds * 1e-6;
    printf("Processed 1000 tasks in %.4f seconds\n", elapsed);
    printf("Throughput: %.2f tasks/second\n", 1000.0 / elapsed);
    thread_pool_destroy(pool, 1);
}
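
A minimal driver is enough to run the benchmark (a sketch; the file names in the build line are assumptions):

// main.c -- hypothetical driver for the benchmark above
// Build (assuming the pool code lives in thread_pool.c):
//   gcc thread_pool.c main.c -o bench -lpthread
int main(void) {
    benchmark_thread_pool();
    return 0;
}

As a sanity check on the numbers: with 8 workers and roughly 100 ms of simulated work per task, 1000 tasks need about 1000 / 8 × 0.1 s ≈ 12.5 s, so the reported throughput should come out near 80 tasks/second.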

3. Zero-Copy Optimizations

(1) Zero-Copy File Transfer with sendfile

#define _GNU_SOURCE   /* for SOCK_NONBLOCK */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// Zero-copy file transfer using sendfile(2)
int send_file_zero_copy(int client_fd, const char *filename) {
    int file_fd = open(filename, O_RDONLY);
    if (file_fd == -1) {
        perror("open");
        return -1;
    }
    // Look up the file size
    struct stat file_stat;
    if (fstat(file_fd, &file_stat) == -1) {
        perror("fstat");
        close(file_fd);
        return -1;
    }
    // Announce the file size first (a simple ad-hoc protocol)
    char header[64];
    snprintf(header, sizeof(header), "FILE_SIZE:%ld\n", (long)file_stat.st_size);
    send(client_fd, header, strlen(header), 0);
    // Transfer with sendfile: data moves inside the kernel and never
    // passes through a user-space buffer
    off_t offset = 0;
    ssize_t sent_bytes;
    while (offset < file_stat.st_size) {
        sent_bytes = sendfile(client_fd, file_fd, &offset, file_stat.st_size - offset);
        if (sent_bytes == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                continue;  // socket buffer full; busy-retries on a non-blocking socket
            }
            perror("sendfile");
            close(file_fd);
            return -1;
        }
        if (sent_bytes == 0) {
            break;  // transfer complete
        }
    }
    close(file_fd);
    return 0;
}
// Create the listening socket for a simple high-performance file server
int create_file_server(int port) {
    int server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (server_fd == -1) {
        perror("socket");
        return -1;
    }
    int reuse = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);
    if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        perror("bind");
        close(server_fd);
        return -1;
    }
    if (listen(server_fd, 1024) == -1) {
        perror("listen");
        close(server_fd);
        return -1;
    }
    printf("File server listening on port %d\n", port);
    return server_fd;
}
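
Wiring the two functions together takes only a small accept loop. The sketch below is an illustrative harness, not part of the original listing: the port and the served path "./test.dat" are arbitrary assumptions, and the listening socket is switched back to blocking mode so a single thread can simply sleep in accept().

// main() for the file server -- a sketch built on the two functions above
// (assumed to be appended to the same source file, so the includes apply)
int main(void) {
    int listen_fd = create_file_server(9090);  // arbitrary port
    if (listen_fd == -1) return 1;

    // create_file_server() opens the socket with SOCK_NONBLOCK; clear the
    // flag so this single-threaded sketch can block in accept()
    int flags = fcntl(listen_fd, F_GETFL, 0);
    fcntl(listen_fd, F_SETFL, flags & ~O_NONBLOCK);

    while (1) {
        int client_fd = accept(listen_fd, NULL, NULL);
        if (client_fd == -1) {
            if (errno == EINTR) continue;  // interrupted by a signal; retry
            perror("accept");
            break;
        }
        send_file_zero_copy(client_fd, "./test.dat");  // hypothetical file
        close(client_fd);
    }
    close(listen_fd);
    return 0;
}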

4. Connection Pooling and Resource Reuse

(1) A Database Connection Pool

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>

typedef struct db_connection {
    int id;
    int in_use;
    time_t last_used;
    void *db_handle;  // handle to the real database connection
} db_connection_t;

typedef struct connection_pool {
    db_connection_t *connections;
    int pool_size;
    int max_pool_size;
    int timeout;  // acquire timeout in seconds
    pthread_mutex_t lock;
    pthread_cond_t available;
} connection_pool_t;
// Create a connection pool
connection_pool_t* connection_pool_create(int initial_size, int max_size, int timeout) {
    connection_pool_t *pool = malloc(sizeof(connection_pool_t));
    if (!pool) return NULL;
    pool->connections = calloc(max_size, sizeof(db_connection_t));
    if (!pool->connections) {
        free(pool);
        return NULL;
    }
    pool->pool_size = initial_size;
    pool->max_pool_size = max_size;
    pool->timeout = timeout;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->available, NULL);
    // Initialize the starting connections
    for (int i = 0; i < initial_size; i++) {
        pool->connections[i].id = i;
        pool->connections[i].in_use = 0;
        pool->connections[i].last_used = time(NULL);
        // A real implementation would open the database connection here:
        // pool->connections[i].db_handle = db_connect();
    }
    return pool;
}
// Acquire a database connection, waiting up to pool->timeout seconds
db_connection_t* connection_pool_get(connection_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += pool->timeout;
    int found = 0;
    db_connection_t *conn = NULL;
    while (!found) {
        // Look for an idle connection
        for (int i = 0; i < pool->pool_size; i++) {
            if (!pool->connections[i].in_use) {
                pool->connections[i].in_use = 1;
                pool->connections[i].last_used = time(NULL);
                conn = &pool->connections[i];
                found = 1;
                break;
            }
        }
        if (!found) {
            // Grow the pool if it is not yet at capacity
            if (pool->pool_size < pool->max_pool_size) {
                int new_id = pool->pool_size++;
                pool->connections[new_id].id = new_id;
                pool->connections[new_id].in_use = 1;
                pool->connections[new_id].last_used = time(NULL);
                // pool->connections[new_id].db_handle = db_connect();
                conn = &pool->connections[new_id];
                found = 1;
            } else {
                // Otherwise wait for a connection to be released
                if (pthread_cond_timedwait(&pool->available, &pool->lock, &ts)
                        == ETIMEDOUT) {
                    pthread_mutex_unlock(&pool->lock);
                    return NULL;  // timed out
                }
            }
        }
    }
    pthread_mutex_unlock(&pool->lock);
    return conn;
}
// Return a connection to the pool
void connection_pool_release(connection_pool_t *pool, db_connection_t *conn) {
    if (!pool || !conn) return;
    pthread_mutex_lock(&pool->lock);
    conn->in_use = 0;
    conn->last_used = time(NULL);
    pthread_cond_signal(&pool->available);
    pthread_mutex_unlock(&pool->lock);
}
// Evict connections that have been idle too long
void connection_pool_cleanup(connection_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    time_t now = time(NULL);
    for (int i = 0; i < pool->pool_size; i++) {
        if (!pool->connections[i].in_use &&
            (now - pool->connections[i].last_used) > 300) {  // 5-minute idle limit
            // A real implementation would close the database connection:
            // db_close(pool->connections[i].db_handle);
            // Mark the slot as stale
            pool->connections[i].last_used = 0;
        }
    }
    pthread_mutex_unlock(&pool->lock);
}
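
Typical usage is acquire, work, release. The sketch below shows the intended call pattern; the sizes and timeout are arbitrary assumptions, and since the listing provides no full destroy function, only the idle-eviction pass is invoked at the end.

// Hypothetical caller demonstrating the acquire/release discipline
void handle_request(connection_pool_t *pool) {
    db_connection_t *conn = connection_pool_get(pool);
    if (!conn) {
        fprintf(stderr, "no connection available within %d seconds\n", pool->timeout);
        return;
    }
    // ... run queries through conn->db_handle here ...
    printf("using pooled connection #%d\n", conn->id);
    connection_pool_release(pool, conn);
}

int main(void) {
    // 4 initial connections, at most 16, 5-second acquire timeout
    connection_pool_t *pool = connection_pool_create(4, 16, 5);
    if (!pool) return 1;
    handle_request(pool);
    connection_pool_cleanup(pool);  // evict idle connections
    return 0;
}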

Summary

High-performance network programming in C requires optimization at several layers of the system. Edge-triggered epoll cuts down the number of system calls, a thread pool avoids constantly creating and destroying threads, zero-copy transfer eliminates redundant memory copies, and connection pools reuse expensive resources. Together, these techniques make it possible to build network services that sustain massive concurrency.

The core optimization strategies:

  1. I/O multiplexing: edge-triggered epoll maximizes event-handling efficiency
  2. Concurrency architecture: a thread pool plus task queue avoids the overhead and contention of per-request threads (see the sketch after this list)
  3. Zero-copy transfer: sendfile and similar mechanisms cut copies between kernel and user space
  4. Resource reuse: connection pools manage expensive resources such as database connections
  5. Memory discipline: avoid unnecessary allocation and copying
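
As flagged in item 2, the two main building blocks of this article are meant to be combined: the epoll loop detects readiness and the thread pool does the work. A minimal sketch of that glue, assuming the epoll_server_t, connection_t, and thread_pool_t types from the listings above (read_task_arg_t and process_connection are hypothetical glue of my own, and with EPOLLET the dispatch needs extra care, e.g. EPOLLONESHOT, so treat this as an architectural outline only):

// Hypothetical glue between the epoll loop and the thread pool
typedef struct read_task_arg {
    epoll_server_t *server;   // from the epoll listing above
    connection_t   *conn;
} read_task_arg_t;

// Runs on a worker thread instead of in the event loop
void process_connection(void *arg) {
    read_task_arg_t *t = (read_task_arg_t*)arg;
    handle_read(t->server, t->conn);
    free(t);
}

// Inside server_run(), the inline call to handle_read() would become:
//     read_task_arg_t *t = malloc(sizeof(*t));
//     t->server = server;
//     t->conn = conn;
//     thread_pool_add(pool, process_connection, t);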

[Advanced directions]
Going further, RDMA (Remote Direct Memory Access) can remove the CPU from the network data path entirely, and a user-space network stack such as DPDK can bypass the kernel protocol stack altogether; in the right scenarios these approaches deliver order-of-magnitude performance gains.
