C语言高性能网络服务器开发:从Socket到Epoll实战

本文深入探讨C语言高性能网络服务器开发的核心技术,涵盖Socket编程、I/O多路复用、线程池优化、内存管理等关键领域,通过完整的实战案例展示如何构建高并发、低延迟的网络服务器。

图片[1]-C语言高性能网络服务器开发:从Socket到Epoll实战指南

一、Linux网络编程基础与Socket API

1.1 Socket编程核心原理

/**
 * 高性能TCP服务器基础框架
 * 支持多客户端并发连接,包含错误处理和资源管理
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>

#define MAX_CLIENTS 1024
#define BUFFER_SIZE 4096
#define SERVER_PORT 8080
#define BACKLOG 128

// 客户端连接信息结构体
typedef struct {
    int fd;                     // 客户端socket文件描述符
    struct sockaddr_in addr;    // 客户端地址信息
    time_t connect_time;        // 连接时间
    size_t total_bytes;         // 总传输字节数
} client_info_t;

// 服务器全局状态
typedef struct {
    int server_fd;              // 服务器socket
    volatile int running;       // 服务器运行状态
    client_info_t *clients;     // 客户端连接数组
    pthread_mutex_t lock;       // 全局锁
    size_t client_count;        // 当前连接数
} server_state_t;

// 全局服务器状态
static server_state_t g_server;

/**
 * 初始化服务器socket
 */
int init_server_socket(int port) {
    int server_fd;
    struct sockaddr_in server_addr;
    int opt = 1;

    // 创建TCP socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket creation failed");
        return -1;
    }

    // 设置SO_REUSEADDR选项,避免TIME_WAIT状态
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt SO_REUSEADDR failed");
        close(server_fd);
        return -1;
    }

    // 设置TCP_NODELAY禁用Nagle算法
    if (setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) < 0) {
        perror("setsockopt TCP_NODELAY failed");
        close(server_fd);
        return -1;
    }

    // 配置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);

    // 绑定socket
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind failed");
        close(server_fd);
        return -1;
    }

    // 开始监听
    if (listen(server_fd, BACKLOG) < 0) {
        perror("listen failed");
        close(server_fd);
        return -1;
    }

    printf("Server listening on port %d\n", port);
    return server_fd;
}

/**
 * 设置socket为非阻塞模式
 */
int set_non_blocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL failed");
        return -1;
    }
    
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL failed");
        return -1;
    }
    
    return 0;
}

/**
 * 安全的socket数据读取
 */
ssize_t safe_read(int fd, void *buffer, size_t count) {
    ssize_t n;
    size_t total = 0;
    char *buf = (char*)buffer;

    while (total < count) {
        n = read(fd, buf + total, count - total);
        
        if (n < 0) {
            if (errno == EINTR) {
                continue;  // 被信号中断,继续读取
            } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;     // 非阻塞模式下没有数据可读
            } else {
                return -1; // 读取错误
            }
        } else if (n == 0) {
            break;         // EOF,连接关闭
        }
        
        total += n;
    }
    
    return total;
}

/**
 * 安全的socket数据写入
 */
ssize_t safe_write(int fd, const void *buffer, size_t count) {
    ssize_t n;
    size_t total = 0;
    const char *buf = (const char*)buffer;

    while (total < count) {
        n = write(fd, buf + total, count - total);
        
        if (n <= 0) {
            if (errno == EINTR) {
                continue;  // 被信号中断,继续写入
            } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 等待可写事件
                struct pollfd pfd = {fd, POLLOUT, 0};
                if (poll(&pfd, 1, 5000) <= 0) { // 5秒超时
                    return -1;
                }
                continue;
            } else {
                return -1; // 写入错误
            }
        }
        
        total += n;
    }
    
    return total;
}

/**
 * 处理客户端连接
 */
void handle_client_connection(int client_fd, struct sockaddr_in *client_addr) {
    char buffer[BUFFER_SIZE];
    char client_ip[INET_ADDRSTRLEN];
    ssize_t bytes_read;
    
    // 获取客户端IP地址
    inet_ntop(AF_INET, &(client_addr->sin_addr), client_ip, INET_ADDRSTRLEN);
    printf("New connection from %s:%d\n", client_ip, ntohs(client_addr->sin_port));
    
    // 设置接收超时
    struct timeval tv;
    tv.tv_sec = 30;  // 30秒超时
    tv.tv_usec = 0;
    setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
    
    // 处理客户端请求
    while ((bytes_read = safe_read(client_fd, buffer, BUFFER_SIZE - 1)) > 0) {
        buffer[bytes_read] = '
/**
* 高性能TCP服务器基础框架
* 支持多客户端并发连接,包含错误处理和资源管理
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#define MAX_CLIENTS 1024
#define BUFFER_SIZE 4096
#define SERVER_PORT 8080
#define BACKLOG 128
// 客户端连接信息结构体
typedef struct {
int fd;                     // 客户端socket文件描述符
struct sockaddr_in addr;    // 客户端地址信息
time_t connect_time;        // 连接时间
size_t total_bytes;         // 总传输字节数
} client_info_t;
// 服务器全局状态
typedef struct {
int server_fd;              // 服务器socket
volatile int running;       // 服务器运行状态
client_info_t *clients;     // 客户端连接数组
pthread_mutex_t lock;       // 全局锁
size_t client_count;        // 当前连接数
} server_state_t;
// 全局服务器状态
static server_state_t g_server;
/**
* 初始化服务器socket
*/
int init_server_socket(int port) {
int server_fd;
struct sockaddr_in server_addr;
int opt = 1;
// 创建TCP socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
return -1;
}
// 设置SO_REUSEADDR选项,避免TIME_WAIT状态
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt SO_REUSEADDR failed");
close(server_fd);
return -1;
}
// 设置TCP_NODELAY禁用Nagle算法
if (setsockopt(server_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) < 0) {
perror("setsockopt TCP_NODELAY failed");
close(server_fd);
return -1;
}
// 配置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
// 绑定socket
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
close(server_fd);
return -1;
}
// 开始监听
if (listen(server_fd, BACKLOG) < 0) {
perror("listen failed");
close(server_fd);
return -1;
}
printf("Server listening on port %d\n", port);
return server_fd;
}
/**
* 设置socket为非阻塞模式
*/
int set_non_blocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL failed");
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL failed");
return -1;
}
return 0;
}
/**
* 安全的socket数据读取
*/
ssize_t safe_read(int fd, void *buffer, size_t count) {
ssize_t n;
size_t total = 0;
char *buf = (char*)buffer;
while (total < count) {
n = read(fd, buf + total, count - total);
if (n < 0) {
if (errno == EINTR) {
continue;  // 被信号中断,继续读取
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;     // 非阻塞模式下没有数据可读
} else {
return -1; // 读取错误
}
} else if (n == 0) {
break;         // EOF,连接关闭
}
total += n;
}
return total;
}
/**
* 安全的socket数据写入
*/
ssize_t safe_write(int fd, const void *buffer, size_t count) {
ssize_t n;
size_t total = 0;
const char *buf = (const char*)buffer;
while (total < count) {
n = write(fd, buf + total, count - total);
if (n <= 0) {
if (errno == EINTR) {
continue;  // 被信号中断,继续写入
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 等待可写事件
struct pollfd pfd = {fd, POLLOUT, 0};
if (poll(&pfd, 1, 5000) <= 0) { // 5秒超时
return -1;
}
continue;
} else {
return -1; // 写入错误
}
}
total += n;
}
return total;
}
/**
* 处理客户端连接
*/
void handle_client_connection(int client_fd, struct sockaddr_in *client_addr) {
char buffer[BUFFER_SIZE];
char client_ip[INET_ADDRSTRLEN];
ssize_t bytes_read;
// 获取客户端IP地址
inet_ntop(AF_INET, &(client_addr->sin_addr), client_ip, INET_ADDRSTRLEN);
printf("New connection from %s:%d\n", client_ip, ntohs(client_addr->sin_port));
// 设置接收超时
struct timeval tv;
tv.tv_sec = 30;  // 30秒超时
tv.tv_usec = 0;
setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
// 处理客户端请求
while ((bytes_read = safe_read(client_fd, buffer, BUFFER_SIZE - 1)) > 0) {
buffer[bytes_read] = '\0';
// 简单的echo服务器逻辑
printf("Received %zd bytes from %s: %s\n", bytes_read, client_ip, buffer);
// 回传数据给客户端
if (safe_write(client_fd, buffer, bytes_read) < 0) {
perror("write to client failed");
break;
}
// 如果是退出命令,则关闭连接
if (strncmp(buffer, "quit", 4) == 0) {
printf("Client %s requested to quit\n", client_ip);
break;
}
}
if (bytes_read < 0) {
perror("read from client failed");
}
printf("Connection closed by %s\n", client_ip);
close(client_fd);
}
'; // 简单的echo服务器逻辑 printf("Received %zd bytes from %s: %s\n", bytes_read, client_ip, buffer); // 回传数据给客户端 if (safe_write(client_fd, buffer, bytes_read) < 0) { perror("write to client failed"); break; } // 如果是退出命令,则关闭连接 if (strncmp(buffer, "quit", 4) == 0) { printf("Client %s requested to quit\n", client_ip); break; } } if (bytes_read < 0) { perror("read from client failed"); } printf("Connection closed by %s\n", client_ip); close(client_fd); }

二、I/O多路复用与Epoll高性能模型

2.1 Epoll事件驱动架构

/**
 * 基于Epoll的高性能事件驱动服务器
 * 使用边缘触发(ET)模式实现高并发处理
 */

#include <sys/epoll.h>

#define MAX_EVENTS 1024
#define EPOLL_TIMEOUT 1000  // 1秒超时

// Epoll服务器上下文
typedef struct {
    int epoll_fd;                   // Epoll实例文件描述符
    struct epoll_event *events;     // 事件数组
    int server_fd;                  // 服务器socket
    volatile int running;           // 服务器运行标志
} epoll_server_t;

/**
 * 初始化Epoll服务器
 */
int init_epoll_server(epoll_server_t *server, int port) {
    // 创建服务器socket
    server->server_fd = init_server_socket(port);
    if (server->server_fd < 0) {
        return -1;
    }
    
    // 设置服务器socket为非阻塞
    if (set_non_blocking(server->server_fd) < 0) {
        close(server->server_fd);
        return -1;
    }
    
    // 创建epoll实例
    server->epoll_fd = epoll_create1(0);
    if (server->epoll_fd < 0) {
        perror("epoll_create1 failed");
        close(server->server_fd);
        return -1;
    }
    
    // 分配事件数组内存
    server->events = malloc(MAX_EVENTS * sizeof(struct epoll_event));
    if (!server->events) {
        perror("malloc for epoll events failed");
        close(server->epoll_fd);
        close(server->server_fd);
        return -1;
    }
    
    // 添加服务器socket到epoll监听
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;  // 边缘触发模式
    ev.data.fd = server->server_fd;
    
    if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, server->server_fd, &ev) < 0) {
        perror("epoll_ctl add server fd failed");
        free(server->events);
        close(server->epoll_fd);
        close(server->server_fd);
        return -1;
    }
    
    server->running = 1;
    printf("Epoll server initialized successfully\n");
    return 0;
}

/**
 * 处理新的客户端连接
 */
void handle_new_connection(epoll_server_t *server) {
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(client_addr);
    int client_fd;
    
    // 边缘触发模式下需要接受所有连接
    while ((client_fd = accept(server->server_fd, 
                              (struct sockaddr*)&client_addr, 
                              &addr_len)) > 0) {
        // 设置客户端socket为非阻塞
        if (set_non_blocking(client_fd) < 0) {
            close(client_fd);
            continue;
        }
        
        // 添加客户端到epoll监听
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;  // 边缘触发 + 对端关闭事件
        ev.data.fd = client_fd;
        
        if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) < 0) {
            perror("epoll_ctl add client fd failed");
            close(client_fd);
            continue;
        }
        
        char client_ip[INET_ADDRSTRLEN];
        inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, INET_ADDRSTRLEN);
        printf("Accepted new connection from %s:%d (fd: %d)\n", 
               client_ip, ntohs(client_addr.sin_port), client_fd);
    }
    
    if (client_fd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
        perror("accept failed");
    }
}

/**
 * 处理客户端数据
 */
void handle_client_data(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    char client_ip[INET_ADDRSTRLEN];
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(client_addr);
    
    // 获取客户端地址信息
    if (getpeername(client_fd, (struct sockaddr*)&client_addr, &addr_len) == 0) {
        inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, INET_ADDRSTRLEN);
    } else {
        strcpy(client_ip, "unknown");
    }
    
    // 边缘触发模式下需要读取所有可用数据
    while ((bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1)) > 0) {
        buffer[bytes_read] = '
/**
* 基于Epoll的高性能事件驱动服务器
* 使用边缘触发(ET)模式实现高并发处理
*/
#include <sys/epoll.h>
#define MAX_EVENTS 1024
#define EPOLL_TIMEOUT 1000  // 1秒超时
// Epoll服务器上下文
typedef struct {
int epoll_fd;                   // Epoll实例文件描述符
struct epoll_event *events;     // 事件数组
int server_fd;                  // 服务器socket
volatile int running;           // 服务器运行标志
} epoll_server_t;
/**
* 初始化Epoll服务器
*/
int init_epoll_server(epoll_server_t *server, int port) {
// 创建服务器socket
server->server_fd = init_server_socket(port);
if (server->server_fd < 0) {
return -1;
}
// 设置服务器socket为非阻塞
if (set_non_blocking(server->server_fd) < 0) {
close(server->server_fd);
return -1;
}
// 创建epoll实例
server->epoll_fd = epoll_create1(0);
if (server->epoll_fd < 0) {
perror("epoll_create1 failed");
close(server->server_fd);
return -1;
}
// 分配事件数组内存
server->events = malloc(MAX_EVENTS * sizeof(struct epoll_event));
if (!server->events) {
perror("malloc for epoll events failed");
close(server->epoll_fd);
close(server->server_fd);
return -1;
}
// 添加服务器socket到epoll监听
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 边缘触发模式
ev.data.fd = server->server_fd;
if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, server->server_fd, &ev) < 0) {
perror("epoll_ctl add server fd failed");
free(server->events);
close(server->epoll_fd);
close(server->server_fd);
return -1;
}
server->running = 1;
printf("Epoll server initialized successfully\n");
return 0;
}
/**
* 处理新的客户端连接
*/
void handle_new_connection(epoll_server_t *server) {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
int client_fd;
// 边缘触发模式下需要接受所有连接
while ((client_fd = accept(server->server_fd, 
(struct sockaddr*)&client_addr, 
&addr_len)) > 0) {
// 设置客户端socket为非阻塞
if (set_non_blocking(client_fd) < 0) {
close(client_fd);
continue;
}
// 添加客户端到epoll监听
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;  // 边缘触发 + 对端关闭事件
ev.data.fd = client_fd;
if (epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) < 0) {
perror("epoll_ctl add client fd failed");
close(client_fd);
continue;
}
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, INET_ADDRSTRLEN);
printf("Accepted new connection from %s:%d (fd: %d)\n", 
client_ip, ntohs(client_addr.sin_port), client_fd);
}
if (client_fd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("accept failed");
}
}
/**
* 处理客户端数据
*/
void handle_client_data(int client_fd) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read;
char client_ip[INET_ADDRSTRLEN];
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
// 获取客户端地址信息
if (getpeername(client_fd, (struct sockaddr*)&client_addr, &addr_len) == 0) {
inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, INET_ADDRSTRLEN);
} else {
strcpy(client_ip, "unknown");
}
// 边缘触发模式下需要读取所有可用数据
while ((bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1)) > 0) {
buffer[bytes_read] = '\0';
printf("Received %zd bytes from %s: %s\n", bytes_read, client_ip, buffer);
// 简单的HTTP响应处理
if (strstr(buffer, "HTTP")) {
const char *http_response = 
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: 13\r\n"
"Connection: close\r\n"
"\r\n"
"Hello, World!";
if (safe_write(client_fd, http_response, strlen(http_response)) < 0) {
perror("write HTTP response failed");
break;
}
} else {
// Echo回传
if (safe_write(client_fd, buffer, bytes_read) < 0) {
perror("echo to client failed");
break;
}
}
// 检查退出命令
if (strncmp(buffer, "quit", 4) == 0) {
printf("Client %s requested to quit\n", client_ip);
break;
}
}
if (bytes_read == 0) {
printf("Client %s closed connection\n", client_ip);
} else if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("read from client failed");
}
}
/**
* Epoll事件循环
*/
void epoll_event_loop(epoll_server_t *server) {
int nfds, i;
printf("Starting epoll event loop...\n");
while (server->running) {
// 等待事件,超时1秒
nfds = epoll_wait(server->epoll_fd, server->events, MAX_EVENTS, EPOLL_TIMEOUT);
if (nfds < 0) {
if (errno == EINTR) {
continue;  // 被信号中断
}
perror("epoll_wait failed");
break;
}
// 处理所有就绪的事件
for (i = 0; i < nfds; i++) {
int fd = server->events[i].data.fd;
uint32_t events = server->events[i].events;
// 检查错误事件
if (events & (EPOLLERR | EPOLLHUP)) {
printf("Error occurred on fd %d, closing connection\n", fd);
close(fd);
continue;
}
// 服务器socket可读,表示有新连接
if (fd == server->server_fd) {
handle_new_connection(server);
} 
// 客户端socket可读
else if (events & EPOLLIN) {
handle_client_data(fd);
}
// 对端关闭连接
else if (events & EPOLLRDHUP) {
printf("Client fd %d closed connection\n", fd);
close(fd);
}
}
}
}
/**
* 清理Epoll服务器资源
*/
void cleanup_epoll_server(epoll_server_t *server) {
server->running = 0;
if (server->events) {
free(server->events);
server->events = NULL;
}
if (server->epoll_fd >= 0) {
close(server->epoll_fd);
server->epoll_fd = -1;
}
if (server->server_fd >= 0) {
close(server->server_fd);
server->server_fd = -1;
}
printf("Epoll server cleaned up\n");
}
'; printf("Received %zd bytes from %s: %s\n", bytes_read, client_ip, buffer); // 简单的HTTP响应处理 if (strstr(buffer, "HTTP")) { const char *http_response = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 13\r\n" "Connection: close\r\n" "\r\n" "Hello, World!"; if (safe_write(client_fd, http_response, strlen(http_response)) < 0) { perror("write HTTP response failed"); break; } } else { // Echo回传 if (safe_write(client_fd, buffer, bytes_read) < 0) { perror("echo to client failed"); break; } } // 检查退出命令 if (strncmp(buffer, "quit", 4) == 0) { printf("Client %s requested to quit\n", client_ip); break; } } if (bytes_read == 0) { printf("Client %s closed connection\n", client_ip); } else if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { perror("read from client failed"); } } /** * Epoll事件循环 */ void epoll_event_loop(epoll_server_t *server) { int nfds, i; printf("Starting epoll event loop...\n"); while (server->running) { // 等待事件,超时1秒 nfds = epoll_wait(server->epoll_fd, server->events, MAX_EVENTS, EPOLL_TIMEOUT); if (nfds < 0) { if (errno == EINTR) { continue; // 被信号中断 } perror("epoll_wait failed"); break; } // 处理所有就绪的事件 for (i = 0; i < nfds; i++) { int fd = server->events[i].data.fd; uint32_t events = server->events[i].events; // 检查错误事件 if (events & (EPOLLERR | EPOLLHUP)) { printf("Error occurred on fd %d, closing connection\n", fd); close(fd); continue; } // 服务器socket可读,表示有新连接 if (fd == server->server_fd) { handle_new_connection(server); } // 客户端socket可读 else if (events & EPOLLIN) { handle_client_data(fd); } // 对端关闭连接 else if (events & EPOLLRDHUP) { printf("Client fd %d closed connection\n", fd); close(fd); } } } } /** * 清理Epoll服务器资源 */ void cleanup_epoll_server(epoll_server_t *server) { server->running = 0; if (server->events) { free(server->events); server->events = NULL; } if (server->epoll_fd >= 0) { close(server->epoll_fd); server->epoll_fd = -1; } if (server->server_fd >= 0) { close(server->server_fd); server->server_fd = -1; } printf("Epoll server cleaned up\n"); }

三、高性能线程池实现

3.1 线程池架构与任务调度

/**
* 高性能线程池实现
* 支持动态线程管理、任务队列、优雅关闭
*/
#include <pthread.h>
#define MAX_THREADS 64
#define MAX_QUEUE 65536
#define THREAD_TIMEOUT 60  // 线程空闲超时(秒)
// 任务结构体
typedef struct task {
void (*function)(void *arg);  // 任务函数指针
void *arg;                    // 任务参数
struct task *next;            // 下一个任务
} task_t;
// 任务队列
typedef struct {
task_t *front;                // 队列头
task_t *rear;                 // 队列尾
pthread_mutex_t lock;         // 队列锁
pthread_cond_t cond;          // 条件变量
size_t count;                 // 任务数量
size_t queue_size;            // 队列大小限制
} task_queue_t;
// 线程池结构体
typedef struct {
pthread_t *threads;           // 线程数组
task_queue_t queue;           // 任务队列
volatile int shutdown;        // 关闭标志
volatile int active_threads;  // 活跃线程数
size_t min_threads;           // 最小线程数
size_t max_threads;           // 最大线程数
size_t thread_count;          // 当前线程数
} thread_pool_t;
/**
* 初始化任务队列
*/
int task_queue_init(task_queue_t *queue, size_t max_size) {
queue->front = queue->rear = NULL;
queue->count = 0;
queue->queue_size = max_size;
if (pthread_mutex_init(&queue->lock, NULL) != 0) {
return -1;
}
if (pthread_cond_init(&queue->cond, NULL) != 0) {
pthread_mutex_destroy(&queue->lock);
return -1;
}
return 0;
}
/**
* 向任务队列添加任务
*/
int task_queue_push(task_queue_t *queue, void (*function)(void *), void *arg) {
task_t *new_task = malloc(sizeof(task_t));
if (!new_task) {
return -1;
}
new_task->function = function;
new_task->arg = arg;
new_task->next = NULL;
pthread_mutex_lock(&queue->lock);
// 检查队列是否已满
if (queue->count >= queue->queue_size) {
pthread_mutex_unlock(&queue->lock);
free(new_task);
return -1;  // 队列已满
}
// 添加任务到队列
if (queue->rear == NULL) {
queue->front = queue->rear = new_task;
} else {
queue->rear->next = new_task;
queue->rear = new_task;
}
queue->count++;
// 通知等待的线程
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock(&queue->lock);
return 0;
}
/**
* 从任务队列获取任务
*/
task_t *task_queue_pop(task_queue_t *queue, int timeout_seconds) {
struct timespec ts;
task_t *task = NULL;
pthread_mutex_lock(&queue->lock);
// 等待任务或超时
while (queue->count == 0 && timeout_seconds != 0) {
if (timeout_seconds > 0) {
// 计算绝对超时时间
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout_seconds;
if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts) == ETIMEDOUT) {
break;
}
} else {
// 无限等待
pthread_cond_wait(&queue->cond, &queue->lock);
}
}
// 获取任务
if (queue->count > 0) {
task = queue->front;
queue->front = task->next;
if (queue->front == NULL) {
queue->rear = NULL;
}
queue->count--;
}
pthread_mutex_unlock(&queue->lock);
return task;
}
/**
* 销毁任务队列
*/
void task_queue_destroy(task_queue_t *queue) {
task_t *task;
pthread_mutex_lock(&queue->lock);
// 清理队列中剩余的任务
while (queue->front != NULL) {
task = queue->front;
queue->front = task->next;
free(task->arg);  // 释放任务参数
free(task);
}
pthread_mutex_unlock(&queue->lock);
pthread_mutex_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
}
/**
* 线程工作函数
*/
void *thread_worker(void *arg) {
thread_pool_t *pool = (thread_pool_t *)arg;
task_t *task;
while (1) {
// 获取任务(带超时)
task = task_queue_pop(&pool->queue, THREAD_TIMEOUT);
if (task) {
// 执行任务
pool->active_threads++;
task->function(task->arg);
pool->active_threads--;
// 清理任务内存
free(task->arg);
free(task);
} else {
// 超时或线程池关闭
if (pool->shutdown) {
break;
}
// 动态线程管理:如果空闲线程过多,退出当前线程
pthread_mutex_lock(&pool->queue.lock);
if (pool->thread_count > pool->min_threads && 
pool->active_threads < pool->thread_count / 2) {
pool->thread_count--;
pthread_mutex_unlock(&pool->queue.lock);
break;
}
pthread_mutex_unlock(&pool->queue.lock);
}
}
return NULL;
}
/**
* 初始化线程池
*/
int thread_pool_init(thread_pool_t *pool, size_t min_threads, size_t max_threads, size_t queue_size) {
if (min_threads <= 0 || max_threads < min_threads || max_threads > MAX_THREADS) {
return -1;
}
pool->min_threads = min_threads;
pool->max_threads = max_threads;
pool->thread_count = 0;
pool->active_threads = 0;
pool->shutdown = 0;
// 初始化任务队列
if (task_queue_init(&pool->queue, queue_size) != 0) {
return -1;
}
// 分配线程数组内存
pool->threads = malloc(max_threads * sizeof(pthread_t));
if (!pool->threads) {
task_queue_destroy(&pool->queue);
return -1;
}
// 创建初始线程
for (size_t i = 0; i < min_threads; i++) {
if (pthread_create(&pool->threads[i], NULL, thread_worker, pool) != 0) {
// 创建失败,清理已创建的线程
pool->shutdown = 1;
for (size_t j = 0; j < i; j++) {
pthread_join(pool->threads[j], NULL);
}
free(pool->threads);
task_queue_destroy(&pool->queue);
return -1;
}
pool->thread_count++;
}
printf("Thread pool initialized with %zu threads\n", pool->thread_count);
return 0;
}
/**
* 向线程池提交任务
*/
int thread_pool_submit(thread_pool_t *pool, void (*function)(void *), void *arg) {
if (pool->shutdown) {
return -1;
}
// 动态创建新线程(如果需要)
pthread_mutex_lock(&pool->queue.lock);
if (pool->queue.count > pool->thread_count * 2 && 
pool->thread_count < pool->max_threads) {
if (pthread_create(&pool->threads[pool->thread_count], NULL, 
thread_worker, pool) == 0) {
pool->thread_count++;
printf("Created new thread, total: %zu\n", pool->thread_count);
}
}
pthread_mutex_unlock(&pool->queue.lock);
return task_queue_push(&pool->queue, function, arg);
}
/**
* 优雅关闭线程池
*/
void thread_pool_shutdown(thread_pool_t *pool) {
pool->shutdown = 1;
// 唤醒所有等待的线程
pthread_mutex_lock(&pool->queue.lock);
pthread_cond_broadcast(&pool->queue.cond);
pthread_mutex_unlock(&pool->queue.lock);
// 等待所有线程退出
for (size_t i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
// 清理资源
free(pool->threads);
task_queue_destroy(&pool->queue);
printf("Thread pool shutdown completed\n");
}

四、内存管理与性能优化

4.1 对象池与内存复用

/**
* 高性能内存池实现
* 减少malloc/free调用,提高内存分配效率
*/
#include <stdint.h>
#define MEMORY_POOL_SIZE 1024 * 1024 * 64  // 64MB内存池
#define BLOCK_SIZE 4096                    // 4KB内存块
// 内存块结构
typedef struct memory_block {
struct memory_block *next;     // 下一个内存块
size_t size;                   // 块大小
uint8_t data[];                // 数据区域(柔性数组)
} memory_block_t;
// 内存池结构
typedef struct {
uint8_t *memory;               // 内存池起始地址
size_t total_size;             // 总大小
size_t used_size;              // 已使用大小
memory_block_t *free_blocks;   // 空闲块链表
memory_block_t *used_blocks;   // 已使用块链表
pthread_mutex_t lock;          // 线程安全锁
} memory_pool_t;
/**
* 初始化内存池
*/
int memory_pool_init(memory_pool_t *pool, size_t size) {
// 分配对齐的内存
if (posix_memalign((void**)&pool->memory, 64, size) != 0) {
return -1;
}
pool->total_size = size;
pool->used_size = 0;
pool->free_blocks = NULL;
pool->used_blocks = NULL;
// 初始化整个内存池为一个大的空闲块
memory_block_t *initial_block = (memory_block_t*)pool->memory;
initial_block->next = NULL;
initial_block->size = size - sizeof(memory_block_t);
pool->free_blocks = initial_block;
if (pthread_mutex_init(&pool->lock, NULL) != 0) {
free(pool->memory);
return -1;
}
printf("Memory pool initialized with %zu bytes\n", size);
return 0;
}
/**
* 从内存池分配内存
*/
void *memory_pool_alloc(memory_pool_t *pool, size_t size) {
memory_block_t *block, *prev, *best_block, *best_prev;
size_t required_size = size + sizeof(memory_block_t);
pthread_mutex_lock(&pool->lock);
// 最佳适配算法
best_block = NULL;
best_prev = NULL;
prev = NULL;
block = pool->free_blocks;
while (block) {
if (block->size >= size) {
if (!best_block || block->size < best_block->size) {
best_block = block;
best_prev = prev;
}
}
prev = block;
block = block->next;
}
if (!best_block) {
pthread_mutex_unlock(&pool->lock);
return NULL;  // 内存不足
}
// 从空闲链表移除最佳块
if (best_prev) {
best_prev->next = best_block->next;
} else {
pool->free_blocks = best_block->next;
}
// 如果剩余空间足够大,分割内存块
if (best_block->size >= required_size + sizeof(memory_block_t) + BLOCK_SIZE) {
memory_block_t *new_block = (memory_block_t*)((uint8_t*)best_block + required_size);
new_block->size = best_block->size - required_size;
new_block->next = pool->free_blocks;
pool->free_blocks = new_block;
best_block->size = size;
}
// 添加到已使用链表
best_block->next = pool->used_blocks;
pool->used_blocks = best_block;
pool->used_size += best_block->size + sizeof(memory_block_t);
pthread_mutex_unlock(&pool->lock);
return best_block->data;
}
/**
* 释放内存回内存池
*/
void memory_pool_free(memory_pool_t *pool, void *ptr) {
if (!ptr) return;
memory_block_t *block = (memory_block_t*)((uint8_t*)ptr - sizeof(memory_block_t));
pthread_mutex_lock(&pool->lock);
// 从已使用链表移除
memory_block_t *curr = pool->used_blocks;
memory_block_t *prev = NULL;
while (curr && curr != block) {
prev = curr;
curr = curr->next;
}
if (!curr) {
pthread_mutex_unlock(&pool->lock);
return;  // 未找到该块
}
if (prev) {
prev->next = curr->next;
} else {
pool->used_blocks = curr->next;
}
// 添加到空闲链表
block->next = pool->free_blocks;
pool->free_blocks = block;
pool->used_size -= block->size + sizeof(memory_block_t);
// 合并相邻空闲块(可选优化)
// memory_pool_coalesce(pool);
pthread_mutex_unlock(&pool->lock);
}
/**
* 销毁内存池
*/
void memory_pool_destroy(memory_pool_t *pool) {
pthread_mutex_destroy(&pool->lock);
free(pool->memory);
pool->memory = NULL;
pool->total_size = 0;
pool->used_size = 0;
pool->free_blocks = NULL;
pool->used_blocks = NULL;
printf("Memory pool destroyed\n");
}
/**
* 连接池管理(数据库连接、网络连接等)
*/
typedef struct {
void **connections;           // 连接数组
size_t pool_size;             // 连接池大小
size_t used_count;            // 已使用连接数
pthread_mutex_t lock;         // 连接池锁
pthread_cond_t cond;          // 条件变量
} connection_pool_t;
/**
* 初始化连接池
*/
int connection_pool_init(connection_pool_t *pool, size_t size, 
void* (*create_connection)(void)) {
pool->connections = malloc(size * sizeof(void*));
if (!pool->connections) {
return -1;
}
pool->pool_size = size;
pool->used_count = 0;
// 创建初始连接
for (size_t i = 0; i < size; i++) {
pool->connections[i] = create_connection();
if (!pool->connections[i]) {
// 创建失败,清理已创建的连接
for (size_t j = 0; j < i; j++) {
free(pool->connections[j]);
}
free(pool->connections);
return -1;
}
}
if (pthread_mutex_init(&pool->lock, NULL) != 0) {
for (size_t i = 0; i < size; i++) {
free(pool->connections[i]);
}
free(pool->connections);
return -1;
}
if (pthread_cond_init(&pool->cond, NULL) != 0) {
pthread_mutex_destroy(&pool->lock);
for (size_t i = 0; i < size; i++) {
free(pool->connections[i]);
}
free(pool->connections);
return -1;
}
printf("Connection pool initialized with %zu connections\n", size);
return 0;
}
/**
* 从连接池获取连接
*/
void *connection_pool_get(connection_pool_t *pool, int timeout_ms) {
struct timespec ts;
void *conn = NULL;
pthread_mutex_lock(&pool->lock);
// 等待可用连接
while (pool->used_count >= pool->pool_size) {
if (timeout_ms > 0) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_nsec += (timeout_ms % 1000) * 1000000;
ts.tv_sec += timeout_ms / 1000 + ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
if (pthread_cond_timedwait(&pool->cond, &pool->lock, &ts) == ETIMEDOUT) {
pthread_mutex_unlock(&pool->lock);
return NULL;
}
} else {
pthread_cond_wait(&pool->cond, &pool->lock);
}
}
// 获取第一个可用连接
conn = pool->connections[pool->used_count];
pool->used_count++;
pthread_mutex_unlock(&pool->lock);
return conn;
}
/**
* 释放连接回连接池
*/
void connection_pool_release(connection_pool_t *pool, void *conn) {
pthread_mutex_lock(&pool->lock);
// 将连接放回池中
if (pool->used_count > 0) {
pool->used_count--;
pool->connections[pool->used_count] = conn;
// 通知等待的线程
pthread_cond_signal(&pool->cond);
}
pthread_mutex_unlock(&pool->lock);
}

五、完整服务器集成示例

5.1 高性能HTTP服务器

/**
* 完整的高性能HTTP服务器示例
* 集成Epoll、线程池、内存池等技术
*/
#include <sys/stat.h>
#include <fcntl.h>
// HTTP请求结构
typedef struct {
int client_fd;
char method[16];
char path[256];
char version[16];
char headers[2048];
size_t content_length;
} http_request_t;
// HTTP响应结构
typedef struct {
int status_code;
const char *status_text;
const char *content_type;
size_t content_length;
const char *body;
} http_response_t;
// 全局服务器上下文
typedef struct {
epoll_server_t epoll_server;
thread_pool_t thread_pool;
memory_pool_t memory_pool;
connection_pool_t db_connection_pool;  // 数据库连接池(可选)
} server_context_t;
/**
* 解析HTTP请求
*/
int parse_http_request(const char *request, http_request_t *parsed) {
char line[1024];
const char *ptr = request;
int line_num = 0;
while (*ptr && sscanf(ptr, "%[^\r\n]\r\n", line) == 1) {
ptr += strlen(line) + 2;  // 跳过\r\n
if (line_num == 0) {
// 解析请求行: METHOD PATH VERSION
if (sscanf(line, "%15s %255s %15s", 
parsed->method, parsed->path, parsed->version) != 3) {
return -1;
}
} else if (strlen(line) == 0) {
// 空行,头部结束
break;
} else {
// 解析头部字段
if (strstr(line, "Content-Length:") == line) {
sscanf(line, "Content-Length: %zu", &parsed->content_length);
}
// 可以添加其他重要头部的解析
}
line_num++;
}
return 0;
}
/**
* 构建HTTP响应
*/
size_t build_http_response(const http_response_t *response, char *buffer, size_t buffer_size) {
int len = snprintf(buffer, buffer_size,
"HTTP/1.1 %d %s\r\n"
"Content-Type: %s\r\n"
"Content-Length: %zu\r\n"
"Connection: close\r\n"
"Server: High-Performance-C-Server/1.0\r\n"
"\r\n"
"%s",
response->status_code,
response->status_text,
response->content_type,
response->content_length,
response->body);
return len < buffer_size ? len : buffer_size;
}
/**
* 处理静态文件请求
*/
int serve_static_file(const char *path, http_response_t *response) {
struct stat st;
int fd;
char *file_content;
// 安全检查:防止路径遍历攻击
if (strstr(path, "..")) {
return -1;
}
// 构建完整文件路径
char full_path[512];
snprintf(full_path, sizeof(full_path), "./www%s", path);
// 如果是目录,默认返回index.html
if (stat(full_path, &st) == 0 && S_ISDIR(st.st_mode)) {
strcat(full_path, "/index.html");
}
// 打开文件
fd = open(full_path, O_RDONLY);
if (fd < 0) {
return -1;  // 文件不存在
}
// 获取文件信息
if (fstat(fd, &st) < 0) {
close(fd);
return -1;
}
// 分配内存读取文件内容
file_content = malloc(st.st_size + 1);
if (!file_content) {
close(fd);
return -1;
}
// 读取文件内容
if (read(fd, file_content, st.st_size) != st.st_size) {
free(file_content);
close(fd);
return -1;
}
file_content[st.st_size] = '\0';
close(fd);
// 设置响应
response->status_code = 200;
response->status_text = "OK";
response->content_length = st.st_size;
response->body = file_content;
// 根据文件扩展名设置Content-Type
const char *ext = strrchr(path, '.');
if (ext) {
if (strcmp(ext, ".html") == 0) {
response->content_type = "text/html";
} else if (strcmp(ext, ".css") == 0) {
response->content_type = "text/css";
} else if (strcmp(ext, ".js") == 0) {
response->content_type = "application/javascript";
} else if (strcmp(ext, ".png") == 0) {
response->content_type = "image/png";
} else if (strcmp(ext, ".jpg") == 0) {
response->content_type = "image/jpeg";
} else {
response->content_type = "text/plain";
}
} else {
response->content_type = "text/plain";
}
return 0;
}
/**
* HTTP请求处理函数(线程池任务)
*/
void http_request_handler(void *arg) {
http_request_t *request = (http_request_t *)arg;
http_response_t response = {0};
char response_buffer[BUFFER_SIZE];
size_t response_len;
printf("Handling HTTP request: %s %s\n", request->method, request->path);
// 只支持GET方法
if (strcmp(request->method, "GET") != 0) {
response.status_code = 405;
response.status_text = "Method Not Allowed";
response.content_type = "text/plain";
response.content_length = 19;
response.body = "Method Not Allowed";
} else {
// 尝试提供静态文件
if (serve_static_file(request->path, &response) != 0) {
// 文件不存在,返回404
response.status_code = 404;
response.status_text = "Not Found";
response.content_type = "text/html";
response.content_length = 48;
response.body = "<html><body><h1>404 Not Found</h1></body></html>";
}
}
// 构建HTTP响应
response_len = build_http_response(&response, response_buffer, sizeof(response_buffer));
// 发送响应
if (safe_write(request->client_fd, response_buffer, response_len) < 0) {
perror("send HTTP response failed");
}
// 清理资源
if (response.body && response.status_code == 200) {
free((void*)response.body);  // 静态文件内容需要释放
}
close(request->client_fd);
free(request);
}
/**
* 信号处理函数
*/
void signal_handler(int sig) {
printf("\nReceived signal %d, shutting down...\n", sig);
g_server.running = 0;
}
/**
* 主服务器函数
*/
int main(int argc, char *argv[]) {
server_context_t server_ctx = {0};
int port = SERVER_PORT;
// 解析命令行参数
if (argc > 1) {
port = atoi(argv[1]);
if (port <= 0 || port > 65535) {
fprintf(stderr, "Invalid port number: %d\n", port);
return 1;
}
}
// 设置信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
printf("Starting high-performance HTTP server on port %d...\n", port);
// 初始化内存池
if (memory_pool_init(&server_ctx.memory_pool, MEMORY_POOL_SIZE) != 0) {
fprintf(stderr, "Failed to initialize memory pool\n");
return 1;
}
// 初始化线程池
if (thread_pool_init(&server_ctx.thread_pool, 4, 16, 1000) != 0) {
fprintf(stderr, "Failed to initialize thread pool\n");
memory_pool_destroy(&server_ctx.memory_pool);
return 1;
}
// 初始化Epoll服务器
if (init_epoll_server(&server_ctx.epoll_server, port) != 0) {
fprintf(stderr, "Failed to initialize epoll server\n");
thread_pool_shutdown(&server_ctx.thread_pool);
memory_pool_destroy(&server_ctx.memory_pool);
return 1;
}
// 修改客户端数据处理函数,使用线程池
// 这里需要修改之前的handle_client_data函数
// 将HTTP请求解析和响应处理放到线程池中执行
printf("Server started successfully. Press Ctrl+C to stop.\n");
// 启动Epoll事件循环
epoll_event_loop(&server_ctx.epoll_server);
// 清理资源
printf("Shutting down server...\n");
cleanup_epoll_server(&server_ctx.epoll_server);
thread_pool_shutdown(&server_ctx.thread_pool);
memory_pool_destroy(&server_ctx.memory_pool);
printf("Server shutdown completed.\n");
return 0;
}

总结

通过本文的完整实现,我们构建了一个高性能的C语言网络服务器,具备以下特点:

  1. 高并发处理 – 使用Epoll边缘触发模式,支持大量并发连接
  2. 线程安全 – 集成线程池,避免频繁创建销毁线程的开销
  3. 内存高效 – 实现内存池和连接池,减少系统调用
  4. 协议完整 – 支持HTTP协议,可扩展其他应用层协议
  5. 生产就绪 – 包含错误处理、资源管理和优雅关闭

这套架构可以作为各种网络服务的基础,如Web服务器、API网关、实时通信系统等。

© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容