From ae2fbd4b191803a82d8244fab044343337e906ee Mon Sep 17 00:00:00 2001 From: wangyu- Date: Thu, 7 Jun 2018 06:24:31 -0500 Subject: [PATCH] libev works --- main.cpp | 278 ++++++++++++++++++++++++++++++++++--------------------- misc.cpp | 7 +- misc.h | 4 +- 3 files changed, 178 insertions(+), 111 deletions(-) diff --git a/main.cpp b/main.cpp index c7acd61..10e4fe3 100755 --- a/main.cpp +++ b/main.cpp @@ -456,10 +456,111 @@ int client_on_raw_recv(conn_info_t &conn_info) //called when raw fd received a p } return 0; } + +void udp_accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + char buf[buf_len]; + + conn_info_t & conn_info= *((conn_info_t*)watcher->data);; + + int recv_len; + struct sockaddr_in udp_new_addr_in={0}; + socklen_t udp_new_addr_len = sizeof(sockaddr_in); + if ((recv_len = recvfrom(udp_fd, buf, max_data_len+1, 0, + (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { + mylog(log_error,"recv_from error,this shouldnt happen at client\n"); + myexit(1); + }; + + if(recv_len==max_data_len+1) + { + mylog(log_warn,"huge packet, data_len > %d,dropped\n",max_data_len); + return; + } + + if(recv_len>=mtu_warn) + { + mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",recv_len,mtu_warn); + } + mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), + ntohs(udp_new_addr_in.sin_port),recv_len); + + u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); + u32_t conv; + + if(!conn_info.blob->conv_manager.is_u64_used(u64)) + { + if(conn_info.blob->conv_manager.get_size() >=max_conv_num) + { + mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); + return; + } + conv=conn_info.blob->conv_manager.get_new_conv(); + conn_info.blob->conv_manager.insert_conv(conv,u64); + mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv); + } + else + { + conv=conn_info.blob->conv_manager.find_conv_by_u64(u64); + } + + conn_info.blob->conv_manager.update_active_time(conv); + + if(conn_info.state.client_current_state==client_ready) + { + send_data_safer(conn_info,buf,recv_len,conv); + } + +} + +void raw_recv_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + client_on_raw_recv(conn_info); +} + +void clear_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) +{ + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + //u64_t value; + //read(timer_fd, &value, 8); + client_on_timer(conn_info); + mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter); + epoll_trigger_counter=0; +} + +void fifo_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + + char buf[buf_len]; + int fifo_fd=watcher->fd; + + int len=read (fifo_fd, buf, sizeof (buf)); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + return; + } + buf[len]=0; + while(len>=1&&buf[len-1]=='\n') + buf[len-1]=0; + mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); + if(strcmp(buf,"reconnect")==0) + { + mylog(log_info,"received command: reconnect\n"); + conn_info.state.client_current_state=client_idle; + conn_info.my_id=get_true_random_number_nz(); + } + else + { + mylog(log_info,"unknown command\n"); + } + +} + int client_event_loop() { - - char buf[buf_len]; conn_info_t conn_info; @@ -538,52 +639,88 @@ int client_event_loop() myexit(1); } setnonblocking(udp_fd); - epollfd = epoll_create1(0); - const int max_events = 4096; - struct epoll_event ev, events[max_events]; - if (epollfd < 0) { - mylog(log_fatal,"epoll return %d\n", epollfd); - myexit(-1); - } + //epollfd = epoll_create1(0); - ev.events = EPOLLIN; - ev.data.u64 = udp_fd; - ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, udp_fd, &ev); - if (ret!=0) { - mylog(log_fatal,"add udp_listen_fd error\n"); - myexit(-1); - } - ev.events = EPOLLIN; - ev.data.u64 = raw_recv_fd; + //const int max_events = 4096; + //struct epoll_event ev, events[max_events]; + //if (epollfd < 0) { + // mylog(log_fatal,"epoll return %d\n", epollfd); + // myexit(-1); + //} + + struct ev_loop * loop= ev_default_loop(0); + assert(loop != NULL); + + //ev.events = EPOLLIN; + //ev.data.u64 = udp_fd; + //ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, udp_fd, &ev); + //if (ret!=0) { + // mylog(log_fatal,"add udp_listen_fd error\n"); + // myexit(-1); + //} + + + struct ev_io udp_accept_watcher; + + udp_accept_watcher.data=&conn_info; + ev_io_init(&udp_accept_watcher, udp_accept_cb, udp_fd, EV_READ); + ev_io_start(loop, &udp_accept_watcher); + + + //ev.events = EPOLLIN; + //ev.data.u64 = raw_recv_fd; + + //ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, raw_recv_fd, &ev); + //if (ret!= 0) { + // mylog(log_fatal,"add raw_fd error\n"); + // myexit(-1); + //} + + struct ev_io raw_watcher; + + raw_watcher.data=&conn_info; + ev_io_init(&raw_watcher, raw_recv_cb, raw_recv_fd, EV_READ); + ev_io_start(loop, &raw_watcher); - ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, raw_recv_fd, &ev); - if (ret!= 0) { - mylog(log_fatal,"add raw_fd error\n"); - myexit(-1); - } int unbind=1; - set_timer(epollfd,timer_fd); + //set_timer(epollfd,timer_fd); + + struct ev_timer clear_timer; + + clear_timer.data=&conn_info; + ev_timer_init(&clear_timer, clear_timer_cb, 0, timer_interval/1000.0); + ev_timer_start(loop, &clear_timer); mylog(log_debug,"send_raw : from %x %d to %x %d\n",send_info.src_ip,send_info.src_port,send_info.dst_ip,send_info.dst_port); int fifo_fd=-1; + struct ev_io fifo_watcher; + fifo_watcher.data=&conn_info; + if(fifo_file[0]!=0) { fifo_fd=create_fifo(fifo_file); - ev.events = EPOLLIN; - ev.data.u64 = fifo_fd; + //ev.events = EPOLLIN; + //ev.data.u64 = fifo_fd; + + //ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, fifo_fd, &ev); + //if (ret!= 0) { + // mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + // myexit(-1); + //} + + ev_io_init(&fifo_watcher, fifo_cb, fifo_fd, EV_READ); + ev_io_start(loop, &fifo_watcher); - ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, fifo_fd, &ev); - if (ret!= 0) { - mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); - myexit(-1); - } mylog(log_info,"fifo_file=%s\n",fifo_file); } + + ev_run(loop, 0); + /* while(1)//////////////////////// { if(about_to_exit) myexit(0); @@ -605,90 +742,18 @@ int client_event_loop() if (events[idx].data.u64 == (u64_t)raw_recv_fd) { iphdr *iph;tcphdr *tcph; - client_on_raw_recv(conn_info); + } else if(events[idx].data.u64 ==(u64_t)timer_fd) { - u64_t value; - read(timer_fd, &value, 8); - client_on_timer(conn_info); - mylog(log_trace,"epoll_trigger_counter: %d \n",epoll_trigger_counter); - epoll_trigger_counter=0; + } else if (events[idx].data.u64 == (u64_t)fifo_fd) { - int len=read (fifo_fd, buf, sizeof (buf)); - if(len<0) - { - mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); - continue; - } - buf[len]=0; - while(len>=1&&buf[len-1]=='\n') - buf[len-1]=0; - mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); - if(strcmp(buf,"reconnect")==0) - { - mylog(log_info,"received command: reconnect\n"); - conn_info.state.client_current_state=client_idle; - conn_info.my_id=get_true_random_number_nz(); - } - else - { - mylog(log_info,"unknown command\n"); - } - } else if (events[idx].data.u64 == (u64_t)udp_fd) { - int recv_len; - struct sockaddr_in udp_new_addr_in={0}; - socklen_t udp_new_addr_len = sizeof(sockaddr_in); - if ((recv_len = recvfrom(udp_fd, buf, max_data_len+1, 0, - (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { - mylog(log_error,"recv_from error,this shouldnt happen at client\n"); - myexit(1); - }; - - if(recv_len==max_data_len+1) - { - mylog(log_warn,"huge packet, data_len > %d,dropped\n",max_data_len); - continue; - } - - if(recv_len>=mtu_warn) - { - mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",recv_len,mtu_warn); - } - mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), - ntohs(udp_new_addr_in.sin_port),recv_len); - - u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); - u32_t conv; - - if(!conn_info.blob->conv_manager.is_u64_used(u64)) - { - if(conn_info.blob->conv_manager.get_size() >=max_conv_num) - { - mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); - continue; - } - conv=conn_info.blob->conv_manager.get_new_conv(); - conn_info.blob->conv_manager.insert_conv(conv,u64); - mylog(log_info,"new packet from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv); - } - else - { - conv=conn_info.blob->conv_manager.find_conv_by_u64(u64); - } - - conn_info.blob->conv_manager.update_active_time(conv); - - if(conn_info.state.client_current_state==client_ready) - { - send_data_safer(conn_info,buf,recv_len,conv); - } } else { @@ -696,7 +761,8 @@ int client_event_loop() myexit(-1); } } - } + }*/ + return 0; } diff --git a/misc.cpp b/misc.cpp index e721882..a4efe55 100644 --- a/misc.cpp +++ b/misc.cpp @@ -39,8 +39,8 @@ id_t const_id=0;//an id used for connection recovery,its generated randomly,it n int udp_fd=-1; //for client only. client use this fd to listen and handle udp connection int bind_fd=-1; //bind only,never send or recv. its just a dummy fd for bind,so that other program wont occupy the same port -int epollfd=-1; //fd for epoll -int timer_fd=-1; //the general timer fd for client and server.for server this is not the only timer find,every connection has a timer fd. +//int epollfd=-1; //fd for epoll +//int timer_fd=-1; //the general timer fd for client and server.for server this is not the only timer find,every connection has a timer fd. int fail_time_counter=0;//determine if the max_fail_time is reached int epoll_trigger_counter=0;//for debug only int debug_flag=0;//for debug only @@ -1019,6 +1019,7 @@ int unit_test() } +/* int set_timer(int epollfd,int &timer_fd)//put a timer_fd into epoll,general function,used both in client and server { int ret; @@ -1047,7 +1048,7 @@ int set_timer(int epollfd,int &timer_fd)//put a timer_fd into epoll,general func myexit(-1); } return 0; -} +}*/ int set_timer_server(int epollfd,int &timer_fd,fd64_t &fd64)//only for server diff --git a/misc.h b/misc.h index 3f43a9d..772b52b 100644 --- a/misc.h +++ b/misc.h @@ -84,8 +84,8 @@ extern id_t const_id;//an id used for connection recovery,its generated randomly extern int udp_fd; //for client only. client use this fd to listen and handle udp connection extern int bind_fd; //bind only,never send or recv. its just a dummy fd for bind,so that other program wont occupy the same port -extern int epollfd; //fd for epoll -extern int timer_fd; //the general timer fd for client and server.for server this is not the only timer find,every connection has a timer fd. +//extern int epollfd; //fd for epoll +//extern int timer_fd; //the general timer fd for client and server.for server this is not the only timer find,every connection has a timer fd. extern int fail_time_counter;//determine if the max_fail_time is reached extern int epoll_trigger_counter;//for debug only extern int debug_flag;//for debug only