06 网络2

06 网络2

参考资料:9.2 I/O 多路复用:select/poll/epoll | 小林coding (xiaolincoding.com)

select

select 允许程序同时监视多个文件描述符(通常是套接字描述符),以确定是否有任何文件描述符准备好读取、写入或有错误。它是一种在单个线程中处理多个 I/O 操作的机制。

函数原型

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数

  • nfds: 需要监视的最大文件描述符加1。例如,如果你监视的最大文件描述符是 5,则 nfds 应该是 6。

  • readfds: 指向一个集合,包含需要监视读取状态的文件描述符。

  • writefds: 指向一个集合,包含需要监视写入状态的文件描述符。

  • exceptfds: 指向一个集合,包含需要监视异常条件的文件描述符。

  • timeout: 指向一个 timeval 结构,指定 select 调用的超时时间。如果为 NULL,则 select 会一直阻塞直到有文件描述符准备好。

返回值

  • 成功时,返回准备好的文件描述符的数量。

  • 错误时,返回-1。

fd_set 结构体

fd_set 是一个用于 select 函数的特定数据结构,它能够存储一组非负的文件描述符(通常是套接字)。这个结构体在 <sys/select.h> 头文件中定义,但它的具体实现是依赖于平台的。

  • FD_ZERO(fd_set *set): 这个宏将 fd_set 初始化为空集合,即没有任何文件描述符。

  • FD_SET(int fd, fd_set *set): 这个宏将文件描述符 fd 添加到 fd_set 集合中。

  • FD_CLR(int fd, fd_set *set): 这个宏从 fd_set 集合中移除文件描述符 fd

  • FD_ISSET(int fd, fd_set *set): 这个宏检查文件描述符 fd 是否在 fd_set 集合中。

struct timeval 结构体

timeval 结构体用于指定时间间隔,它在 <sys/time.h> 头文件中定义。select 函数使用它来设置操作的超时时间。

  • struct timeval { long tv_sec; // 秒 long tv_usec; // 微秒 };

服务器示例

#include <iostream>
#include <vector>
#include <atomic>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
#include <cerrno>
#include <cstdlib>
#include <sys/select.h>
#include <chrono>
#include <mutex>
#include <thread>

#define PORT 12345
#define MAX_CLIENTS 5
#define BUFFER_SIZE 1024

std::atomic<bool> running(true);

// 线程安全的打印
std::mutex cout_mutex;

void thread_safe_print(const std::string &msg) {
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << msg << std::endl;
}

// 处理客户端连接的函数
void handle_client(int client_socket, std::atomic<bool> &running, fd_set &readfds) {
char buffer[BUFFER_SIZE];
ssize_t bytes_read;

while (running && (bytes_read = read(client_socket, buffer, BUFFER_SIZE)) > 0) {
buffer[bytes_read] = '\0'; // 确保字符串以空字符结尾
thread_safe_print("Received: " + std::string(buffer));

// 发送响应
const char *response = buffer;
if (write(client_socket, response, strlen(response)) < 0) {
thread_safe_print("Failed to write to socket: " + std::to_string(errno));
break;
}
}

// 当没有更多数据可读时,关闭套接字并从 readfds 中移除
close(client_socket);
FD_CLR(client_socket, &readfds);
thread_safe_print("Closing client socket");
}

int main() {
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
std::vector<int> clients;
fd_set readfds;
int max_fd = -1;

// 创建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}

// 初始化服务器地址结构
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

// 绑定套接字
if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}

// 监听
if (listen(server_fd, MAX_CLIENTS) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}

thread_safe_print("Server is listening on port " + std::to_string(PORT));

// 将 server_fd 添加到 readfds 并设置 max_fd
FD_ZERO(&readfds);
FD_SET(server_fd, &readfds);
max_fd = server_fd;

while (running) {
// 复制 readfds 以便 select 可以修改它
fd_set temp_fds = readfds;

// 设置超时时间
struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;

// 调用 select
int activity = select(max_fd + 1, &temp_fds, NULL, NULL, &timeout);
if (activity < 0) {
if (errno != EINTR) {
perror("select error");
break;
}
}

if (activity == 0) {
thread_safe_print("No activity detected, continuing to listen...");
using namespace std::literals;
std::this_thread::sleep_for(100ms);
continue;
}

// 检查 server_fd 是否准备好接受新连接
if (FD_ISSET(server_fd, &temp_fds)) {
client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept failed");
continue;
}
clients.push_back(client_fd); // 添加到客户端列表
FD_SET(client_fd, &readfds); // 添加到监视集合
if (client_fd > max_fd) {
max_fd = client_fd; // 更新最大文件描述符
}
thread_safe_print("New client connected");
}

// 检查所有客户端套接字
for (int i = 0; i < clients.size(); ++i) {
client_fd = clients[i];
if (FD_ISSET(client_fd, &temp_fds)) {
// 处理每个就绪的客户端套接字
handle_client(client_fd, running, readfds);
// handle_client 函数可能会关闭套接字并从 readfds 中移除
// 因此我们需要从 clients 列表中删除它
clients.erase(clients.begin() + i);
--i; // 调整索引以跳过新元素
}
}

}

// 清理和关闭
close(server_fd);
for (int client_fd : clients) {
close(client_fd);
}
running = false;
return 0;
}

poll

poll 是一种比 select 更灵活的 I/O 多路复用技术,它允许程序同时监视多个文件描述符,以确定是否有任何文件描述符准备好进行读取、写入或出现错误。与 select 不同,poll 没有最大文件描述符数量的限制,并且可以监视更多的事件类型。

函数原型

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数

  • fds: 指向一个 pollfd 结构体数组,每个结构体代表一个要监视的文件描述符。

  • nfds: 数组中的结构体数量。

  • timeout: 指定 poll 调用的超时时间(毫秒)。如果为 -1,则 poll 会一直阻塞直到有文件描述符准备好。

返回值

  • 成功时,返回准备好的文件描述符的数量。

  • 错误时,返回 -1。

struct pollfd 结构体

pollfd 结构体用于 poll 函数,它定义了要监视的文件描述符和相关的事件。

struct pollfd {
int fd; // 文件描述符
short events; // 需要监视的事件
short revents; // 发生事件的结果
};
  • events 字段可以是以下宏的组合:

    • POLLIN: 文件描述符可读。
    • POLLOUT: 文件描述符可写。
    • POLLERR: 出现错误。
    • POLLHUP: 对端关闭连接。
  • revents 字段在 poll 返回后,会设置为以下宏的组合之一:

    • POLLIN: 有数据可读。
    • POLLOUT: 可以写入数据。
    • POLLERR: 检测到错误。
    • POLLHUP: 对端关闭连接。

服务器示例

以下是一个使用 poll 的简单 echo 服务器示例:

#include <iostream>
#include <vector>
#include <sys/poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>

#define PORT 12345
#define MAX_CLIENTS 5
#define BUFFER_SIZE 1024

void handle_client(struct pollfd& pfd)
{
while(true)
{
char buffer[BUFFER_SIZE];
ssize_t bytes_read = read(pfd.fd, buffer, BUFFER_SIZE);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
std::cout << "Received: " << buffer << std::endl;
// 发送响应
if (write(pfd.fd, buffer, bytes_read) < 0) {
perror("write failed");
}
continue;
}
else if (bytes_read <= 0) {
std::cout << "Client disconnected" << std::endl;
close(pfd.fd);
break;
}
}
}

int main() {
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
std::vector<struct pollfd> pollfds;

// 创建套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}

// 初始化服务器地址结构
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

// 绑定套接字
if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}

// 监听
if (listen(server_fd, MAX_CLIENTS) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}

std::cout << "Server is listening on port " << PORT << std::endl;

// 设置 pollfd 结构体
struct pollfd pfd;
pfd.fd = server_fd;
pfd.events = POLLIN;
pollfds.push_back(pfd);

while (true) {
int activity = poll(pollfds.data(), pollfds.size(), -1);
if (activity < 0) {
perror("poll failed");
break;
}

if (activity == 0) {
std::cout << "No activity detected, continuing to listen..." << std::endl;
continue;
}

// 检查 server_fd 是否准备好接受新连接
if (pollfds[0].revents & POLLIN) {
client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept failed");
continue;
}

// 添加新客户端到 pollfds
struct pollfd pfd_client;
pfd_client.fd = client_fd;
pfd_client.events = POLLIN;
pollfds.push_back(pfd_client);

std::cout << "New client connected" << std::endl;
}

// 检查所有客户端套接字
for (size_t i = 1; i < pollfds.size(); ++i) {
if (pollfds[i].revents & POLLIN) {
handle_client(pollfds[i]);
} else if (pollfds[i].revents & POLLHUP) {
// 对端关闭了连接
std::cout << "Client disconnected" << std::endl;
close(pollfds[i].fd);
} else if (pollfds[i].revents & POLLERR) {
// 检测到错误,例如网络故障
std::cout << "Error detected on client socket" << std::endl;
close(pollfds[i].fd);
}
pollfds.erase(pollfds.begin() + i);
--i; // 调整索引以跳过新元素
}
}

// 清理和关闭
close(server_fd);
for (auto &pfd : pollfds) {
close(pfd.fd);
}
return 0;
}

epoll

了解您的需求,我会给出一个完整的 epoll 示例,包括每个函数的原型、参数说明、返回值,以及一个完整的服务器示例代码。

epoll_create

函数原型

int epoll_create(int size);

参数说明

  • size: 指定了epoll实例所能同时监控的文件描述符的数量。这个参数并不是限制最大并发数量,而是一个提示给内核的初始大小。

返回值

  • 成功时,返回一个新的 epoll 实例的文件描述符。

  • 错误时,返回 -1,并设置 errno 以指示错误。

epoll_create1

函数原型

int epoll_create1(int flags);

参数说明

  • flags: 可以是以下标志的组合:

    • EPOLL_CLOEXEC: 设置 close-on-exec 属性。
    • 0:无特殊行为

返回值

  • 成功时,返回一个新的 epoll 实例的文件描述符。

  • 错误时,返回 -1,并设置 errno 以指示错误。

epoll_ctl

函数原型

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数说明

  • epfd: epoll 实例的文件描述符。

  • op: 操作类型,可以是 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)或 EPOLL_CTL_DEL(删除)。

  • fd: 要操作的文件描述符。

  • event: 指向 epoll_event 结构体的指针,包含了要设置的事件和用户数据。

返回值

  • 成功时,返回 0。

  • 错误时,返回 -1,并设置 errno 以指示错误。

epoll_wait

函数原型

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数说明

  • epfd: epoll 实例的文件描述符。

  • events: 指向 epoll_event 结构体数组的指针,用于接收发生的事件。

  • maxevents: events 数组的最大容量。

  • timeout: 等待时间,单位为毫秒。-1 表示无限期等待。

返回值

  • 成功时,返回数组中已填充的事件数量。

  • 错误时,返回 -1,并设置 errno 以指示错误。

epoll_event 结构体

struct epoll_event {
uint32_t events; // 要监视的事件类型
epoll_data_t data; // 用户自定义数据
};

epoll 服务器示例

#include <iostream>
#include <vector>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>

#define PORT 12345
#define MAX_CONN 10
#define MAX_BUF 1024

int main() {
int server_fd, epoll_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[MAX_BUF];

// 创建套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}

// 初始化服务器地址结构
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

// 绑定套接字
if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}

// 监听
if (listen(server_fd, MAX_CONN) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}

std::cout << "Server is listening on port " << PORT << std::endl;

// 初始化 epoll 实例
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1 failed");
exit(EXIT_FAILURE);
}

// 将 server_fd 添加到 epoll 监视列表
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) < 0) {
perror("epoll_ctl failed");
exit(EXIT_FAILURE);
}

while (true) {
struct epoll_event events[MAX_CONN];
int n = epoll_wait(epoll_fd, events, MAX_CONN, -1);
if (n < 0) {
perror("epoll_wait failed");
break;
}

for (int i = 0; i < n; ++i) {
if (events[i].data.fd == server_fd) {
// 接受新的连接
client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept failed");
continue;
}
std::cout << "New client connected" << std::endl;

// 添加新的 client_fd 到 epoll
ev.events = EPOLLIN;
ev.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) < 0) {
perror("epoll_ctl failed");
close(client_fd);
}
} else {
// 读取数据
int socket_fd = events[i].data.fd;
int nbytes = read(socket_fd, buffer, MAX_BUF);
if (nbytes < 0) {
perror("read failed");
continue;
} else if (nbytes == 0) {
// 客户端关闭连接
std::cout << "Client disconnected" << std::endl;
close(socket_fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, nullptr);
} else {
// 回显读取的数据
buffer[nbytes] = '\0';
std::cout << "Read Message:" << buffer << std::endl;
write(socket_fd, buffer, nbytes);
}
}
}
}

// 清理资源
close(server_fd);
close(epoll_fd);
return 0;
}

练习

  • file server 用epoll改写一下