经典的C++ Linux网络编程项目.
不相关的话:
C++后端相关项目集中体现在:Linux环境编程,网络编程,并发(多线程)编程
常见项目就是webserver以及基于moduo等库进行业务开发(聊天服务,结合mysql,redis)等.比如下面项目:
并发同步方法
在多线程并发条件下,对于共享资源,需要用锁机制解决.在c++标准中,有mutex,condition_variable以及新增的barrier,latch,counting_semaphore等机制.
这里使用了linux上的信号量sem_t和pthread_mutex_t锁机制.
sem_t
#include <iostream>
#include <semaphore.h>
#include <thread>
#include <chrono>
sem_t sem;
void task(const char* name, int waitSeconds) {
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
sem_wait(&sem); // 尝试获取信号量
std::cout << "Task " << name << " is running.\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
sem_post(&sem); // 释放信号量
}
int main() {
sem_init(&sem, 0, 2); // 初始化信号量,初始值为2
std::thread t1(task, "A", 1);
std::thread t2(task, "B", 2);
std::thread t3(task, "C", 3);
t1.join();
t2.join();
t3.join();
sem_destroy(&sem); // 销毁信号量
return 0;
}
sem_wait 函数用于等待一个信号量。它会尝试减少(减1)信号量的计数值。如果信号量的值大于0,则该函数会将信号量的值减1,并立即返回。
如果信号量的值为0,则调用 sem_wait 的线程或进程会被阻塞,直到另一个线程或进程通过调用 sem_post 增加了信号量的值,使得其大于0为止。
常用于表示“获取”一个资源或进入临界区。当信号量代表可用资源的数量时,sem_wait 可以理解为尝试占用一个资源。成功时返回 0,失败时返回 -1 并设置 errno
sem_post函数用于增加(加1)信号量的计数值,并通知可能正在等待该信号量的一个线程或进程。该函数总是成功地将信号量的值加1。
如果有其他线程或进程正在等待这个信号量(即因为信号量的值为0而被阻塞),那么其中一个等待的线程或进程会被唤醒并继续执行。常用于表示“释放”一个资源或离开临界区。当某个线程完成了对共享资源的操作后,可以通过调用 sem_post 来通知其他线程可以继续操作该资源。成功时返回 0,失败时返回 -1 并设置 errno
pthread_mutex_t
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREADS 5
#define ITERATIONS 100000
int counter = 0;
pthread_mutex_t mutex;
void* increment_counter(void* arg) {
for (int i = 0; i < ITERATIONS; ++i) {
pthread_mutex_lock(&mutex); // 加锁
++counter;
pthread_mutex_unlock(&mutex); // 解锁
}
return NULL;
}
int main() {
pthread_t threads[NUM_THREADS];
// 初始化互斥锁
if (pthread_mutex_init(&mutex, NULL) != 0) {
printf("Mutex init failed\n");
return 1;
}
// 创建多个线程
for (int i = 0; i < NUM_THREADS; ++i) {
if (pthread_create(&threads[i], NULL, increment_counter, NULL) != 0) {
printf("Thread creation failed\n");
return 1;
}
}
// 等待所有线程完成
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_join(threads[i], NULL);
}
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
printf("Final counter value: %d\n", counter);
return 0;
}
pthread_mutex_t 是 POSIX 线程(也称为 pthreads)库中用于实现互斥锁(mutex)的数据类型。互斥锁是一种同步机制,用于保护共享资源免受并发访问的影响,从而避免数据竞争和不一致的状态。
互斥锁:互斥锁是一种同步原语,用于确保在任何给定时间只有一个线程可以访问特定的代码段或共享资源。
初始化与销毁:互斥锁需要先初始化才能使用,并且在不再需要时应该被销毁以释放相关资源。
加锁与解锁:通过 pthread_mutex_lock() 和 pthread_mutex_unlock() 函数来控制对临界区的访问。
- 初始化互斥锁
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
mutex: 要初始化的互斥锁指针。attr: 互斥锁属性(通常可以传入NULL使用默认属性)。- 返回值:成功返回
0,失败返回错误码。
- 销毁互斥锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
mutex: 要销毁的互斥锁指针。- 返回值:成功返回
0,失败返回错误码。
- 加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
mutex: 要加锁的互斥锁指针。- 返回值:成功返回
0,失败返回错误码。
- 尝试加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);
mutex: 要尝试加锁的互斥锁指针。- 返回值:成功返回
0,如果锁已经被其他线程持有则返回EBUSY,失败返回其他错误码。
- 解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
mutex: 要解锁的互斥锁指针。- 返回值:成功返回
0,失败返回错误码。
pthread_cond_t
在 POSIX 线程(也称为 pthreads)中,条件变量(pthread_cond_t)是一种用于线程间通信的同步机制。条件变量允许一个线程等待某个条件成立,而另一个线程可以在满足条件时通知等待的线程继续执行
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define BUFFER_SIZE 10
int buffer[BUFFER_SIZE];
int fill_ptr = 0;
int use_ptr = 0;
int count = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
void put(int value) {
buffer[fill_ptr] = value;
fill_ptr = (fill_ptr + 1) % BUFFER_SIZE;
count++;
}
int get() {
int tmp = buffer[use_ptr];
use_ptr = (use_ptr + 1) % BUFFER_SIZE;
count--;
return tmp;
}
void* producer(void* arg) {
int item;
while (1) {
item = rand() % 100; // 生产随机数
pthread_mutex_lock(&mutex);
while (count == BUFFER_SIZE) {
pthread_cond_wait(&cond_var, &mutex); // 缓冲区满,等待
}
put(item);
pthread_cond_signal(&cond_var); // 通知消费者
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void* consumer(void* arg) {
int item;
while (1) {
pthread_mutex_lock(&mutex);
while (count == 0) {
pthread_cond_wait(&cond_var, &mutex); // 缓冲区空,等待
}
item = get();
pthread_cond_signal(&cond_var); // 通知生产者
pthread_mutex_unlock(&mutex);
printf("Consumed: %d\n", item);
}
return NULL;
}
int main() {
pthread_t prod_thread, cons_thread;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond_var, NULL);
pthread_create(&prod_thread, NULL, producer, NULL);
pthread_create(&cons_thread, NULL, consumer, NULL);
pthread_join(prod_thread, NULL);
pthread_join(cons_thread, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_var);
return 0;
}
- 初始化条件变量
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);cond: 指向要初始化的条件变量。attr: 条件变量属性,通常设置为NULL使用默认属性。- 返回值:成功返回
0,错误则返回非零错误码。
- 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);cond: 要销毁的条件变量。- 返回值:成功返回
0,错误则返回非零错误码。
等待和通知
- 等待条件变量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);cond: 条件变量。mutex: 保护共享资源的互斥锁,必须是在调用前已经锁定的同一个互斥锁。- 功能:原子地解锁互斥锁并使当前线程进入等待状态,直到被其他线程通过
pthread_cond_signal或pthread_cond_broadcast唤醒。唤醒后,在重新开始执行之前会自动重新获取互斥锁。 - 返回值:成功返回
0,错误则返回非零错误码。
- 定时等待条件变量
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);cond: 条件变量。mutex: 保护共享资源的互斥锁。abstime: 绝对超时时间点,使用struct timespec定义。- 功能:类似于
pthread_cond_wait,但是它会在指定的时间过后自动返回,即使条件尚未满足。 - 返回值:成功返回
0,超时返回ETIMEDOUT,其他错误则返回相应的错误码。
- 通知单个等待线程
int pthread_cond_signal(pthread_cond_t *cond);cond: 条件变量。- 功能:唤醒至少一个正在等待该条件变量的线程。如果有多个线程在等待,则具体唤醒哪一个由实现定义。
- 返回值:成功返回
0,错误则返回非零错误码。
- 广播通知所有等待线程
int pthread_cond_broadcast(pthread_cond_t *cond);cond: 条件变量。- 功能:唤醒所有正在等待该条件变量的线程。
- 返回值:成功返回
0,错误则返回非零错误码
异步日志写入
文件读写是IO操作,会引起系统中断的耗时操作. 通过异步写入,
IO操作设计的阻塞/非阻塞与同步/异步区分:
- 阻塞 vs 非阻塞主要关注于如何处理未准备好数据的情况:是立即返回还是等待。
- 同步 vs 异步则更多地涉及到整个I/O操作流程的设计理念:是否需要等待I/O操作完成才能继续执行。
利用单例模式创建了日志类,这样每个线程都共享这个实例.
通过多线程写入文件,避免IO的阻塞. 如果使用异步写入,主线程将日志写入到一个阻塞列表,创建的读出线程读取这个阻塞列表并将内容写入文件.
这个过程涉及到生产者-消费者问题,在加入数据时,判断队列是否满,如果满则返回false表明不能再添加,同时唤醒条件变量等待的线程(也就是读取线程). 如果没满则添加数据,同时也唤醒读取线程.
当读取数据时,如果队列为空,则阻塞读取线程,释放锁,让写入线程写入阻塞列表(缓冲区).
所以异步日志创建时就会创建一个消费者线程用于读取数据并写入到文件. 设计一个缓冲区(阻塞队列),利用上面的并发同步方法,也就是一个生产者-消费者模型.
此外日志有根据日期写入到不同的文件,此外写入的行数超出要求的行数时会另外创建一个日志.
数据库连接池
复用数据库的连接. 创建包含多个连接的连接池,初始化时根据账号密码以及数据库url创建连接存入list列表,这是个共享资源也需要使用锁进行保护. 这里使用信号量,根据连接数初始化信号量,每当获得一个连接,使用sem.wait(),信号量-1,信号量机制是当信号量为0时进行阻塞,否则减1. 当释放一个连接(将一个连接加入连接池),使用sem.post()信号量加1并唤醒阻塞线程. 对于一些共享资源,比如m_FreeConn和m_CurConn等变量进行锁保护.
线程池
在I/O模型中,同步I/O和异步I/O主要的区别是内核向应用程序通知的是就绪事件还是完成事件,以及是由应用程序还是由内核来完成I/O的读写操作
同步I/O:内核向应用程序通知就绪事件,由应用程序自身来完成I/O的读写操作
异步I/O:由内核来完成I/O的读写后向应用程序通知完成事件
在并发模式中,同步和异步的主要区别是*功能完成的流程是否是顺序化的,是否需要等待*
- 同步:当遇到阻塞任务时,会一直等待,直到该任务处理完成,程序完全按照代码顺序执行;
- 异步:程序的执行需要由系统事件驱动,程序的执行是不确定的,没有顺序上的要求
事件处理模式
Reactor模式和Proactor模式
Reactor模式是一种事件驱动的设计模式,主要用于处理并发的输入操作(如网络连接或文件I/O)。它的主要思想是将所有I/O操作分派给一个专门的事件循环来处理,这个事件循环会监听多个事件源,并在相应的事件发生时调用对应的处理器(Handler)。这种模式非常适合于需要处理大量并发连接的应用程序,比如Web服务器。
Reactor 模式的核心是使用事件循环监听多个文件描述符(如套接字),并在事件发生时调用相应的处理器
#include <iostream>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
#include <vector>
class EventHandler {
public:
virtual ~EventHandler() = default;
virtual void handle_event(int fd) = 0;
};
class Reactor {
private:
fd_set read_fds; // 监听的文件描述符集合
int max_fd; // 最大文件描述符
std::vector<EventHandler*> handlers;
public:
Reactor() : max_fd(0) {
FD_ZERO(&read_fds);
}
void register_handler(int fd, EventHandler* handler) {
FD_SET(fd, &read_fds);
if (fd > max_fd) {
max_fd = fd;
}
handlers.push_back(handler);
}
void run_event_loop() {
while (true) {
fd_set tmp_fds = read_fds;
int ret = select(max_fd + 1, &tmp_fds, nullptr, nullptr, nullptr);
if (ret < 0) {
std::cerr << "Select error!" << std::endl;
break;
}
for (int i = 0; i <= max_fd; ++i) {
if (FD_ISSET(i, &tmp_fds)) {
for (auto handler : handlers) {
handler->handle_event(i);
}
}
}
}
}
};
class ReadHandler : public EventHandler {
public:
void handle_event(int fd) override {
char buffer[1024];
int bytes = read(fd, buffer, sizeof(buffer) - 1);
if (bytes > 0) {
buffer[bytes] = '\0';
std::cout << "Received: " << buffer << std::endl;
} else {
std::cerr << "Client disconnected." << std::endl;
close(fd);
}
}
};
int main() {
// 创建服务器套接字
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8080);
server_addr.sin_addr.s_addr = INADDR_ANY;
bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr));
listen(server_fd, 5);
Reactor reactor;
ReadHandler read_handler;
// 注册服务器套接字
reactor.register_handler(server_fd, &read_handler);
// 运行事件循环
reactor.run_event_loop();
return 0;
}
Proactor模式是一种异步的事件处理模式,与Reactor模式不同,它允许操作在后台执行,当操作完成时通知应用程序。通常,Proactor模式涉及到的操作如读写文件或网络I/O都是通过操作系统提供的异步I/O服务来实现的。然而,在某些不支持异步I/O的操作系统上,可以通过同步I/O结合多线程或者回调机制来模拟Proactor模式的效果。这意味着主线程可以提交I/O操作给一个工作线程池,然后继续处理其他任务,而工作线程会在I/O操作完成后通过某种方式(例如回调函数)通知主线程
Proactor 模式依赖于异步 I/O 操作,通常由操作系统提供支持。
#include <iostream>
#include <aio.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <vector>
class Proactor {
private:
std::vector<aiocb*> aio_controls; // 异步 I/O 控制块
public:
~Proactor() {
for (auto cb : aio_controls) {
delete cb;
}
}
void start_read(int fd) {
aiocb* cb = new aiocb();
memset(cb, 0, sizeof(aiocb));
char buffer[1024];
cb->aio_fildes = fd;
cb->aio_buf = buffer;
cb->aio_nbytes = sizeof(buffer) - 1;
cb->aio_offset = 0;
aio_controls.push_back(cb);
if (aio_read(cb) < 0) {
std::cerr << "Error starting async read." << std::endl;
return;
}
std::cout << "Async read started on fd: " << fd << std::endl;
}
void wait_for_completion() {
for (auto cb : aio_controls) {
while (aio_error(cb) == EINPROGRESS) {
usleep(1000); // 等待操作完成
}
ssize_t bytes = aio_return(cb);
if (bytes > 0) {
std::cout << "Async read completed. Data: "
<< static_cast<char*>(cb->aio_buf) << std::endl;
} else {
std::cerr << "Async read failed." << std::endl;
}
}
}
};
int main() {
// 打开一个文件进行异步读取
int fd = open("test.txt", O_RDONLY);
if (fd < 0) {
std::cerr << "Failed to open file." << std::endl;
return -1;
}
Proactor proactor;
proactor.start_read(fd); // 开始异步读取
proactor.wait_for_completion(); // 等待完成
close(fd);
return 0;
}
| 特性 | Reactor 模式 | Proactor 模式 |
|---|---|---|
| 核心机制 | 同步 I/O,事件驱动 | 异步 I/O,操作系统支持 |
| 适用场景 | 高并发连接,如网络服务器 | 高性能 I/O 密集型应用 |
| 复杂度 | 较低,易于理解和实现 | 较高,需要操作系统支持异步 I/O |
| 效率 | 受限于同步 I/O | 更高效,I/O 操作在后台完成 |
| 典型实现 | select、poll、epoll | POSIX AIO、Windows Overlapped I/O |
- reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
- proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。
并发编程模式
半同步/半异步模式
半同步/半异步(Half-Sync/Half-Async)模式是一种设计模式,用于处理不同类型的并发任务。它通过分离同步和异步的任务处理路径,使得系统能够更高效地管理资源并提高响应速度。这种模式特别适用于那些需要同时处理同步任务(如用户界面交互)和异步任务(如后台计算或I/O操作)的应用场景.
同步层(Synchronous Layer):负责处理需要立即响应的任务,例如用户界面的交互事件。这些任务通常要求快速响应以提供良好的用户体验。
队列(Queue):作为同步层和异步层之间的桥梁,用于传递消息或任务。同步层将任务放入队列中,异步层从队列中取出任务进行处理。
异步层(Asynchronous Layer):负责执行耗时较长的任务,比如文件I/O、网络通信或者复杂的计算。这些任务不会阻塞主线程,从而保证了系统的流畅性。
半同步/半反应堆模式
半同步/半反应堆(Half-Sync/Half-Reactor)模式是一种混合的设计模式,它结合了同步和异步处理的优势来管理并发连接。这种模式通常用于服务器程序中,旨在提高系统的性能和可扩展性。
Reactor部分:主要负责监听多个文件描述符(如套接字),并在相应的事件发生时(比如有新的连接请求或数据到达)通知对应的事件处理器。
- 主线程运行一个事件循环,调用
select、poll或者更高效的epoll等函数等待感兴趣的事件发生。 - 当检测到新的连接请求时,主线程接受该连接,并将其交给工作线程池中的某个线程处理。
Worker部分:每个工作线程独立地处理从主线程接收过来的具体连接上的读写操作。
- 工作线程执行同步的I/O操作,这意味着它们会在读取或写入数据时阻塞,直到操作完成。
- 尽管如此,由于这些操作是在单独的线程中进行的,因此不会影响主线程对新连接的响应速度。
与半同步/半反应堆的区别
- 半同步/半反应堆:主要用于服务器程序中,其中主线程(Reactor部分)监听新的连接请求,然后将新建立的连接交给工作线程处理。这是一种专门针对网络编程的设计模式,强调的是如何有效地管理和分发多个并发连接。
- 半同步/半异步模式:更加通用,不仅仅局限于网络编程。它可以应用于任何需要同时处理同步任务(如UI更新)和异步任务(如后台数据处理)的场景。其重点在于如何通过分离同步和异步的任务处理路径来优化系统性能和响应速度。
Linux网络编程 | 并发模式:半同步/半异步模式、领导者/追随者模式_linux 网络编程 leadfollow模式-CSDN博客
IO多路复用
五种I/O模型
- 阻塞IO:调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的去检查这个函数有没有返回,必须等这个函数返回才能进行下一步动作
- 非阻塞IO:非阻塞等待,每隔一段时间就去检测IO事件是否就绪。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是立即返回,不管时间是否已经发生,若时间没有发生,则返回-1,此时可以根据errno区分这两种情况,对于accept,recv和send,事件未发生时,errno通常被设置成eagain
- 信号驱动IO:linux用套接口进行信号驱动IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO时间就绪,进程收到SIGIO信号。然后处理IO事件。
- IO复用:linux用select/poll函数实现IO复用模型,这两个函数也会使进程阻塞,但是和阻塞IO所不同的是这两个函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检测。知道有数据可读或可写时,才真正调用IO操作函数
- 异步IO:linux中,可以调用aio_read函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序。
注意:阻塞I/O,非阻塞I/O,信号驱动I/O和I/O复用都是同步I/O。同步I/O指内核向应用程序通知的是就绪事件,比如只通知有客户端连接,要求用户代码自行执行I/O操作,异步I/O是指内核向应用程序通知的是完成事件,比如读取客户端的数据后才通知应用程序,由内核完成I/O操作。
IO多路复用是一种允许单个进程监视多个文件描述符(如套接字、管道等)的技术,当其中任意一个文件描述符准备好进行读写操作时,该技术能够通知应用程序。这种方法可以显著提高服务器程序的性能,因为它避免了为每个连接创建单独线程或进程所带来的开销。
单线程Polling API的常规用法是:
让Polling API监控服务端socket的状态,然后开始死循循环,循环过程中主要有三种逻辑分支:
- 服务端socket的状态变为可读,即表示有客户端发起连接,此时就调用accept建立连接,得到一个客户端fd。将其加入到Polling API的监控集合,并标记其为可读。
- 客户端fd的状态变为可读,则调用read/recv从fd读取数据,然后执行业务逻辑,处理完,再将其加入到Polling API的监控集合,并标记其为可写。
- 客户端fd的状态变为可写,则调用write/send将数据发送给客户端。
select
select函数监视一组文件描述符,等待它们中的任何一个变为可读、可写或发生异常条件。
通过三个位图(fd_set类型)来分别表示需要监视的可读、可写及异常事件集合。
当调用select时,内核会阻塞当前进程,直到指定的文件描述符集合中的至少一个准备就绪。
返回值有响应的文件描述符个数.如果超时,则返回0;如果出错,则返回-1。
- 在调用
select之前,你需要将感兴趣的文件描述符添加到fd_set中(通过FD_SET)。 - 当
select返回时,fd_set中的内容会被更新,仅保留那些已经“就绪”的文件描述符。 - 换句话说,
select会清除未就绪的文件描述符,只留下那些可以进行读、写或异常处理的文件描述符。
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds: 要监视的最大文件描述符加1。readfds,writefds,exceptfds: 分别指向要监视的可读、可写及异常事件的文件描述符集合。timeout: 等待的时间限制,若设置为NULL则无限期等待。
FD_ZERO
- 功能:清空一个
fd_set集合。
void FD_ZERO(fd_set *set);
FD_SET
- 功能:将一个特定的文件描述符添加到一个
fd_set集合中。
void FD_SET(int fd, fd_set *set);
FD_CLR
- 功能:从一个
fd_set集合中移除一个特定的文件描述符。
void FD_CLR(int fd, fd_set *set);
FD_ISSET
- 功能:检查一个特定的文件描述符是否在某个
fd_set集合中。
int FD_ISSET(int fd, fd_set *set);
fd_set数据结构
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
// __FD_SETSIZE = 1024 表示fd_set是个包含一个1024bit数组的结构体
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
select示例代码
fd_set read_fds, active_fds;
int max_fd = sockfd;
std::vector<unsigned int> clients_fd;
FD_ZERO(&read_fds);
FD_SET(sockfd, &read_fds);
while (true) {
// 轮询select 直到有连接
timeval timeout{1, 0};
read_fds = active_fds;
// select阻塞,直到相应文件描述符就绪(可读,可写或异常)
// 成功返回时(>=1),fd_set内容更新,仅保留就绪的fd
int ret = select(max_fd + 1, &read_fds, nullptr, nullptr, &timeout);
errif(ret < 0, "select failed");
if (FD_ISSET(sockfd, &read_fds)) {
// 服务器可读(有connect连接)
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
bzero(&client_addr, client_addr_len);
int client_sockfd =
accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
FD_SET(client_sockfd, &active_fds);
max_fd = std::max(client_sockfd, max_fd);
clients_fd.push_back(client_sockfd);
std::cout << "New connection, socket fd: " << client_sockfd
<< ", IP: " << inet_ntoa(client_addr.sin_addr)
<< ", Port: " << ntohs(client_addr.sin_port) << std::endl;
}
for (auto it = clients_fd.begin(); it != clients_fd.end();) {
unsigned int client_fd = *it;
if (FD_ISSET(client_fd, &read_fds)) {
//客户端可读数据
// 读取数据
char read_bytes[1024]{};
size_t bytes_len = read(client_fd, read_bytes, sizeof(read_bytes));
if (bytes_len <= 0) {
std::cout << "client closed...\n";
close(client_fd);
FD_CLR(client_fd, &active_fds);
it = clients_fd.erase(it);
continue;
} else {
std::string resp =
"你好,你发送了" + std::string(read_bytes, bytes_len);
errif(write(client_fd, resp.c_str(), resp.size()) == -1,
"write socket failed");
++it;
}
} else {
++it;
}
}
}
for (auto &client_fd : clients_fd) {
// 关闭所有客户端
close(client_fd);
}
// close 关闭服务端
close(sockfd);
优点
- 支持跨平台使用,几乎所有类Unix系统都支持。
缺点
- 文件描述符集合大小有限制(通常为1024)。
- 每次调用都需要重新构建文件描述符集合。
- 效率较低,随着监听的文件描述符数量增加性能下降明显。
poll
poll与select类似,但它使用了一个结构体数组来代替select中的位图。
这使得它可以处理更多数量的文件描述符,并且没有像select那样的固定上限。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds: 指向一个包含多个pollfd结构体的数组,每个结构体描述了一个文件描述符及其感兴趣的事件。nfds: 数组中元素的数量。timeout: 等待的时间限制,单位为毫秒。
pollfd数据结构
/* Data structure describing a polling request. */
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};
fd:要监视的文件描述符。events:请求监视的事件类型,可以是以下标志的组合:POLLIN:有数据可读。POLLPRI:有紧急数据可读。POLLOUT:准备好写数据。POLLRDHUP(自 Linux 2.6.17 起):TCP连接被对端关闭,或对端关闭了写入一半的连接。POLLERR:发生错误。POLLHUP:挂起。POLLNVAL:无效请求;指定的文件描述符无效。revents:实际发生的事件,由内核在poll调用返回时填充。
事件类型如下
/* Event types that can be polled for. These bits may be set in `events'
to indicate the interesting event types; they will appear in `revents'
to indicate the status of the file descriptor. */
#define POLLIN 0x001 /* There is data to read. */
#define POLLPRI 0x002 /* There is urgent data to read. */
#define POLLOUT 0x004 /* Writing now will not block. */
/* Event types always implicitly polled for. These bits need not be set in
`events', but they will appear in `revents' to indicate the status of
the file descriptor. */
#define POLLERR 0x008 /* Error condition. */
#define POLLHUP 0x010 /* Hung up. */
#define POLLNVAL 0x020 /* Invalid polling request. */
poll示例代码
std::vector<pollfd> poll_fds;
pollfd server_pollfd;
server_pollfd.fd = sockfd;
server_pollfd.events = POLLIN; // 设置事件
poll_fds.push_back(server_pollfd);
while (true) {
int ret = poll(poll_fds.data(), poll_fds.size(), -1);
errif(ret < 0, "poll error");
if (poll_fds.at(0).revents & POLLIN) {
// 数据读入事件
// serverfd可读,即可accept
sockaddr_in client_addr{};
socklen_t addr_len = sizeof(client_addr);
int client_fd = accept(sockfd, (struct sockaddr *)&client_addr,
(socklen_t *)&addr_len);
errif(client_fd == -1, "accept failed");
pollfd client_pollfd;
client_pollfd.fd = client_fd;
client_pollfd.events = POLLIN;
poll_fds.push_back(client_pollfd);
std::cout << "New connection, socket fd: " << client_fd
<< ", IP: " << inet_ntoa(client_addr.sin_addr)
<< ", Port: " << ntohs(client_addr.sin_port) << std::endl;
}
for (auto it = poll_fds.begin() + 1; it != poll_fds.end();) {
char bytes_read[1024]{};
pollfd &client_pollfd = *it;
size_t read_bytes_len =
read(client_pollfd.fd, bytes_read, sizeof(bytes_read));
if (read_bytes_len <= 0) {
// 关闭
std::cout << "client closed...\n";
close(client_pollfd.fd);
it = poll_fds.erase(it);
} else {
// echo
std::string resp =
"你好,你的请求是" + std::string(bytes_read, read_bytes_len);
write(client_pollfd.fd, resp.data(), resp.size());
++it;
}
}
}
// 关闭所有客户端连接
for (const auto &pfd : poll_fds) {
close(pfd.fd);
}
优点
- 不受文件描述符数量的限制。
- 对于大量文件描述符的情况比
select更高效。
缺点
- 在大量文件描述符的情况下,效率仍然不高,因为每次调用都需要遍历整个文件描述符列表。
对于大量文件描述符的情况,poll 的性能优于 select,因为它不需要每次调用前都重新初始化文件描述符集合。
epoll
epoll是Linux特有的高级IO多路复用机制,旨在克服select和poll的局限性。- 它采用事件驱动的方式,只有当某个文件描述符有事件发生时才会被通知,而不是每次都检查所有文件描述符的状态。
epoll_create1: 创建一个新的epoll实例。epoll_ctl: 向epoll实例添加、修改或删除关注的文件描述符。epoll_wait: 等待事件的发生。
#include <sys/epoll.h>
// 创建epoll实例
/* Creates an epoll instance. Returns an fd for the new instance.
The "size" parameter is a hint specifying the number of file
descriptors to be associated with the new instance. The fd
returned by epoll_create() should be closed with close(). */
int epoll_create(int size);
/* Same as epoll_create but with an FLAGS parameter. The unused SIZE
parameter has been dropped. */
int epoll_create1(int flags);
// 控制epoll实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
当使用
epoll_create或epoll_create1函数创建一个新的epoll实例时,Linux 内核会在内部创建一个“事件表”。这个事件表本质上是一个动态调整大小的数据结构,用来跟踪所有被注册到该epoll实例上的文件描述符及其感兴趣的事件类型(如可读、可写等)。通过这种方式,应用程序可以向内核注册多个文件描述符,并指定对每个文件描述符感兴趣的事件类型。
epoll_ctl
epfd:指向由epoll_create或epoll_create1创建的epoll实例的文件描述符。op:指定要执行的操作类型,可以是以下值之一:EPOLL_CTL_ADD:将新的文件描述符加入到epoll实例的监视列表中,并关联相应的事件。EPOLL_CTL_MOD:修改已经存在于epoll实例中的文件描述符所关注的事件。EPOLL_CTL_DEL:从epoll实例中移除一个文件描述符,不再监视其事件。fd:要操作的目标文件描述符。event:指向struct epoll_event结构体的指针,包含与该文件描述符相关联的事件信息(当op是EPOLL_CTL_ADD或EPOLL_CTL_MOD时需要)。对于EPOLL_CTL_DEL操作,此参数可以为NULL,因为此时仅关心移除文件描述符本身。
epoll_event结构体
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
events:- 表示需要监视的事件类型。
- 它是一个位掩码(bitmask),可以是以下标志的组合:
EPOLLIN:表示文件描述符可读(有数据可读)。EPOLLOUT:表示文件描述符可写(可以写入数据)。EPOLLRDHUP:表示对端关闭了连接或关闭了写入方向(自 Linux 2.6.17 起支持)。EPOLLPRI:表示有紧急数据可读(例如带外数据)。EPOLLERR:表示发生了错误。EPOLLHUP:表示挂起(hang up),通常表示对端关闭了连接。EPOLLET:启用边缘触发模式(Edge-Triggered, ET)。默认是水平触发模式(Level-Triggered, LT)。EPOLLONESHOT:表示事件只会触发一次,之后需要重新添加到epoll实例中。
data:- 这是一个联合体(union),允许用户将任意数据与文件描述符关联起来。
- 定义如下:
typedef union epoll_data { void *ptr; // 指针类型 int fd; // 文件描述符 uint32_t u32; // 32位无符号整数 uint64_t u64; // 64位无符号整数 } epoll_data_t; - 最常见的用法是通过
fd字段存储文件描述符,以便在事件触发时快速定位对应的文件描述符。
epoll示例代码
int epoll_fd = epoll_create1(0);
errif(epoll_fd == -1, "epoll creation failed");
const int MAX_EVENTS = 10;
epoll_event ev, events[MAX_EVENTS];
ev.events = POLLIN | EPOLLET; // 水平触发模式
ev.data.fd = sockfd;
// 添加服务端fd
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
errif(ret == -1, "ctl failed");
while (true) {
// 等待事件
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
errif(nfds == -1, "epoll waut failed");
if ((events[0].data.fd == sockfd) && (events[0].events & POLLIN)) {
// accept就绪
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd =
accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 水平触发模式
ev.data.fd = client_fd;
// 将客户端连接加入epoll实例
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
errif(ret == -1, "ctl failed");
}
for (int i = 1; i < MAX_EVENTS; i++) {
if (events[i].events & POLLIN) {
int client_fd = events[i].data.fd;
// read就绪
char bytes_read[1024]{};
size_t bytes_read_len = read(client_fd, bytes_read, sizeof(bytes_read));
if (bytes_read_len <= 0) {
std::cout << "client connection closed...\n";
close(client_fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
} else {
std::string resp =
"你好,你发送了:" + std::string(bytes_read, bytes_read_len);
write(client_fd, resp.data(), resp.size());
}
}
}
close(epoll_fd);
- 默认情况下,
epoll使用水平触发模式(Level-Triggered, LT),即只要文件描述符处于就绪状态,epoll_wait就会一直返回。 - 通过设置
EPOLLET标志,可以启用边缘触发模式(Edge-Triggered, ET)。在这种模式下,只有当文件描述符的状态发生变化时,epoll_wait才会返回。
优点
- 高效地管理大量文件描述符。
- 只返回活跃的文件描述符,减少了不必要的上下文切换。
- 支持边缘触发(Edge Triggered)和水平触发(Level Triggered)两种模式。
缺点
- 仅适用于Linux操作系统,不具有跨平台兼容性。
注意事项
- 边缘触发模式的使用:
- 在边缘触发模式下,必须一次性读取完所有可用数据,否则可能会丢失事件。
- 错误处理:
- 示例代码中包含了基本的错误处理,在实际应用中应更全面地处理各种异常情况。
- 缓冲区大小:
- 确保
BUFFER_SIZE足够大,以避免数据截断。
设置epoll触发事件时可以设置水平触发(LT)和边缘触发(ET).
如果使用 LT 模式,只要文件描述符处于就绪状态(例如,对于读操作而言,意味着有数据可读),每次调用 epoll_wait() 都会返回该文件描述符。这意味着即使没有完全读取所有可用的数据,在后续的 epoll_wait() 调用中,只要还有未处理的数据,这个文件描述符仍然会被标记为就绪。
ET 模式仅在文件描述符的状态发生变化时触发一次通知(即从不可读变为可读或反之)。这意味着如果你在一个 epoll_wait() 返回后没有完全处理完所有可用的数据,那么在下一次调用 epoll_wait() 之前,即使文件描述符仍然处于就绪状态,也不会再次收到通知。因此,你需要确保一次性读取尽可能多的数据直到没有更多数据为止。
- 选择 LT 还是 ET 主要取决于你的应用需求
- 如果你需要简单的实现,并且对偶尔的重复通知不敏感,LT 是更好的选择。
- 如果你追求高性能,并且能够保证每次都能高效地处理所有数据,ET 则可能更适合。
- 通常建议
- 对于大多数情况,默认使用 LT 模式即可满足需求。
- 在需要处理大量并发连接并且希望减少系统开销的情况下,可以考虑使用 ET 模式,但需要小心处理以避免错过事件。 LT 模式示例
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = listen_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &event);
while (true) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].data.fd == listen_sock) {
// 接受新连接...
} else {
// 处理现有连接上的数据...
// LT 模式下,如果有剩余数据未读,下次 epoll_wait 仍会通知
}
}
}
#### ET 模式示例
struct epoll_event event;
event.events = EPOLLIN | EPOLLET; // 启用 ET 模式
event.data.fd = listen_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &event);
while (true) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
if (events[i].data.fd == listen_sock) {
// 接受新连接...
} else {
// ET 模式下,需要循环读取直到 EAGAIN 或 EWOULDBLOCK
while ((nread = read(events[i].data.fd, buf, sizeof(buf))) > 0) {
// 处理读取到的数据...
}
if (nread == -1 && errno != EAGAIN) {
perror("read error");
}
}
}
}
- ET 模式的特点:只会在文件描述符状态发生变化时触发一次通知,因此需要开发者确保在每次事件触发时尽可能多地处理数据。
- 为什么需要
while循环- 确保读取所有可用数据,避免遗漏。
- 防止因为缓冲区大小限制或数据分段到达而导致未处理的数据残留。
- 提高数据处理的效率和可靠性。
在网络编程中,特别是在高并发场景下,以下情况可能导致数据未被完全读取:
- TCP 流的特性:
- TCP 是面向流的协议,数据是以字节流的形式传输的,而不是固定大小的消息包。接收方可能会收到部分数据,也可能收到多个消息拼接在一起的数据。
- 即使当前已经触发了“可读”事件,你并不能假设一次
read()调用就能读取到完整的数据块。- 缓冲区的限制:
- 每次调用
read()时,操作系统内核会将数据从内核缓冲区复制到用户空间缓冲区。如果用户空间缓冲区不足以容纳所有数据,则只能读取一部分数据。- 剩余的数据仍然存在于内核缓冲区中,但在 ET 模式下,如果没有新的事件触发,这些数据将不会被通知给应用程序。
| 特性 | select | poll | epoll |
|---|---|---|---|
| 跨平台支持 | 是 | 是 | 否(仅Linux) |
| 最大文件描述符数 | 有限制(通常是1024) | 无限制 | 无限制 |
| 性能 | 随文件描述符数量增加而降低 | 相对较好,但仍有改进空间 | 非常高效,适合高并发场景 |
| 事件通知方式 | 轮询所有文件描述符 | 轮询所有文件描述符 | 仅通知活跃的文件描述符 |
定时器
实现了一个定时器,定时器包括截至时间,回调函数以及client_data.
超时的时候调用回调函数,执行将fd从epoll中去除并close(fd),user_count--的操作. 并实现定时器队列,按照定时器的截至事件升序.
webserver包含多个client_data,client_data包含fd,address以及定时器用于控制超时事件.
WebServer::WebServer() {
// http_conn类对象
users = new http_conn[MAX_FD]; // 为每个连接创建一个http_conn和client_data对象
// 用户数据
users_timer = new client_data[MAX_FD];
// root文件夹路径
char server_path[200];
getcwd(server_path, 200);
// 设置文件根目录
char root[6] = "/root";
m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1);
strcpy(m_root, server_path);
strcat(m_root, root);
}
相关书籍和资料
最新版Web服务器项目详解 - 02 半同步半反应堆线程池(上)
Unix环境高级编程
Unix网络编程
类似项目github上有很多
forthespada/MyPoorWebServer: 一款可运行的基于C++ 实现的WebServer服务器,基于《TCPIP网络编程》和《Linux高性能服务器编程》实现的服务器项目。

Comments NOTHING