epoll并发服务器的实现

Epoll并发聊天服务器的实现

一、相关知识

1.实现并发通信的三种方式

​ 实现并发通信主要有三种方式:多进程服务器多路复用服务器(I/O复用)、多线程服务器

  • 多进程服务器

​ 多进程服务器指的是利用不同进程处理来自不同客户端发来的连接请求,进程之间以轮转的方式运行,由于各个进程之间轮转运行的时间间隔很小,故在用户看来其实现了并行处理所有的客户请求。

​ 多进程服务器主要使用fork()函数进行创建子进程,将主进程和子进程隔离开来对各个客户端的请求进行分别响应,fork()函数的原型为:

#include<unisted.h>

pid_t fork(void);
//成功时返回进程ID,失败时返回-1

fork()函数将创建调用的进程副本,也就是说,并非根据完全不同的程序创建进程,而是复制正在运行的、调用fork函数的进程

​ 在子进程完成其任务之后,若程序员没有对其进行销毁,那么该子进程会变成僵尸进程(Zombie),若父进程未主动要求获得子进程的结束状态值,操作系统将一直保存,并让子进程长时间处于僵尸进程状态。我们可以使用wait()函数销毁僵尸进程:

#include<sys/wait.h>

pid_t wait(int * statloc);
//成功时返回终止的子进程ID,失败时返回-1

​ 调用此函数时如果已有子进程终止,那么子进程终止时传递的返回值(exit函数的参数值、main函数的return返回值)将保存到该函数的参数所指内存空间。

​ 但函数参数指向的单元中还包含其他信息,因此需要通过下列宏进行分离:

  • WIFEXITED子进程正常终止时返回“真”
  • WEXITSTATUS返回子进程的返回值
int status;
pid_t pid=fork();
.....
if(pid == 0)return 3;
wait(&status);
if(WIFEXITED(status))
    //输出3
    printf("Child send: %d \n",WEXITSTATUS(status));

调用wait函数时,如果没有已终止的子进程,那么程序将阻塞直到有子进程终止,因此需谨慎调用该函数!

​ 相较于wait()销毁僵尸进程,waitpid()是更好的函数,并且使用相关信号(signal()函数)可以在子进程调用完成后自动销毁,但是多进程服务器并不是本文的重点,故不再赘述。

  • 多路复用服务器

​ 下面用两张图简单介绍多进程服务器与多路复用服务器的区别

​ IO多路复用相对于阻塞式和非阻塞式的好处就是它可以监听多个 socket ,并且不会消耗过多资源。select函数是实现I/O多路复用的最简单也是最重要的函数。

​ 当用户进程调用 select 时,它会监听其中所有 socket() 直到有一个或多个 socket 数据已经准备好,否则就一直处于阻塞状态。select()的缺点在于单个进程能够监视的文件描述符的数量存在最大限制,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的的开销也线性增长。同时,由于网络响应时间的延迟使得大量的tcp链接处于非常活跃状态,但调用select()会对所有的socket进行一次线性扫描,所以这也浪费了一定的开销。

​ select()函数的参数有些复杂,下面使用注释对select()的各个参数进行详细的解释

#include<sys/select.h>
#include<sys/time.h>

int select(
int maxfd, fd_set * readset, fd_set * writeset, fd_set * exceptset, const struct timeval * timeout);
//maxfd:监视对象文件描述符数量
//readset:将所有关注”是否存在待读取数据“的文件描述符注册到fd_set型变量,并传递其地址值
//writeset:将所有关注”是否可传输无阻塞数据“的文件描述符注册到fd_set型变量,并传递其地址值
//exceptset:将所有关注”是否发生异常“的文件描述符注册到fd_set型变量,并传递其地址值
//timeout:调用select函数后,为防止陷入无限循环的阻塞,传递超时信息
//返回值:发生错误时返回-1,超时返回时返回0,因发生关注的事件返回时,返回大于0的值,该值是发生事件的文件描述符数

​ select函数使用具体流程:

  1. 定义select需要的各种参数,例如fd_max、timeout、所需监听的套接字列表等。
  2. 使用select函数,并对其返回值进行判断。
  3. 若返回值为0,那么继续循环使用select监视套接字列表;若大于0,使用FD_ISSET确认是哪一个列表发生了变化,之后对select函数的返回值(即发生变化的套接字)进行操作。

​ select不合理的两个缺点:

  • 调用后常见的针对所有文件描述符的循环语句。
  • 每次调用函数时都需要向该函数传递监视对象信息。

​ 调用select函数后,不是把发生变化的文件描述符单独集中到一起,而是通过观察作为监视对象的fd_set变量的变化,找出发生变化的文件描述符,因此无法避免针对所有监视对象的循环语句。

​ 向系统传递监视对象信息是select函数的瓶颈,因为这种应用程序与系统层面的交互将对程序造成很大负担。

​ 由于select函数是针对套接字的处理,而套接字是由操作系统管理的,故无法绕开应用程序与操作系统之间的交互,那么可行的优化方法是,仅向操作系统传递1次监视对象,监视范围或内容发生变化时只通知发生变化的事项

​ epoll()函数解决了select()函数在处理上的瓶颈,其优点为:

  • 无需编写以监视状态变化为目的的针对所有文件描述符的循环语句。
  • 调用对应于select函数的epoll_wait函数时无需每次传递监视对象信息。

​ epoll实现需要的三个函数

  • epoll_create:创建保存epoll文件描述符的空间,取代了select中自己创建fd_set的操作,由操作系统负责保存监视对象文件描述符
  • epoll_ctl:向空间注册并注销文件描述符
  • epoll_wait:与select函数类似,等待文件描述符发生变化

​ epoll方式通过epoll_event结构体将发生变化的文件描述符单独集中到一起

struct epoll_event
{
    __uint32_t events;
    epoll_data_t data;
}
	typedef union epoll_data
    {
        void * ptr;
        int fd;
        __uint32_t u32;
        __uint64_t u64;
    }epoll_data_t;
#include<sys/epoll.h>

int epoll_create(int size);
//成功时返回epoll文件描述符,失败时返回-1
//调用epoll_create时创建的文件描述符保存空间称为”epoll例程”,通过参数size传递的值决定epoll例程的大小,但该值仅供操作系统参考
#include<sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event);
//成功时返回0,失败时返回-1
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
//成功时返回发生事件的文件描述符数,失败时返回-1
//该函数的调用方式如下:
int event_cnt;
struct epoll_event * ep_events;
.....
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);//EPOLL_SIZE是宏常量
.....
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
//调用函数后,返回发生事件的文件描述符数,同时在第二个参数指向的缓冲中保存发生事件的文件描述符集合,因此无需像select那样插入针对所有文件描述符的循环

​ epoll()有两种不同的触发方式,分别是条件触发和边缘触发

  • 条件触发:只要输入缓冲有数据就会一直通知该事件,即多次注册
  • 边缘触发:输入缓冲收到数据时仅注册1次该事件,即使输入缓冲中还留有数据,也不会再进行注册

​ epoll默认使用条件触发,即在有限的输入缓冲下,多个客户端对其进行连接那么会触发多次wait函数。

将注册客户端套接字中的

event.events = EPOLLIN

改为

event.events = EPOLLIN | EPOLLET

就可以实现epoll的边缘触发,即从客户端接收数据时,仅注册1次事件,但是需要进行额外的处理,例如将其变为非阻塞模式。

  • 多线程服务器

​ 多进程服务器的瓶颈在于在切换进程时涉及到内核态和用户态的切换以及上下文的保存,这种操作在大量的切换过程中十分占用资源,而利用线程可以很好地避开进程切换的瓶颈,多线程的优点为:

  • 线程的创建和上下文切换比进程的创建和上下文切换更快。
  • 线程间交换数据时无需特殊技术。

​ 多线程服务器的编写涉及到线程库(pthread.h)以及信号量的互斥同步等操作,在实现起来相对复杂,这里不在赘述。

2 Epoll的两种触发方式

​ epoll()函数具有两种触发模式:边缘触发(ET, Edge Triggered)和条件触发(LT, Level Triggered),这两种模式定义了epoll()函数如何响应文件描述符的就绪事件。

  1. 条件触发(Level Triggered, LT):
    • 在此模式下,只要文件描述符处于就绪状态(例如,有数据可读、可写),epoll_wait就会通知这个事件。
    • 它更容易理解和使用,因为只要条件满足,事件就会一直被报告。
    • 但是,这可能导致效率问题,尤其是在高负载时。如果应用程序没有读取所有可用数据,下次调用epoll_wait时,它仍然会报告相同的文件描述符,可能导致多余的处理。
  2. 边缘触发(Edge Triggered, ET):
    • 在边缘触发模式下,只有文件描述符状态发生变化时(例如,从非就绪变为就绪),epoll_wait才会通知事件。
    • 这种模式对于提高效率非常有用,因为它减少了事件的重复报告。
    • 但是,它也更难正确地使用。应用程序必须确保每次都处理所有的可用数据,因为新的数据到来不会再次触发事件,除非文件描述符的状态再次改变。

​ 上面的描述可能比较晦涩,用一个简单的例子来讲解两种触发模式的特点:

儿子:“妈妈,我收到了500元压岁钱。”

妈妈:“嗯,真棒!“

儿子:”我给隔壁小王买了烤鸭,花了200元。”

妈妈:“嗯,做的好!”

儿子:“妈妈,我还买了玩具,剩下50元。”

妈妈:”嗯,你可以自己分配!“

​ 从上述对话中可以看出,儿子从收到压岁钱开始一直向妈妈报告,这就时条件触发的原理。如果将儿子比作输入缓冲,压岁钱比作输入数据,儿子的报告比作事件,可以将条件触发简单地描述为:

条件触发方式中,只要输入缓冲中有数据就会一直通知该事件。

​ 例如,服务器端输入缓冲中收到50字节的数据时,服务器端操作系统将通知该事件(注册到发生变化的文件描述符)。但服务器端读取20字节后还剩30字节的情况下,仍会注册事件。

​ 下面再看一个例子:

儿子:”妈妈,我收到了500元压岁钱。”

妈妈:“嗯,再接再厉。”

儿子:“。。。。。。”

妈妈:“说话呀!压岁钱呢?”

​ 上面的对话就反映了边缘触发的工作模式,即输入缓冲收到数据时仅注册1次该事件,即使输入缓冲中还留有数据,也不会再进行注册。

​ epoll默认以条件触发方式工作,select也同样是以该方式工作的,这两种触发方式各有利弊,使用哪种模式取决于特定的应用场景和对性能的需求。边缘触发通常用于高性能服务器,因为它可以减少系统调用次数,而条件触发则适用于简单应用或者对性能要求不高的场景。

3 线程的使用及线程同步

​ 利用线程的高并发特点使客户端的I/O操作分离,这样可以使整个代码更加清晰,并且能够提高效率。

​ 线程具有单独的执行流,因此需要单独定义线程的main函数,还需要请求操作系统在单独的执行流中执行该函数,完成该功能的函数如下:

#include<pthread.h>

int pthread_create(pthread_t * restrict thread, const pthread_attr_t * restrict attr, void * (* start_routine)(void *), void * restrict arg);
//成功时返回0,失败时返回其他值
//thread:保存新创建进程ID的变量地址值
//attr:用于传递线程属性的参数,传递NULL时,创建默认属性的线程
//start_routine:相当于线程main函数的、在单独执行流中执行的函数地址值
//arg:通过第三个参数传递调用函数时包含传递参数信息的变量地址值

​ 线程相关代码在编译时需要添加-lpthread选项声明需要连接线程库,只有这样才能调用头文件pthread.h中声明的函数

​ 由于线程的执行是非阻塞的,所以如果不使用相关函数控制线程的执行流,必须使用sleep函数阻塞进程,让进程等待所有线程都返回之后在返回!

​ 下面介绍控制线程执行流的函数:

#include<pthread.h>

int pthread_join(pthread_t thread, void ** status);
//成功时返回0,失败时返回其他值
//ptread:该参数值ID的线程终止后才会从该函数返回
//status:保存线程的main函数返回值的指针变量地址值

​ 该函数可以阻塞进程的执行。

​ 当多个线程同时运行时,就会产生临界区的问题。

线程安全函数被多个线程同时调用时也不会引发问题,而非线程安全函数被同时调用时会引发问题。

​ 在编译时添加-D_REENTRANT宏使函数库中的非线程安全函数变为定义好的线程安全函数。

​ 线程同步是在编写有关线程程序时的核心问题,因为线程的调用顺序是随机的,并且同一个进程的线程共用一个栈区,所以共享变量的使用需要格外谨慎,修改共享变量的代码我们称之为临界区。对于临界区的访问需要通过同步和互斥来进行保护:

  • 同步:需要指定访问同一内存空间的线程执行顺序的情况

  • 互斥:同时访问同一内存空间时发生的情况

​ 互斥的实质就是上锁机制,其依赖互斥量来实现,下面介绍互斥量的创建及销毁函数:

#include<pthread.h>

int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr * attr);
int pthread_mutex_destroy(pthread_mutex_t * mutex);
//成功时返回0,失败时返回其他值
//mutex:创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址值
//attr:传递即将创建的互斥量属性,没有特别需要指定的属性时传递NULL

​ 接下来介绍利用互斥量锁住或释放临界区时使用的函数:

int pthread_mutex_lock(pthread_mutex_t * mutex);
int pthread_mutex_unlock(pthread_mutex_t * mutex);
//成功时返回0,失败时返回其他值

​ 函数用法:

pthread_mutex_lock(&mutex);
//临界区的开始
//....
//临界区的结束
pthread_mutex_unlock(&mutex);

​ 为了提高效率,我们应该最大限度减少互斥量lock、unlock函数的调用次数

​ 同步需要使用信号量(semaphore)来完成,下面介绍信号量创建及销毁的方法。

#include<semaphore.h>

int sem_init(sem_t * sem, int pshared, unsigned int value);
int sem_destroy(sem_t * sem);
//成功时返回0,失败时返回其他值
//sem:传递信号量时传递保存信号量的变量地址,销毁时传递需要销毁的信号变量地址值
//pshared:传递其他值时,创建可由多个进程共享的信号量;传递0时,创建只允许1个进程内部使用的信号量
//value:指定新创建的信号量初始值

​ 信号量中相当于互斥量lock、unlock的函数,其中post让信号量增加1,wait让信号量减少1。

int sem_post(sem_t * sem);
int sem_wait(sem_t * sem);
//成功时返0,失败时返回其他值

二、代码实现

​ 使用epoll实现聊天服务器的代码不是很复杂,因为最核心的功能——数据的收发已经被epoll函数很好地实现了,我们所要做的就是调用该函数并且在此基础上添加一些简单的功能。

chat_clnt.c:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *msg);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, char *argv[])
{
	int sock;
	struct sockaddr_in serv_addr;
	pthread_t snd_thread, rcv_thread;
	void *thread_return;
	if (argc != 4)
	{
		printf("Usage : %s <IP> <port> <name>\n", argv[0]);
		exit(1);
	}

	sprintf(name, "[%s]", argv[3]);
	sock = socket(PF_INET, SOCK_STREAM, 0);

	memset(&serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
	serv_addr.sin_port = htons(atoi(argv[2]));

	if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
		error_handling("connect() error");

	pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
	pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
	pthread_join(snd_thread, &thread_return);
	pthread_join(rcv_thread, &thread_return);
	close(sock);
	return 0;
}

void *send_msg(void *arg) // send thread main
{
	int sock = *((int *)arg);
	char name_msg[NAME_SIZE + BUF_SIZE];
	while (1)
	{
		fgets(msg, BUF_SIZE, stdin);
		if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
		{
			close(sock);
			exit(0);
		}
		sprintf(name_msg, "%s %s", name, msg);
		write(sock, name_msg, strlen(name_msg));
	}
	return NULL;
}

void *recv_msg(void *arg) // read thread main
{
	int sock = *((int *)arg);
	char name_msg[NAME_SIZE + BUF_SIZE];
	int str_len;
	while (1)
	{
		str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
		if (str_len == -1)
			return (void *)-1;
		name_msg[str_len] = 0;
		fputs(name_msg, stdout);
	}
	return NULL;
}

void error_handling(char *msg)
{
	fputs(msg, stderr);
	fputc('\n', stderr);
	exit(1);
}

clnt_serv.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_CLNT 256
#define BUF_SIZE 100
#define EPOLL_SIZE 50
void setnonblockingmode(int fd);
void error_handling(char *buf);

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	socklen_t adr_sz;
	int str_len, i, j;
	char buf[BUF_SIZE];

	struct epoll_event *ep_events;
	struct epoll_event event;
	int epfd, event_cnt;

	int clnt_socks[MAX_CLNT];
	int clnt_cnt = 0;
	pthread_mutex_t mutex;

	if (argc != 2)
	{
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}

	serv_sock = socket(PF_INET, SOCK_STREAM, 0);
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family = AF_INET;
	serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
	serv_adr.sin_port = htons(atoi(argv[1]));

	if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1)
		error_handling("bind() error");
	if (listen(serv_sock, 5) == -1)
		error_handling("listen() error");

	epfd = epoll_create(EPOLL_SIZE);
	ep_events = malloc(sizeof(struct epoll_event) * EPOLL_SIZE);

	setnonblockingmode(serv_sock);
	event.events = EPOLLIN;
	event.data.fd = serv_sock;
	epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);

	while (1)
	{
		event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
		if (event_cnt == -1)
		{
			puts("epoll_wait() error");
			break;
		}

		// puts("return epoll_wait");
		for (i = 0; i < event_cnt; i++)
		{
			if (ep_events[i].data.fd == serv_sock)//client comes!
			{
				adr_sz = sizeof(clnt_adr);
				clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &adr_sz);
				setnonblockingmode(clnt_sock);
				event.events = EPOLLIN | EPOLLET;
				event.data.fd = clnt_sock;
				pthread_mutex_lock(&mutex);
				clnt_socks[clnt_cnt++] = clnt_sock;
				pthread_mutex_unlock(&mutex);

				epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
				printf("connected client: %d \n", clnt_sock);
			}
			else
			{
				while (1)
				{
					str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
					if (str_len == 0) // close request!
					{
						epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
						close(ep_events[i].data.fd);
						pthread_mutex_lock(&mutex);
						for (j = 0; j < clnt_cnt; j++)
						{
							if (ep_events[i].data.fd == clnt_socks[j])
							{
								while (j < clnt_cnt - 1)
								{
									clnt_socks[j] = clnt_socks[j + 1];
									j++;
								}
								break;
							}
						}
						clnt_cnt--;
						pthread_mutex_unlock(&mutex);
						printf("closed client: %d \n", ep_events[i].data.fd);
						break;
					}
					else if (str_len < 0)
					{
						if (errno == EAGAIN)
							break;
					}
					else
					{
						pthread_mutex_lock(&mutex);
						for (j = 0; j < clnt_cnt; j++)
						{
							if (clnt_socks[j] != ep_events[i].data.fd)
								write(clnt_socks[j], buf, str_len);
						}
						pthread_mutex_unlock(&mutex);
					}
				}
			}
		}
	}
	close(serv_sock);
	close(epfd);
	return 0;
}

void setnonblockingmode(int fd)
{
	int flag = fcntl(fd, F_GETFL, 0);
	fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
void error_handling(char *buf)
{
	fputs(buf, stderr);
	fputc('\n', stderr);
	exit(1);
}

Makefile:

CC = gcc
CFLAGS = -Wall -pthread
TARGET_SERVER = chat_serv
TARGET_CLIENT = chat_clnt

all: $(TARGET_SERVER) $(TARGET_CLIENT)

$(TARGET_SERVER): chat_serv.o
	$(CC) $(CFLAGS) -o $(TARGET_SERVER) chat_serv.o

$(TARGET_CLIENT): chat_clnt.o
	$(CC) $(CFLAGS) -o $(TARGET_CLIENT) chat_clnt.o

chat_serv.o: chat_serv.c
	$(CC) $(CFLAGS) -c chat_serv.c

chat_clnt.o: chat_clnt.c
	$(CC) $(CFLAGS) -c chat_clnt.c

clean:
	rm -f *.o $(TARGET_SERVER) $(TARGET_CLIENT)

三、运行结果展示

当有客户接入聊天室的时候会服务器会显示其占用的文件描述符;每一个客户在发送消息是会在其发送的消息前附加上其昵称,这样更容易识别是哪一个客户发来的消息。

版权声明:本文为博主作者:Briskk原创文章,版权归属原作者,如果侵权,请联系我们删除!

原文链接:https://blog.csdn.net/Briskk/article/details/135232660

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
社会演员多的头像社会演员多普通用户
上一篇 2024年1月16日
下一篇 2024年1月16日

相关推荐