I. Objective
On a Linux system, design a TCP/IP-based multi-threaded echo server in C so that the server responds to whatever a client sends. The main deliverable is a connection-oriented, multi-threaded concurrent server, together with a client program that sends messages to it.
II. Experiment Content
Modify the multi-threaded echo server program so that a client can send a message to the server, the server receives it, and it answers the client with the requested content. A single server must be able to serve several clients concurrently and satisfy the following requirements:
- the server achieves concurrency with multiple threads;
- a fixed number of threads is created when the program starts;
- when a client connection request arrives, no new thread is created for that client; instead the connection is accepted and handed to one of the pre-created threads;
- if there are more connections than pre-created threads, the extra clients are queued and wait until a connection is released, after which the now-idle thread serves the next queued connection (see the sketch after this list).
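A minimal, self-contained sketch of this fixed-pool model is shown below. It is illustrative only: the names NWORKERS, worker and queue_head, and the use of a plain integer in place of a connected socket, are assumptions of the sketch rather than part of the assignment. A fixed set of threads is created once, each thread blocks on a condition variable until a job is queued, and the main thread only enqueues work and signals:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NWORKERS 2                 /* fixed number of worker threads */

/* A queued "job": the int stands in for a connected socket descriptor. */
struct job { int fd; struct job *next; };

static struct job *queue_head = NULL;
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_cond  = PTHREAD_COND_INITIALIZER;

/* Worker thread: sleep on the condition variable until a job is queued,
   then take one job and "serve" it. */
static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&queue_mutex);
        while (queue_head == NULL)
            pthread_cond_wait(&queue_cond, &queue_mutex);
        struct job *j = queue_head;            /* dequeue one job */
        queue_head = j->next;
        pthread_mutex_unlock(&queue_mutex);
        printf("thread %lu serving fd %d\n",
               (unsigned long)pthread_self(), j->fd);
        sleep(1);                              /* stands in for serving the client */
        free(j);
    }
    return NULL;
}

int main(void)
{
    pthread_t th[NWORKERS];
    for (int i = 0; i < NWORKERS; i++)         /* create the pool up front */
        pthread_create(&th[i], NULL, worker, NULL);

    for (int fd = 3; fd < 9; fd++) {           /* stands in for accept() */
        struct job *j = malloc(sizeof *j);
        j->fd = fd;
        pthread_mutex_lock(&queue_mutex);
        j->next = queue_head;                  /* push (the real server appends at the tail) */
        queue_head = j;
        pthread_cond_signal(&queue_cond);      /* wake one idle worker, if any */
        pthread_mutex_unlock(&queue_mutex);
    }
    sleep(5);                                  /* let the two workers drain the six jobs */
    return 0;
}

The full server in Section IV follows the same pattern, but appends jobs at the tail of a doubly linked list (LL_ADD) and counts outstanding jobs so it can report how many clients are waiting.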
III. Experiment Steps
1. Design of the server and client programs:
The server obtains a listening socket with passiveTCP(), creates WORKERS worker threads at start-up, and keeps a doubly linked job queue protected by a mutex and a condition variable. The main thread only accepts connections and pushes each connected socket into the queue with threadPoolPush(); every worker thread removes one job at a time and runs TCPechod() on it, so at most WORKERS clients are served at once and any further clients wait in the queue. A separate prstats thread prints connection statistics every INTERVAL seconds. The client resolves the server address, connects, and then loops: read a line from the keyboard, send it with write(), read the echoed reply with read(), and print it; typing EXIT closes the connection and ends the client.
2. Test procedure and expected results:
See Section IV-2 below.
IV. Program and Results
1. Program
Server:
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/signal.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/errno.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <netdb.h>
#include <errno.h>
#include <time.h>      /* time(), ctime() used by TCPechod and prstats */
#include <stdint.h>    /* intptr_t, used to pass the socket fd as a void * */
#define WORKERS 2      /* number of worker threads created at start-up */
#define QLEN 32        /* backlog passed to listen() */
#define BUFSIZE 4096   /* size of the echo buffer */
#define INTERVAL 5     /* seconds between statistics reports */
/* LL_ADD: append item to the tail of the doubly linked list whose head
pointer is `list`; `list` is walked to the tail and then restored. */
#define LL_ADD(item, list) do { \
if (list == NULL){ \
list=item; \
} \
else{ \
item->prev = list; \
while(list->next != NULL){ \
list = list->next; \
} \
list->next = item; \
item->next = item->prev; \
item->prev = list; \
list = item->next; \
item->next = NULL; \
} \
} while(0)
/* LL_REMOVE: unlink item from the doubly linked list headed by `list`. */
#define LL_REMOVE(item, list) do { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (item == list) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
unsigned short portbase = 0;
struct {
pthread_mutex_t st_mutex;      /* protects the counters below */
unsigned int st_concount;      /* connections currently open */
unsigned int st_contotal;      /* connections completed so far */
unsigned long st_contime;      /* cumulative connection time, in seconds */
unsigned long st_bytecount;    /* total bytes echoed */
} stats;
typedef struct MANAGER ThreadPool;
typedef struct JOB JOB;
/* WORKER: one pre-created pool thread. */
struct WORKER {
pthread_t thread;
ThreadPool *pool;
int terminate;
struct WORKER *next;
struct WORKER *prev;
};
/* JOB: one queued client connection; user_data carries the connected socket. */
struct JOB {
void * (*func)(void *arg);
void *user_data;
struct JOB *next;
struct JOB *prev;
};
/* MANAGER: the thread pool - its worker threads, the pending job queue,
and the mutex/condition variable that protect the queue. */
struct MANAGER {
struct WORKER *workers;
struct JOB *jobs;
int num;                       /* outstanding jobs (queued + running) */
pthread_cond_t jobs_cond;
pthread_mutex_t jobs_mutex;
};
int threadPoolCreate(ThreadPool *pool, int numWorkers);
int threadPoolDestory(ThreadPool *pool);
void threadPoolPush(ThreadPool *pool, void *(*func)(void *arg), void *arg);
void prstats(void);
int TCPechod(int fd);
int errexit(const char *format, ...);
int passiveTCP(const char *service, int qlen);
int passivesock(const char *service, const char *transport, int qlen);
int main(int argc, char *argv[])
{
char *service = "echo";
struct sockaddr_in fsin;
unsigned int alen;
int msock;
int ssock;
switch (argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
errexit("usage: TCPechod [port]\n");
}
msock = passiveTCP(service, QLEN);
ThreadPool pool;
int workers = WORKERS;
int ret = threadPoolCreate(&pool,workers);
if(ret < 0){
fprintf(stdout, "threadpool_create failed!\n");
return ret;
}
while (1) {
alen = sizeof(fsin);
ssock = accept(msock, (struct sockaddr *)&fsin, &alen);
if(ssock < 0){
if(errno == EINTR)
continue;
errexit("accept: %s\n", strerror(errno));
}
threadPoolPush(&pool, (void *(*)(void *))TCPechod, (void *)(intptr_t)ssock);   /* hand the connected socket to the pool */
}
threadPoolDestory(&pool);
}
/* threadCallback: body of each worker thread - block until a job is queued,
remove it from the queue, run it, and repeat until asked to terminate. */
static void* threadCallback(void *args)
{
struct WORKER *worker = (struct WORKER*)args;
while (1) {
pthread_mutex_lock(&worker->pool->jobs_mutex);
while(worker->pool->jobs == NULL){
if(worker->terminate)
break;
pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
}
if(worker->terminate){
pthread_mutex_unlock(&worker->pool->jobs_mutex);
break;
}
printf("\n~~~~~~~~~~~~~~~~Thread %lu is awake!~~~~~~~~~~~~~~~~\n\n",worker->thread);
struct JOB *job = worker->pool->jobs;
LL_REMOVE(job, worker->pool->jobs);
pthread_mutex_unlock(&worker->pool->jobs_mutex);
job->func(job->user_data);   /* serve the connection (runs TCPechod) */
pthread_mutex_lock(&worker->pool->jobs_mutex);
worker->pool->num--;         /* one outstanding job finished */
pthread_mutex_unlock(&worker->pool->jobs_mutex);
free(job);                   /* the JOB was malloc'd in threadPoolPush */
}
free(worker);
pthread_exit(NULL);
}
int threadPoolCreate(ThreadPool *pool, int numWorkers)
{
pthread_t th;
pthread_attr_t ta;
(void) pthread_attr_init(&ta);
(void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
(void) pthread_mutex_init(&stats.st_mutex, 0);
if (pthread_create(&th, &ta, (void * (*)(void *))prstats, 0) != 0)
errexit("pthread_create(prstats): %s\n", strerror(errno));
if(numWorkers < 1)
numWorkers = 1;
if(pool == NULL)
return -1;
memset(pool, 0, sizeof(ThreadPool));
pool->workers=NULL;
pool->jobs=NULL;
pthread_cond_init(&pool->jobs_cond, NULL);
pthread_mutex_init(&pool->jobs_mutex, NULL);
fprintf(stdout, "\n--------------------------------------------------------------------------------\n");
int i = 0;
for (i = 0; i < numWorkers; i++) {
struct WORKER *worker = (struct WORKER *)malloc(sizeof(struct WORKER));
if (worker == NULL) {
perror("malloc");
return -2;
}
memset(worker, 0, sizeof(struct WORKER));
worker->next=NULL;
worker->prev=NULL;
worker->pool = pool;
int ret = pthread_create(&worker->thread, &ta, threadCallback, worker);
if (ret) {
perror("pthread_create");
free(worker);
return -3;
}
fprintf(stdout, "worker thread: %lu\n", worker->thread);
LL_ADD(worker, pool->workers);
}
fprintf(stdout, "--------------------------------------------------------------------------------\n");
return 0;
}
/* threadPoolPush: wrap a connection in a JOB and append it to the pool's queue. */
void threadPoolPush(ThreadPool *pool, void *(*func)(void *arg), void *arg)
{
JOB *job=(JOB*)malloc(sizeof(JOB));
if(job==NULL){
perror("malloc");
exit(1);
}
memset(job, 0, sizeof(JOB));
job->func = func;
job->user_data = arg;
job->next=NULL;
job->prev=NULL;
pthread_mutex_lock(&pool->jobs_mutex);
LL_ADD(job, pool->jobs);   /* append to the tail of the job queue */
pool->num++;               /* one more outstanding job */
if(pool->num > WORKERS){   /* every worker is already busy: the job just waits in the queue */
printf("\n~~~~~~~~~~~~~~~~Waiting client number is %d.~~~~~~~~~~~~~~~~\n\n",(pool->num - WORKERS));
fflush(stdout);
}
else{
pthread_cond_signal(&pool->jobs_cond);
}
pthread_mutex_unlock(&pool->jobs_mutex);
}
int threadPoolDestory(ThreadPool *pool)
{
struct WORKER *worker = NULL;
for (worker = pool->workers; worker != NULL; worker = worker->next){
worker->terminate = 1;
}
pthread_mutex_lock(&pool->jobs_mutex);
int ret = pthread_cond_broadcast(&pool->jobs_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
return ret;
}
/* TCPechod: echo everything read from fd back to the client until it closes the connection. */
int TCPechod(int fd)
{
time_t start;
char buf[BUFSIZE];
int cc;
start = time(0);
(void) pthread_mutex_lock(&stats.st_mutex);
stats.st_concount++;
(void) pthread_mutex_unlock(&stats.st_mutex);
memset(buf,0,sizeof(buf));
while ((cc = read(fd, buf, sizeof buf)) != 0) {
if (cc < 0)
errexit("echo read: %s\n", strerror(errno));
if (write(fd, buf, cc) <= 0)
errexit("echo write: %s\n", strerror(errno));
(void) pthread_mutex_lock(&stats.st_mutex);
stats.st_bytecount += cc;
(void) pthread_mutex_unlock(&stats.st_mutex);
memset(buf,0,sizeof(buf));
}
(void) close(fd);
(void) pthread_mutex_lock(&stats.st_mutex);
stats.st_contime += time(0) - start;
stats.st_concount--;
stats.st_contotal++;
(void) pthread_mutex_unlock(&stats.st_mutex);
return 0;
}
/* prstats: print connection and byte-count statistics every INTERVAL seconds. */
void prstats(void)
{
time_t now;
while (1) {
(void) sleep(INTERVAL);
(void) pthread_mutex_lock(&stats.st_mutex);
now = time(0);
(void) printf("--- %s", ctime(&now));
(void) printf("%-32s: %u\n", "Current connections",
stats.st_concount);
(void) printf("%-32s: %u\n", "Completed connections",
stats.st_contotal);
if (stats.st_contotal) {
(void) printf("%-32s: %.2f (secs)\n",
"Average complete connection time",
(float)stats.st_contime /
(float)stats.st_contotal);
(void) printf("%-32s: %.2f\n",
"Average byte count",
(float)stats.st_bytecount /
(float)(stats.st_contotal +
stats.st_concount));
}
(void) printf("%-32s: %lu\n\n", "Total byte count",
stats.st_bytecount);
(void) pthread_mutex_unlock(&stats.st_mutex);
}
}
int errexit(const char *format, ...)
{
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
exit(1);
}
int passivesock(const char *service, const char *transport, int qlen)
{
struct servent *pse;
struct protoent *ppe;
struct sockaddr_in sin;
int s, type;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
if ( (pse = getservbyname(service, transport)) != 0 )
sin.sin_port = htons(ntohs((unsigned short)pse->s_port)
+ portbase);
else if ((sin.sin_port=htons((unsigned short)atoi(service))) == 0)
errexit("can't get \"%s\" service entry\n", service);
if ( (ppe = getprotobyname(transport)) == 0)
errexit("can't get \"%s\" protocol entry\n", transport);
if (strcmp(transport, "udp") == 0)
type = SOCK_DGRAM;
else
type = SOCK_STREAM;
s = socket(PF_INET, type, ppe->p_proto);
if (s < 0)
errexit("can't create socket: %s\n", strerror(errno));
if (bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0)
errexit("can't bind to %s port: %s\n", service,
strerror(errno));
if (type == SOCK_STREAM && listen(s, qlen) < 0)
errexit("can't listen on %s port: %s\n", service,
strerror(errno));
return s;
}
int passiveTCP(const char *service, int qlen)
{
return passivesock(service, "tcp", qlen);
}
Client:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
int main(int argc,char *argv[])
{
int sockfd;
int confd;
struct hostent *he;
struct sockaddr_in addr_ser;
char sendMsg[1024],recvMsg[1024];
if(argc!=3){
fprintf(stderr,"Usage: %s <server-host> <port>\n",argv[0]);
exit(1);
}
he=gethostbyname((char *)argv[1]);
if(he==NULL){
perror("Cannot get host by name!\n");
exit(1);
}
sockfd=socket(AF_INET,SOCK_STREAM,0);
if(sockfd==-1){
perror("Create socketfd failed!\n");
exit(1);
}
memset(&addr_ser,0,sizeof(addr_ser));
addr_ser.sin_family = AF_INET;
addr_ser.sin_port = htons((unsigned short)atoi(argv[2]));
addr_ser.sin_addr = *((struct in_addr *) he->h_addr_list[0]);
confd = connect(sockfd, (struct sockaddr *)&addr_ser, sizeof(addr_ser));
if(confd == -1){
perror("Connectfd error!\n");
exit(1);
}
while(1){
printf("--------------------------------------------------\n");
memset(sendMsg,0,sizeof(sendMsg));
printf("Please input what you want to send:\0");
scanf("%s",sendMsg);
if(strncmp(sendMsg,"EXIT",4)==0){
break;
}
int n1;
n1=write(sockfd,sendMsg,strlen(sendMsg));
if(n1<=0){
perror("client send wrong:");
break;
}
memset(recvMsg,0,sizeof(recvMsg));
int n2;
n2=read(sockfd,recvMsg,sizeof(recvMsg)-1);
if(n2==-1){
perror("client recv wrong:");
break;
}
printf("Receive from server:%s\n",recvMsg);
fflush(stdout);
}
close(sockfd);
return 0;
}
2. Test Procedure and Results
In this test the maximum number of worker threads is set to 2, and one server plus four clients are used.
- Compile and start the server, passing it a port number: the thread IDs of the two pre-created worker threads are printed.
- Wait 5 seconds: the prstats thread prints the server's connection and echo statistics; since no client has connected yet, the counters are all 0.
- Start the first client with the server's IP address and port number: the server prints the thread ID assigned to this connection.
- Type a message in the first client: the server prints the number of connected clients, and the client receives the echoed message.
- Start a second client with the IP address and port number: the server prints the thread ID assigned to it.
- Type a message in the second client and start a third client: client 2 receives its echo, and the server shows two active connections and one waiting client.
- Type a message in the third client: the server does not respond (both worker threads are busy).
- Start a fourth client: the server shows two active connections and two waiting clients.
- Type a message in the fourth client: the server does not respond.
- Exit the first client: the third client immediately receives its reply; the server prints the thread ID that took over the connection and reports 2 active connections, 1 completed connection, the average connection time, the byte counts, and so on.
- Type a message in the third client: the echo is now received from the server.
- Exit the second client: the fourth client immediately receives its reply; the server reports 2 active connections, 2 completed connections, the average connection time, the byte counts, and so on.
- Type a message in the fourth client: the echo is received from the server.
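For reference, a test along these lines can be compiled and started with commands of the following form; the file names server.c and client.c and the port number 8888 are illustrative assumptions, not taken from the report.

gcc -pthread -o server server.c     # the server links against the POSIX threads library
gcc -o client client.c
./server 8888                       # terminal 1: start the echo server on port 8888
./client 127.0.0.1 8888             # terminals 2-5: start up to four clients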
Summary
This experiment implements a connection-oriented echo service with a fixed-size thread pool: WORKERS (here 2) threads are created at start-up, each incoming connection is queued as a job, at most WORKERS clients are served concurrently, and queued clients are served as soon as a worker becomes free, which matches the behaviour observed with four clients in the test above.