进程池
父进程
1、定义数据结构pChild,申请子进程数目的结构体空间
2、通过循环,socketpair创建全双工管道,创建子进程,将子进程pid,管道对端,是否忙碌等信息存储
3、socket,bind,listen,对应的端口处于监听状态 netstat
4、epoll_create创建epfd,监控socketFd和所有子进程的管道对端
5、while(1)循环 epoll_wait等待客户端的请求及子进程是否有通知
如果socketFd可读,说明是客户端有连接请求,accept对应连接请求,得到new_fd,循环遍历,找到非忙碌的子进程,将new_fd发送给对应子进程,将对应子进程标识为忙碌,然后父进程关闭new_fd。
判断就绪的描述符 是哪个子进程的管道对端,就将对应子进程标识为非忙碌,同时读出管道内数据。
子进程的流程
while(1)
{
1、接收任务,得到newFd
2、通过newFd给客户端发送文件
3、关闭newFd
4、通过写管道,通知父进程完成文件下载任务
}
1、 send_recv_syn 同步机制
设置发送端1000,接收端1500,但接收端因为网络原因,不能保证每个包都是接收到1500字节。
tcp_client.c
#include <func.h> #define N 1048576 int main(int argc,char* argv[]) { ARGS_CHECK(argc,3); int socketFd; socketFd=socket(AF_INET,SOCK_STREAM,0); ERROR_CHECK(socketFd,-1,"socket"); struct sockaddr_in ser; bzero(&ser,sizeof(ser)); ser.sin_family=AF_INET; ser.sin_port=htons(atoi(argv[2])); ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序 int ret; ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser)); ERROR_CHECK(ret,"connect"); printf("connect success\n"); int i; int total=0; char buf[1000]={0}; for(i=0;i<N;i++) { ret=send(socketFd,buf,sizeof(buf),MSG_DONTWAIT); if(-1==ret) { //printf("errno=%d\n",errno); printf("total=%d\n",total); return -1; } total += ret; printf("ret = %d\n",ret); } printf("total send = %d\n",total); close(socketFd); }
tcp_server.c
#include <func.h> int main(int argc,sizeof(ser)); ser.sin_family=AF_INET; ser.sin_port=htons(atoi(argv[2])); ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序 int ret; ret=bind(socketFd,"bind"); listen(socketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息 int new_fd; struct sockaddr_in client; bzero(&client,sizeof(client)); int addrlen=sizeof(client); new_fd=accept(socketFd,(struct sockaddr*)&client,&addrlen); ERROR_CHECK(new_fd,"accept"); printf("client ip=%s,port=%d\n",inet_ntoa(client.sin_addr),ntohs(client.sin_port)); char buf[1500]={0}; int total = 0; while(1){ ret = recv(new_fd,0); total = total + ret; printf("ret = %d\n",ret); }; printf("total recv = %d\n",total); close(new_fd); close(socketFd); }
#include "function.h" int recvCycle(int newFd,void* p,int len){ int total = 0; int ret; char *pStart = (char*)p; while(total < len){ ret = recv(newFd,pStart + total,len - total,0); total = total + ret; } return 0; }
time型
#include "function.h" int main(int argc,"connect"); printf("connect success\n"); int dataLen; char buf[1000]={0}; //接收文件名 recvCycle(socketFd,&dataLen,4); recvCycle(socketFd,dataLen); int fd; fd=open(buf,O_CREAT|O_WRONLY,0666); ERROR_CHECK(fd,"open"); //接文件大小 off_t fileSize = 0;//文件大小off_t 长整型 recvCycle(socketFd,&fileSize,dataLen); time_t start,Now; //接受文件内容 start = Now = time(NULL); int downLoadSize = 0; while(1) { recvCycle(socketFd,4); if(dataLen>0) { recvCycle(socketFd,dataLen); write(fd,dataLen); downLoadSize+=dataLen; time(&Now); if(Now-start>=1){ printf("\r%5.2f%%",(float)downLoadSize / fileSize * 100); fflush(stdout); start = Now; } }else{ printf("\r100%% \n"); break; } } close(fd); close(socketFd); return 0; }
slice型
#include "function.h" int main(int argc,"open"); //接文件大小 off_t fileSize = 0,oldSize = 0,sliceSize;//文件大小off_t 长整型 off_t downLoadSize = 0; recvCycle(socketFd,dataLen); //接受文件内容 sliceSize = fileSize / 10000; while(1) { recvCycle(socketFd,dataLen); downLoadSize+=dataLen; if(downLoadSize - oldSize > sliceSize){ printf("\r%5.2f%%",(float)downLoadSize / fileSize * 100); fflush(stdout); oldSize = downLoadSize; } }else{ printf("\r100%% \n"); break; } } close(fd); close(socketFd); return 0; }
4、设置异常情况
(1)客户端在下载中突然断开,原先的服务端会一直死循环打印
父进程处于S和R状态来回切换,因为子进程断开后处于Z状态(僵尸状态),而fd未关闭会一直使epoll_wait的返回值为1。
原因:
(a)epoll_wait是每次监控到僵尸子进程为空闲。
(b)父进程和子进程分别掌握管道的一端,当子进程(客户端)关闭后,管道会被标记为一直可读,于是会main函数会一直在while(1)中不退出,read返回值为0,并一直打印 "child is not busy"。
方法:
修改tran_n.c,设置当send返回值为-1时及时退出。
tran_n.c
#include "function.h" int tranFile(int newFd){ Train_t train; int ret; //发送文件名 train.dataLen = strlen(FILENAME); strcpy(train.buf,FILENAME); int fd = open(FILENAME,O_RDONLY); ret = send(newFd,&train,4 + train.dataLen,0); ERROR_CHECK(ret,"send"); //发文件大小 struct stat buf; fstat(fd,&buf); train.dataLen = sizeof(buf.st_size); memcpy(train.buf,&buf.st_size,train.dataLen); ret = send(newFd,"send"); //发文件内容 while((train.dataLen = read(fd,train.buf,sizeof(train.buf)))){ ret = send(newFd,0); ERROR_CHECK(ret,"send"); } //发送结束标志 ret = send(newFd,4,"send"); return 0; }
修改后:
(2)服务器突然断开,客户端全部死循环
原因:recv返回值一直为0,total的值一直小于len。
方法:需要在函数后判断返回值是否为0,如果为0直接break返回-1
修改后:
(3)在改进(2)问题后,服务器断开后,再次执行./process_pool_server 192.168.3.160 2000 5
会出现如下异常:
原因:TCP断开连接后,恢复同一个端口需要一定时间,此时如果直接再重连同个端口就会出现上面的错误
方法在tcpInit中修改代码,设置setsockopt参数reuse即可。
#include "function.h" int tcpInit(int *pSocketFd,char *ip,char *port){ int socketFd; socketFd = socket(AF_INET,"socket"); int ret,reuse = 1; ret = setsockopt(socketFd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(int)); ERROR_CHECK(ret,"setsockopt"); struct sockaddr_in ser; bzero(&ser,sizeof(ser)); ser.sin_family = AF_INET; ser.sin_port = htons(atoi(port)); ser.sin_addr.s_addr = inet_addr(ip);//点分十进制转为32位的网络字节序 ret = bind(socketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息 *pSocketFd = socketFd; return 0; }
方法:异步拉起同步方法,设置一个管道,在同一进程中既可读又可写,当信号产生时写管道exitFds[1],并让exitFds[0]加入epoll监控events的集合,监控是否有管道中的读描述符是否可读(exitFds[0])。
(a)如果业务不重要,直接暴力kill
main.c
#include "function.h" int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写 void sigFunc(int sigNum){ write(exitFds[1],&sigNum,1);//写一个字节 } int main(int argc,char* argv[]){ ARGS_CHECK(argc,4); pipe(exitFds); signal(SIGUSR1,sigFunc); int ret; int childNum = atoi(argv[3]); Process_Data *pChild = (Process_Data*)calloc(childNum,sizeof(Process_Data)); makeChild(pChild,childNum);//创建子进程 int socketFd; tcpInit(&socketFd,argv[1],argv[2]);//建立TCP连接 int epfd; epfd = epoll_create(1);//创建一个句柄,占用一个文件描述符,参数表示需要监控的数目 struct epoll_event event,*evs; evs = (struct epoll_event*)calloc(childNum + 2,sizeof(struct epoll_event)); event.events = EPOLLIN; event.data.fd = socketFd; ret = epoll_ctl(epfd,EPOLL_CTL_ADD,socketFd,&event);//监听socketFd ERROR_CHECK(ret,"epoll_ctl"); event.data.fd = exitFds[0]; ret = epoll_ctl(epfd,exitFds[0],&event); ERROR_CHECK(ret,"epoll_ctl"); int i,j; for(i = 0; i < childNum ; i++){ event.data.fd = pChild[i].fd; ret = epoll_ctl(epfd,pChild[i].fd,&event);//将要监控的子进程的fd加入 ERROR_CHECK(ret,"epoll_ctl"); } int readyFdNum; int newFd; //int count = 0; while(1){ readyFdNum = epoll_wait(epfd,evs,childNum + 2,-1); //printf("count = %d,readyFdNum = %d\n",count++,readyFdNum); //epoll_wait等待事件的产生,参数evs用来从内核得到事件的集合 //用childNum + 1告知内核这个events有多大 for(i = 0; i < readyFdNum; i++){ //有客户端连入 if(evs[i].events == EPOLLIN && evs[i].data.fd == socketFd){ //断开后不会进入该循环 newFd = accept(socketFd,NULL,NULL);//不保存远程主机信息 for(j = 0; j < childNum; j++){ if(!pChild[j].busy){ //找到非忙碌的子进程,发任务(文件描述符) sendFd(pChild[j].fd,newFd); pChild[j].busy = 1; printf("%d child is busy\n",pChild[j].pid); break; } } close(newFd); //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据 } if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){ printf("start exit\n"); close(socketFd); //两种方式:1、暴力Kill 2、同步退出机制 //暴力kill for(j = 0;j < childNum; j++){ kill(pChild[j].pid,9); } for(j = 0;j < childNum; j++){ wait(NULL); } return 0; } for(j = 0; j < childNum; j++){ if(evs[i].data.fd == pChild[j].fd){ //遍历所有子进程的fd //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。 //printf("%d %d\n",evs[i].data.fd,pChild[j].fd); read(pChild[j].fd,&ret,1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态 // if(0 == ret2){ // return -1; // } pChild[j].busy = 0; //printf("ret2 = %d\n",ret2); printf("%d child is not busy\n",pChild[j].pid); } } // printf("%d\n",count++); // sleep(3); } } return 0; }
用10号信号Kill父进程及父进程状况:
子进程状况:
(b)如果业务重要,需要退出
方法一:sigprocmask屏蔽信号加保护 ..........................sigprocmask解除保护
方法二:同步退出机制,设置一个exitFlag,某个子进程完成当前任务后下次接收一个指定fd和exitFlag,若exitFlag为1则继续执行任务,如果exitFlag为0则有序退出,最大退出时间是所有子进程执行完当前任务的剩余最大时长。
效果如下:
具体代码见最后完整版。
方法:父进程监控子进程,在子进程非正常退出后父进程重新启用子进程
这里用三个名字来代表各个亲缘关系:爷进程、父进程、5个子进程
while(fork()) //爷进程进入循环 { int status; wait(&status); //获取退出码 if(WIFEXITED(status)) //如果父进程是正常退出 { printf("child exit normal\n"); exit(0);//爷进程退出 } //父进程非正常退出,重新回while循环创建父进程 }
效果如下:
当process_pool_server执行后,会出现1+1+5个进程,这时候给父进程发送信号通知其退出(不是给爷进程发信号)
function.h
#include <errno.h> #include <sys/epoll.h> #include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include <setjmp.h> #include <signal.h> #include <sys/msg.h> #include <strings.h> #include <sys/sem.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/wait.h> #include <syslog.h> #include <sys/select.h> #include <sys/time.h> #include <sys/mman.h> #include <pwd.h> #include <grp.h> #include <time.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/stat.h> #include <unistd.h> #include <sys/types.h> #include <dirent.h> #include <fcntl.h> #define ARGS_CHECK(argc,val) {if(argc!=val) {printf("error args\n");return -1;}} #define ERROR_CHECK(ret,retval,funcName) {if(ret==retval) {printf("LINE %d Function ERROR ",__LINE__);fflush(stdout);perror(funcName);return -1;}} #define THREAD_ERROR_CHECK(ret,funcName) {if(ret!=0) {printf("%s:%s\n",funcName,strerror(ret));return -1;}} //管理每一个子进程的数据结构 typedef struct{ pid_t pid;//子进程的pid int fd;//子进程的管道对端 short busy;//子进程是否忙碌,0代表非忙碌,1代表忙碌 }Process_Data; typedef struct{ int dataLen; char buf[1000]; }Train_t; #define FILENAME "file" int makeChild(Process_Data*,int); int tcpInit(int*,char*,char*); int childHandle(int); int sendFd(int,int,int); int recvFd(int,int*,int*); int tranFile(int);
main.c
#include "function.h" int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写 void sigFunc(int sigNum){ write(exitFds[1],char* argv[]){ while(fork()) //爷进程进入循环 { int status; wait(&status); //获取退出码 if(WIFEXITED(status)) //如果父进程是正常退出 { printf("child exit normal\n"); exit(0);//爷进程退出 } //父进程非正常退出,重新回while循环创建父进程 } ARGS_CHECK(argc,newFd,1); pChild[j].busy = 1; printf("%d child is busy\n",pChild[j].pid); break; } } close(newFd); //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据 } if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){ printf("start exit\n"); close(socketFd); //两种方式:1、暴力Kill 2、同步退出机制 //暴力kill for(j = 0;j < childNum; j++){ //kill(pChild[j].pid,9); sendFd(pChild[j].fd,0);//给所有的子进程发送0号描述符(没啥用),exitFlag为0 } for(j = 0;j < childNum; j++){ wait(NULL); } return 0; } for(j = 0; j < childNum; j++){ if(evs[i].data.fd == pChild[j].fd){ //遍历所有子进程的fd //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。 //printf("%d %d\n",1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态 // if(0 == ret2){ // return -1; // } pChild[j].busy = 0; //printf("ret2 = %d\n",pChild[j].pid); } } // printf("%d\n",count++); // sleep(3); } } return 0; }
child.c
#include "function.h" //服务端 int makeChild(Process_Data *pChild,int childNum) { int i; pid_t pid; int fds[2]; int ret; for(i=0;i < childNum;i++) { //初始化socketpair类型描述符,与pipe不同,每一端既可读又可写 ret = socketpair(AF_LOCAL,fds); ERROR_CHECK(ret,"socketpair"); pid = fork(); if(0 == pid) //子进程 { close(fds[1]); ret = childHandle(fds[0]); if(-1 == ret){ return -1; } } close(fds[0]); //父进程 pChild[i].pid = pid; pChild[i].fd = fds[1]; pChild[i].busy = 0; } return 0; } int childHandle(int fd) { int ret; int newFd; int exitFlag; while(1){ //开5个子进程,newFd为10,因内核控制信息,父子进程共享同一块文件描述符 recvFd(fd,&newFd,&exitFlag);//接收到任务 #ifdef DEBUG printf("newFd = %d\n",newFd); #endif if(exitFlag){ ret = tranFile(newFd);//给客户端发送文件 printf("I get task %d\n",newFd); if(-1 == ret){ printf("tranFile not finish!\n"); continue; } //newFd的值为10,socketFd为3,有五个子进程管道,为4-8,epfd也占1 printf("finish send file\n"); close(newFd); } else{ exit(0);//不能用break } // if(0 == newFd){ // printf("conncect Failed!\n"); // return -1; // } write(fd,1); //通知父进程非忙碌,写一个字节即可 } }
tran_file.c
#include "function.h" int tranFile(int newFd){ Train_t train; int ret; //发送文件名 train.dataLen = strlen(FILENAME); strcpy(train.buf,"send"); return 0; }
tcpInit.c
#include "function.h" int tcpInit(int *pSocketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息 *pSocketFd = socketFd; return 0; }
send_fd.c
#include "function.h" int sendFd(int sfd,int fd,int exitFlag) { struct msghdr msg; memset(&msg,sizeof(msg)); struct iovec iov[2]; char buf2[10]="world"; iov[0].iov_base= &exitFlag; iov[0].iov_len=5; iov[1].iov_base=buf2; iov[1].iov_len=5; msg.msg_iov=iov; msg.msg_iovlen=2; struct cmsghdr *cmsg; int len=CMSG_LEN(sizeof(int)); cmsg=(struct cmsghdr *)calloc(1,len); cmsg->cmsg_len=len; cmsg->cmsg_level=SOL_SOCKET; cmsg->cmsg_type=SCM_RIGHTS; *(int*)CMSG_DATA(cmsg)=fd; msg.msg_control=cmsg; msg.msg_controllen=len; int ret; ret=sendmsg(sfd,&msg,"sendmsg"); return 0; } int recvFd(int sfd,int *fd,int *exitFlag) { struct msghdr msg; memset(&msg,sizeof(msg)); struct iovec iov[2]; char buf2[10]; iov[0].iov_base=exitFlag; iov[0].iov_len=5; iov[1].iov_base=buf2; iov[1].iov_len=5; msg.msg_iov=iov; msg.msg_iovlen=2; struct cmsghdr *cmsg; int len=CMSG_LEN(sizeof(int)); cmsg=(struct cmsghdr *)calloc(1,len); cmsg->cmsg_len=len; cmsg->cmsg_level=SOL_SOCKET; cmsg->cmsg_type=SCM_RIGHTS; msg.msg_control=cmsg; msg.msg_controllen=len; int ret; ret=recvmsg(sfd,"sendmsg"); *fd=*(int*)CMSG_DATA(cmsg); return 0; }
客户端
client.c
#include "function.h" int main(int argc,dataLen); //接受文件内容 sliceSize = fileSize / 10000; while(1) { ret = recvCycle(socketFd,4); if(-1 == ret){ printf("\n"); printf("server is update!\n"); break; } if(dataLen > 0) { ret = recvCycle(socketFd,dataLen); if(-1 == ret){ printf("\n"); printf("server is update!\n"); break; } write(fd,dataLen); downLoadSize += dataLen; if(downLoadSize - oldSize > sliceSize){ printf("\r%5.2f%%",(float)downLoadSize / fileSize * 100); fflush(stdout); oldSize = downLoadSize; } } else{ printf("\r100%% \n"); break; } } close(fd); close(socketFd); return 0; }
recvCycle.c
#include "function.h" int recvCycle(int newFd,0); //printf("recv ret = %d\n",ret); if(0 == ret){ return -1; } total = total + ret; } return 0; }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。