基于epoll的多线程网络服务程序设计——C语言
? 采用C语言设计了一个基于epoll的多线程网络服务程序。每个线程都有一个epoll来捕获处于这个线程的socket事件。当子线程数量为0,即只有一个线程,则网络监听服务与socket消息处理处于同一个epoll。当子线程数量大于0时,主线程监听socket连接,当有新的连接到来时将其加入到活跃socket数量最小的子线程的epoll中。
server.h
#ifndef EPOLL_C_SERVER_H
#define EPOLL_C_SERVER_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#define RESULT_OK 0
#define RESULT_ERROR -1
typedef int (*MSG_HANDLE)(int socket_fd,void* arg) ;
typedef struct
{
int epoll_fd;
pthread_t thd_fd;
MSG_HANDLE data_func;
unsigned int active_conection_cnt;
pthread_mutex_t thd_mutex;
}socket_thd_struct;
typedef struct
{
int epoll_fd;
unsigned short ip_port;
MSG_HANDLE data_func;
unsigned int socket_pthread_count;
socket_thd_struct* socket_thd;
}server_struct;
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle);
int serverRun(server_struct *param_server);
#endif
server.c
#include "server.h"
static void* socketPthreadRun(void* arg)
{
socket_thd_struct* pa_sock_st=(socket_thd_struct*)arg;
int active_counts=0;
struct epoll_event ev;
struct epoll_event events[5];
int ret=0;
while(1)
{
active_counts=epoll_wait(pa_sock_st->epoll_fd,events,5,-1);
sprintf(stdout,"active count:%d\n",active_counts);
int index=0;
for(index=0;index<active_counts;index++)
{
if(events[index].events&EPOLLERR || events[index].events&EPOLLRDHUP)
{
sprintf(stdout,"client close this socket\n");
epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
pthread_mutex_lock(&(pa_sock_st->thd_mutex));
pa_sock_st->active_conection_cnt--;
pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
}
else if(events[index].events&EPOLLIN)
{
sprintf(stdout,"handle recv client data\n");
ret=pa_sock_st->data_func(events[index].data.fd,NULL);
if(ret==-1)
{
sprintf(stderr,"client socket exception happened\n");
epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
pthread_mutex_lock(&(pa_sock_st->thd_mutex));
pa_sock_st->active_conection_cnt--;
pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
}
if(ret==0)
{
sprintf(stdout,"client close this socket\n");
epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
pthread_mutex_lock(&(pa_sock_st->thd_mutex));
pa_sock_st->active_conection_cnt--;
pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
}
else if(ret==1)
{
ev.data.fd=events[index].data.fd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_MOD,events[index].data.fd,&ev);
}
}
}
}
pthread_exit(NULL);
}
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle)
{
server_struct* serv_st=(server_struct*)malloc(sizeof(server_struct));
serv_st->ip_port=param_port;
serv_st->data_func=param_handle;
serv_st->epoll_fd=epoll_create(256);
serv_st->socket_pthread_count=param_thd_count;
serv_st->socket_thd=NULL;
if(serv_st->socket_pthread_count>0)
{
sprintf(stdout,"create client socket sub thread\n");
serv_st->socket_thd=(socket_thd_struct*)malloc(sizeof(socket_thd_struct)*serv_st->socket_pthread_count);
int index=0;
for(index=0;index<serv_st->socket_pthread_count;index++)
{
serv_st->socket_thd[index].epoll_fd=epoll_create(256);
serv_st->socket_thd[index].data_func=param_handle;
serv_st->socket_thd[index].active_conection_cnt=0;
serv_st->socket_thd[index].thd_fd=0;
pthread_create(&(serv_st->socket_thd[index].thd_fd),NULL,socketPthreadRun,(void*)&(serv_st->socket_thd[index]));
pthread_mutex_init(&(serv_st->socket_thd[index].thd_mutex),NULL);
}
}
return serv_st;
}
int serverRun(server_struct *param_server)
{
int ret=RESULT_OK;
int server_socket=0;
struct sockaddr_in server_addr;
bzero(&server_addr,sizeof(server_addr));
struct epoll_event ev;
struct epoll_event events[5];
int active_count=0;
int index=0;
int new_socket=0;
struct sockaddr_in client_info;
socklen_t client_info_len=0;
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htons(INADDR_ANY);
server_addr.sin_port=htons(param_server->ip_port);
server_socket=socket(PF_INET,SOCK_STREAM,0);
if(server_socket<0)
{
sprintf(stderr,"create socket error\n");
return RESULT_ERROR;
}
printf("create server socket ssuccessful\n");
param_server->epoll_fd=epoll_create(256);
ev.data.fd=server_socket;
ev.events=EPOLLIN|EPOLLET;
if(epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,server_socket,&ev)!=0)
{
sprintf(stderr,"server socket add to epoll error\n");
return RESULT_ERROR;
}
sprintf(stdout,"server socket add to epoll successful\n");
if(bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr))!=0)
{
sprintf(stderr,"server bind failed:%d\n",param_server->ip_port);
return RESULT_ERROR;
}
sprintf(stdout,"server socket bind successful\n");
if(listen(server_socket,param_server->ip_port)!=0)
{
sprintf(stderr,"server listen failed:%d\n",param_server->ip_port);
return RESULT_ERROR;
}
sprintf(stdout,"server socket listen successful\n");
while(1)
{
active_count=epoll_wait(param_server->epoll_fd,events,5,-1);
sprintf(stdout,"active count:%d\n",active_count);
for(index=0;index<active_count;index++)
{
if(events[index].data.fd==server_socket)
{
sprintf(stdout,"new socket comming\n");
client_info_len=sizeof(client_info);
new_socket=accept(server_socket,(struct sockaddr*)&client_info,&client_info_len);
if(new_socket<0)
{
sprintf(stderr,"server accept failed\n");
continue;
}
sprintf(stdout,"new socket:%d.%d.%d.%d:%d-->connected\n",((unsigned char*)&(client_info.sin_addr))[0],((unsigned char*)&(client_info.sin_addr))[1],((unsigned char*)&(client_info.sin_addr))[2],((unsigned char*)&(client_info.sin_addr))[3],client_info.sin_port);
ev.data.fd=new_socket;
ev.events=EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP;
if(param_server->socket_pthread_count==0)
{
epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,new_socket,&ev);
}
else
{
int tmp_index=0;
int mix_cnt_thread_id=0;
unsigned int act_cnts=0;
for(tmp_index=0;tmp_index<param_server->socket_pthread_count;tmp_index++)
{
pthread_mutex_lock(&(param_server->socket_thd[tmp_index].thd_mutex));
act_cnts=param_server->socket_thd[tmp_index].active_conection_cnt;
pthread_mutex_unlock(&(param_server->socket_thd[tmp_index].thd_mutex));
if(mix_cnt_thread_id>act_cnts)
{
mix_cnt_thread_id=tmp_index;
}
}
epoll_ctl(param_server->socket_thd[mix_cnt_thread_id].epoll_fd,EPOLL_CTL_ADD,new_socket,&ev);
pthread_mutex_lock(&(param_server->socket_thd[mix_cnt_thread_id].thd_mutex));
param_server->socket_thd[mix_cnt_thread_id].active_conection_cnt++;
pthread_mutex_unlock(&(param_server->socket_thd[mix_cnt_thread_id].thd_mutex));
}
sprintf(stdout,"add new client socket to epoll\n");
}
else if(events[index].events&EPOLLERR || events[index].events&EPOLLRDHUP)
{
sprintf(stdout,"client close this socket\n");
epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
}
else if(events[index].events&EPOLLIN)
{
sprintf(stdout,"begin recv client data\n");
ret=param_server->data_func(events[index].data.fd,NULL);
if(ret==-1)
{
sprintf(stderr,"client socket exception happened\n");
epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
}
if(ret==0)
{
sprintf(stdout,"client close this socket\n");
epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);
close(events[index].data.fd);
}
else if(ret==1)
{
ev.data.fd=events[index].data.fd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(param_server->epoll_fd,EPOLL_CTL_MOD,events[index].data.fd,&ev);
}
}
}
}
close(server_socket);
return RESULT_OK;
}
|