From 5fcf83e61aa8e4e6238da5875436c6a595730099 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Sun, 3 Jun 2018 12:06:47 -0500 Subject: [PATCH] server works --- common.h | 1 + connection.cpp | 10 + connection.h | 10 +- tunnel_client.cpp | 10 +- tunnel_server.cpp | 627 +++++++++++++++++++++++++++------------------- 5 files changed, 393 insertions(+), 265 deletions(-) diff --git a/common.h b/common.h index 65c6514..83b8ad1 100644 --- a/common.h +++ b/common.h @@ -178,6 +178,7 @@ struct dest_t struct fd_info_t { ip_port_t ip_port; + ev_io io_watcher; }; struct pseudo_header { diff --git a/connection.cpp b/connection.cpp index a1c8416..2aa77a9 100644 --- a/connection.cpp +++ b/connection.cpp @@ -18,7 +18,17 @@ void server_clear_function(u64_t u64)//used in conv_manager in server mode.for s { fd64_t fd64=u64; assert(fd_manager.exist(fd64)); + ev_io &watcher= fd_manager.get_info(fd64).io_watcher; + + ip_port_t &ip_port=fd_manager.get_info(fd64).ip_port;// + assert(conn_manager.exist(ip_port));// + ev_loop *loop =conn_manager.find(ip_port).loop; // overkill ? should we just use ev_default_loop(0)? + + ev_io_stop(loop,&watcher); + fd_manager.fd64_close(fd64); + + } //////////////////////////////////////////////////////////////////// diff --git a/connection.h b/connection.h index 96dbe11..dab8ab8 100644 --- a/connection.h +++ b/connection.h @@ -108,13 +108,17 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o fec_decode_manager_t fec_decode_manager; ev_timer timer; //my_timer_t timer; - //ip_port_t ip_port; + u64_t last_active_time; stat_t stat; + ev_loop* loop; int local_listen_fd; - int remote_fd; - fd64_t remote_fd64; + + int remote_fd; //only used for client + fd64_t remote_fd64;//only used for client + + ip_port_t ip_port;//only used for server conn_info_t() { diff --git a/tunnel_client.cpp b/tunnel_client.cpp index 7e12402..8101241 100644 --- a/tunnel_client.cpp +++ b/tunnel_client.cpp @@ -198,6 +198,10 @@ static void delay_manager_cb(struct ev_loop *loop, struct ev_timer *watcher, int { assert(!(revents&EV_ERROR)); + //uint64_t value; + //read(delay_manager.get_timer_fd(), &value, 8); + //mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + //do nothing } @@ -364,7 +368,7 @@ int tunnel_client_event_loop() // mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); // myexit(-1); //} - //mylog(log_info,"fifo_file=%s\n",fifo_file); + mylog(log_info,"fifo_file=%s\n",fifo_file); ev_io_init(&fifo_watcher, fifo_cb, fifo_fd, EV_READ); ev_io_start(loop, &fifo_watcher); @@ -414,9 +418,7 @@ int tunnel_client_event_loop() } else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { - //uint64_t value; - //read(delay_manager.get_timer_fd(), &value, 8); - //mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + } else if(events[idx].data.u64>u32_t(-1) ) { diff --git a/tunnel_server.cpp b/tunnel_server.cpp index 20895e6..69743d5 100644 --- a/tunnel_server.cpp +++ b/tunnel_server.cpp @@ -6,44 +6,339 @@ */ #include "tunnel.h" -void data_from_local_or_fec_timeout(conn_info_t & conn_info,int from_local) -{ +static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents); +static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents); +static void remote_cb(struct ev_loop *loop, struct ev_io *watcher, int revents); + +enum tmp_mode_t{is_from_remote=0,is_fec_timeout,is_conn_timer}; + +void data_from_remote_or_fec_timeout_or_conn_timer(conn_info_t & conn_info,fd64_t fd64,tmp_mode_t mode) +{ + int ret; + + char data[buf_len]; + int data_len; + u32_t conv; + //fd64_t fd64=events[idx].data.u64; + //mylog(log_trace,"events[idx].data.u64 >u32_t(-1),%llu\n",(u64_t)events[idx].data.u64); + + + //assert(fd_manager.exist_info(fd64)); + //ip_port_t ip_port=fd_manager.get_info(fd64).ip_port; + + + //conn_info_t &conn_info=conn_manager.find(ip_port); + ip_port_t &ip_port=conn_info.ip_port; + assert(conn_manager.exist(ip_port)); + + int &local_listen_fd=conn_info.local_listen_fd; + + int out_n=-2;char **out_arr;int *out_len;my_time_t *out_delay; + + dest_t dest; + dest.inner.fd_ip_port.fd=local_listen_fd; + dest.inner.fd_ip_port.ip_port=ip_port; + dest.type=type_fd_ip_port; + dest.cook=1; + + if(mode==is_fec_timeout) + { + assert(fd64==0); + //uint64_t value; + //if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8) + //{ + // mylog(log_trace,"fd_manager.to_fd(fd64), &value, 8)!=8 ,%d\n",ret); + // continue; + //} + //if(value==0) + //{ + // mylog(log_trace,"value==0\n"); + // continue; + //} + //assert(value==1); + from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); + } + else if(mode==is_conn_timer) + { + assert(fd64==0); + //uint64_t value; + //read(conn_info.timer.get_timer_fd(), &value, 8); + conn_info.conv_manager.clear_inactive(); + if(debug_force_flush_fec) + { + from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); + } + + conn_info.stat.report_as_server(ip_port); + return; + } + else if(mode==is_from_remote) + { + if(!fd_manager.exist(fd64)) //fd64 has been closed + { + mylog(log_warn,"!fd_manager.exist(fd64)\n"); + return; + } + + //fd64_t &fd64 =conn_info.remote_fd64; + assert(conn_info.conv_manager.is_u64_used(fd64)); + + conv=conn_info.conv_manager.find_conv_by_u64(fd64); + conn_info.conv_manager.update_active_time(conv); + conn_info.update_active_time(); + + int fd=fd_manager.to_fd(fd64); + data_len=recv(fd,data,max_data_len,0); + + mylog(log_trace,"received a packet from udp_fd,len:%d,conv=%d\n",data_len,conv); + + if(data_len<0) + { + mylog(log_debug,"udp fd,recv_len<0 continue,%s\n",strerror(errno)); + + return; + } + + if(!disable_mtu_warn&&data_len>=mtu_warn) + { + mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); + } + + char * new_data; + int new_len; + put_conv(conv,data,data_len,new_data,new_len); + + from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); + } + else + { + assert(0==1); + } + + mylog(log_trace,"out_n=%d\n",out_n); + for(int i=0;ifd; + int ret; + + mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n"); + char data[buf_len]; + int data_len; + struct sockaddr_in udp_new_addr_in={0}; + socklen_t udp_new_addr_len = sizeof(sockaddr_in); + if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0, + (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { + mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno)); + return; + }; + mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), + ntohs(udp_new_addr_in.sin_port),data_len); + + if(!disable_mtu_warn&&data_len>=mtu_warn)///////////////////////delete this for type 0 in furture + { + mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); + } + + + if(de_cook(data,data_len)!=0) + { + mylog(log_debug,"de_cook error"); + return; + } + + + ip_port_t ip_port; + ip_port.ip=udp_new_addr_in.sin_addr.s_addr; + ip_port.port=ntohs(udp_new_addr_in.sin_port); + mylog(log_trace,"ip_port= %s\n",ip_port.to_s()); + if(!conn_manager.exist(ip_port)) + { + if(conn_manager.mp.size() >=max_conn_num) + { + mylog(log_warn,"new connection %s ignored bc max_conn_num exceed\n",ip_port.to_s()); + return; + } + + conn_manager.insert(ip_port); + conn_info_t &conn_info=conn_manager.find(ip_port); + conn_info.ip_port=ip_port; + conn_info.loop=ev_default_loop(0); + conn_info.local_listen_fd=local_listen_fd; + + //u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64(); + //mylog(log_debug,"fec_fd64=%llu\n",fec_fd64); + //ev.events = EPOLLIN; + //ev.data.u64 = fec_fd64; + //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev); + + //fd_manager.get_info(fec_fd64).ip_port=ip_port; + + conn_info.timer.data=&conn_info; + ev_init(&conn_info.timer,conn_timer_cb); + ev_timer_set(&conn_info.timer, 0, timer_interval/1000.0 ); + ev_timer_start(loop,&conn_info.timer); + + //conn_info.timer.add_fd64_to_epoll(epoll_fd); + //conn_info.timer.set_timer_repeat_us(timer_interval*1000); + + //mylog(log_debug,"conn_info.timer.get_timer_fd64()=%llu\n",conn_info.timer.get_timer_fd64()); + + //u64_t timer_fd64=conn_info.timer.get_timer_fd64(); + //fd_manager.get_info(timer_fd64).ip_port=ip_port; + + + + conn_info.fec_encode_manager.set_data(&conn_info); + conn_info.fec_encode_manager.set_loop_and_cb(loop,fec_encode_cb); + + + mylog(log_info,"new connection from %s\n",ip_port.to_s()); + + } + conn_info_t &conn_info=conn_manager.find(ip_port); + + conn_info.update_active_time(); + int out_n;char **out_arr;int *out_len;my_time_t *out_delay; + from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); + + mylog(log_trace,"out_n= %d\n",out_n); + for(int i=0;i=max_conv_num) + { + mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); + continue; + } + + int new_udp_fd; + ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port); + + if (ret != 0) { + mylog(log_warn, "[%s]new_connected_socket failed\n",ip_port.to_s()); + continue; + } + + fd64_t fd64 = fd_manager.create(new_udp_fd); + //ev.events = EPOLLIN; + //ev.data.u64 = fd64; + //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev); + + conn_info.conv_manager.insert_conv(conv, fd64); + fd_manager.get_info(fd64).ip_port=ip_port; + + ev_io &io_watcher=fd_manager.get_info(fd64).io_watcher; + io_watcher.u64=fd64; + io_watcher.data=&conn_info; + + ev_init(&io_watcher,remote_cb); + ev_io_set(&io_watcher,new_udp_fd,EV_READ); + ev_io_start(conn_info.loop,&io_watcher); + + + mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64); + } + conn_info.conv_manager.update_active_time(conv); + fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv); + dest_t dest; + dest.type=type_fd64; + dest.inner.fd64=fd64; + delay_send(out_delay[i],dest,new_data,new_len); + } } static void remote_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) { + assert(!(revents&EV_ERROR)); + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + fd64_t fd64=watcher->u64; + + data_from_remote_or_fec_timeout_or_conn_timer(conn_info,fd64,is_from_remote); } static void fifo_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) { + assert(!(revents&EV_ERROR)); + int fifo_fd=watcher->fd; + + char buf[buf_len]; + int len=read (fifo_fd, buf, sizeof (buf)); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + return; + } + buf[len]=0; + handle_command(buf); } static void delay_manager_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) { + assert(!(revents&EV_ERROR)); + + //uint64_t value; + //read(delay_manager.get_timer_fd(), &value, 8); + //mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + + //do nothing } static void fec_encode_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) { + assert(!(revents&EV_ERROR)); + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + + data_from_remote_or_fec_timeout_or_conn_timer(conn_info,0,is_fec_timeout); } static void conn_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) { + assert(!(revents&EV_ERROR)); + conn_info_t & conn_info= *((conn_info_t*)watcher->data); + + data_from_remote_or_fec_timeout_or_conn_timer(conn_info,0,is_conn_timer); } static void prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents) { + assert(!(revents&EV_ERROR)); + delay_manager.check(); +} + +static void global_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents) +{ + assert(!(revents&EV_ERROR)); + + //uint64_t value; + //read(timer.get_timer_fd(), &value, 8); + conn_manager.clear_inactive(); + mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n"); } int tunnel_server_event_loop() @@ -51,64 +346,93 @@ int tunnel_server_event_loop() int i, j, k;int ret; int yes = 1; - int epoll_fd; - int remote_fd; + //int epoll_fd; + //int remote_fd; int local_listen_fd; new_listen_socket(local_listen_fd,local_ip_uint32,local_port); - epoll_fd = epoll_create1(0); - assert(epoll_fd>0); + //epoll_fd = epoll_create1(0); + //assert(epoll_fd>0); - const int max_events = 4096; - struct epoll_event ev, events[max_events]; - if (epoll_fd < 0) { - mylog(log_fatal,"epoll return %d\n", epoll_fd); - myexit(-1); - } + //const int max_events = 4096; + //struct epoll_event ev, events[max_events]; + //if (epoll_fd < 0) { + // mylog(log_fatal,"epoll return %d\n", epoll_fd); + // myexit(-1); + //} - ev.events = EPOLLIN; - ev.data.u64 = local_listen_fd; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev); - if (ret!=0) { - mylog(log_fatal,"add udp_listen_fd error\n"); - myexit(-1); - } + struct ev_loop * loop= ev_default_loop(0); + assert(loop != NULL); - ev.events = EPOLLIN; - ev.data.u64 = delay_manager.get_timer_fd(); - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); - if (ret!= 0) { - mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); - myexit(-1); - } + //ev.events = EPOLLIN; + //ev.data.u64 = local_listen_fd; + //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, local_listen_fd, &ev); + //if (ret!=0) { + // mylog(log_fatal,"add udp_listen_fd error\n"); + // myexit(-1); + //} + struct ev_io local_listen_watcher; + ev_io_init(&local_listen_watcher, local_listen_cb, local_listen_fd, EV_READ); + ev_io_start(loop, &local_listen_watcher); - mylog(log_debug," delay_manager.get_timer_fd() =%d\n", delay_manager.get_timer_fd()); + //ev.events = EPOLLIN; + //ev.data.u64 = delay_manager.get_timer_fd(); + //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); + //if (ret!= 0) { + // mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + // myexit(-1); + //} + + delay_manager.set_loop_and_cb(loop,delay_manager_cb); + + //mylog(log_debug," delay_manager.get_timer_fd() =%d\n", delay_manager.get_timer_fd()); mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port); - my_timer_t timer; - timer.add_fd_to_epoll(epoll_fd); - timer.set_timer_repeat_us(timer_interval*1000); + //my_timer_t timer; + //timer.add_fd_to_epoll(epoll_fd); + //timer.set_timer_repeat_us(timer_interval*1000); - mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd()); + ev_timer global_timer; + ev_init(&global_timer,global_timer_cb); + ev_timer_set(&global_timer, 0, timer_interval/1000.0 ); + ev_timer_start(loop,&global_timer); + + //mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd()); + + struct ev_io fifo_watcher; int fifo_fd=-1; if(fifo_file[0]!=0) { fifo_fd=create_fifo(fifo_file); - ev.events = EPOLLIN; - ev.data.u64 = fifo_fd; + //ev.events = EPOLLIN; + //ev.data.u64 = fifo_fd; + + //ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); + //if (ret!= 0) { + //mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + //myexit(-1); + //} + ev_io_init(&fifo_watcher, fifo_cb, fifo_fd, EV_READ); + ev_io_start(loop, &fifo_watcher); - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); - if (ret!= 0) { - mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); - myexit(-1); - } mylog(log_info,"fifo_file=%s\n",fifo_file); } + ev_prepare prepare_watcher; + ev_init(&prepare_watcher,prepare_cb); + ev_prepare_start(loop,&prepare_watcher); + + + ev_run(loop, 0); + + mylog(log_warn,"ev_run returned\n"); + myexit(0); + + /* while(1)//////////////////////// { @@ -131,240 +455,25 @@ int tunnel_server_event_loop() { if(events[idx].data.u64==(u64_t)timer.get_timer_fd()) { - uint64_t value; - read(timer.get_timer_fd(), &value, 8); - conn_manager.clear_inactive(); - mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n"); + } else if (events[idx].data.u64 == (u64_t)fifo_fd) { - char buf[buf_len]; - int len=read (fifo_fd, buf, sizeof (buf)); - if(len<0) - { - mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); - continue; - } - buf[len]=0; - handle_command(buf); + } else if (events[idx].data.u64 == (u64_t)local_listen_fd) { - mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n"); - char data[buf_len]; - int data_len; - struct sockaddr_in udp_new_addr_in={0}; - socklen_t udp_new_addr_len = sizeof(sockaddr_in); - if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0, - (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { - mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno)); - continue; - }; - mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), - ntohs(udp_new_addr_in.sin_port),data_len); - if(!disable_mtu_warn&&data_len>=mtu_warn)///////////////////////delete this for type 0 in furture - { - mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); - } - - - if(de_cook(data,data_len)!=0) - { - mylog(log_debug,"de_cook error"); - continue; - } - - - ip_port_t ip_port; - ip_port.ip=udp_new_addr_in.sin_addr.s_addr; - ip_port.port=ntohs(udp_new_addr_in.sin_port); - mylog(log_trace,"ip_port= %s\n",ip_port.to_s()); - if(!conn_manager.exist(ip_port)) - { - if(conn_manager.mp.size() >=max_conn_num) - { - mylog(log_warn,"new connection %s ignored bc max_conn_num exceed\n",ip_port.to_s()); - continue; - } - - conn_manager.insert(ip_port); - conn_info_t &conn_info=conn_manager.find(ip_port); - - u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64(); - mylog(log_debug,"fec_fd64=%llu\n",fec_fd64); - ev.events = EPOLLIN; - ev.data.u64 = fec_fd64; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev); - - fd_manager.get_info(fec_fd64).ip_port=ip_port; - - conn_info.timer.add_fd64_to_epoll(epoll_fd); - conn_info.timer.set_timer_repeat_us(timer_interval*1000); - - mylog(log_debug,"conn_info.timer.get_timer_fd64()=%llu\n",conn_info.timer.get_timer_fd64()); - - u64_t timer_fd64=conn_info.timer.get_timer_fd64(); - fd_manager.get_info(timer_fd64).ip_port=ip_port; - - mylog(log_info,"new connection from %s\n",ip_port.to_s()); - - } - conn_info_t &conn_info=conn_manager.find(ip_port); - - conn_info.update_active_time(); - int out_n;char **out_arr;int *out_len;my_time_t *out_delay; - from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); - - mylog(log_trace,"out_n= %d\n",out_n); - for(int i=0;i=max_conv_num) - { - mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); - continue; - } - - int new_udp_fd; - ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port); - - if (ret != 0) { - mylog(log_warn, "[%s]new_connected_socket failed\n",ip_port.to_s()); - continue; - } - - fd64_t fd64 = fd_manager.create(new_udp_fd); - ev.events = EPOLLIN; - ev.data.u64 = fd64; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev); - - conn_info.conv_manager.insert_conv(conv, fd64); - fd_manager.get_info(fd64).ip_port=ip_port; - - - mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64); - } - conn_info.conv_manager.update_active_time(conv); - fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv); - dest_t dest; - dest.type=type_fd64; - dest.inner.fd64=fd64; - delay_send(out_delay[i],dest,new_data,new_len); - } } else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { - uint64_t value; - read(delay_manager.get_timer_fd(), &value, 8); - mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + } else if (events[idx].data.u64 >u32_t(-1)) { - char data[buf_len]; - int data_len; - u32_t conv; - fd64_t fd64=events[idx].data.u64; - mylog(log_trace,"events[idx].data.u64 >u32_t(-1),%llu\n",(u64_t)events[idx].data.u64); - if(!fd_manager.exist(fd64)) //fd64 has been closed - { - mylog(log_trace,"!fd_manager.exist(fd64)\n"); - continue; - } - assert(fd_manager.exist_info(fd64)); - ip_port_t ip_port=fd_manager.get_info(fd64).ip_port; - assert(conn_manager.exist(ip_port)); - - conn_info_t &conn_info=conn_manager.find(ip_port); - - int out_n=-2;char **out_arr;int *out_len;my_time_t *out_delay; - - dest_t dest; - dest.inner.fd_ip_port.fd=local_listen_fd; - dest.inner.fd_ip_port.ip_port=ip_port; - dest.type=type_fd_ip_port; - dest.cook=1; - - if(fd64==conn_info.fec_encode_manager.get_timer_fd64()) - { - uint64_t value; - if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8) - { - mylog(log_trace,"fd_manager.to_fd(fd64), &value, 8)!=8 ,%d\n",ret); - continue; - } - if(value==0) - { - mylog(log_trace,"value==0\n"); - continue; - } - assert(value==1); - from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); - } - else if(fd64==conn_info.timer.get_timer_fd64()) - { - uint64_t value; - read(conn_info.timer.get_timer_fd(), &value, 8); - conn_info.conv_manager.clear_inactive(); - if(debug_force_flush_fec) - { - from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); - } - - conn_info.stat.report_as_server(ip_port); - continue; - } - else - { - assert(conn_info.conv_manager.is_u64_used(fd64)); - - conv=conn_info.conv_manager.find_conv_by_u64(fd64); - conn_info.conv_manager.update_active_time(conv); - conn_info.update_active_time(); - - int fd=fd_manager.to_fd(fd64); - data_len=recv(fd,data,max_data_len,0); - - mylog(log_trace,"received a packet from udp_fd,len:%d,conv=%d\n",data_len,conv); - - if(data_len<0) - { - mylog(log_debug,"udp fd,recv_len<0 continue,%s\n",strerror(errno)); - - continue; - } - - if(!disable_mtu_warn&&data_len>=mtu_warn) - { - mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); - } - - char * new_data; - int new_len; - put_conv(conv,data,data_len,new_data,new_len); - - from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); - } - - mylog(log_trace,"out_n=%d\n",out_n); - for(int i=0;i