From 8e00273db4c229b43b689a297863008289c86e84 Mon Sep 17 00:00:00 2001 From: wangyu- Date: Fri, 27 Oct 2017 08:28:43 -0500 Subject: [PATCH] just commit --- connection.h | 2 +- fec_manager.cpp | 24 ++++--------- fec_manager.h | 42 ++++++++++++++++++++-- misc.cpp | 2 +- tun_dev.cpp | 95 +++++++++++++++++++++++++++++++++++++++++++++---- 5 files changed, 137 insertions(+), 28 deletions(-) diff --git a/connection.h b/connection.h index 2c326ae..04cf1c9 100644 --- a/connection.h +++ b/connection.h @@ -107,7 +107,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o fec_encode_manager_t fec_encode_manager; fec_decode_manager_t fec_decode_manager; my_timer_t timer; - ip_port_t ip_port; + //ip_port_t ip_port; u64_t last_active_time; stat_t stat; conn_info_t() diff --git a/fec_manager.cpp b/fec_manager.cpp index 6f19e5c..1b628c2 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -146,11 +146,11 @@ fec_encode_manager_t::fec_encode_manager_t() } timer_fd64=fd_manager.create(timer_fd); - re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_queue_len,fec_timeout,fec_mode); + reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode); + - seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. } -int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) +int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) { fec_data_num=data_num; fec_redundant_num=redundant_num; @@ -160,15 +160,8 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int que fec_mode=mode; assert(data_num+redundant_num st; int index; anti_replay_t() + { + clear(); + } + int clear() { memset(replay_buffer,-1,sizeof(replay_buffer)); + st.clear(); st.rehash(anti_replay_buff_size*3); index=0; + return 0; } void set_invaild(u32_t seq) { @@ -140,6 +146,22 @@ public: fec_encode_manager_t(); ~fec_encode_manager_t(); + int clear() + { + counter=0; + blob_encode.clear(); + ready_for_output=0; + + itimerspec zero_its; + memset(&zero_its, 0, sizeof(zero_its)); + + timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + + seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. + + return 0; + } + my_time_t get_first_packet_time() { return first_packet_time_for_output; @@ -155,7 +177,7 @@ public: return fec_mode; } u64_t get_timer_fd64(); - int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); + int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); int input(char *s,int len/*,int &is_first_packet*/); int output(int &n,char ** &s_arr,int *&len); }; @@ -200,7 +222,7 @@ public: fec_decode_manager_t() { fec_data=new fec_data_t[fec_buff_num+5]; - re_init(); + clear(); } fec_decode_manager_t(const fec_decode_manager_t &b) { @@ -210,7 +232,21 @@ public: { delete fec_data; } - int re_init(); + int clear() + { + anti_replay.clear(); + mp.clear(); + mp.rehash(fec_buff_num*3); + + for(int i=0;i<(int)fec_buff_num;i++) + fec_data[i].used=0; + ready_for_output=0; + index=0; + + return 0; + } + + //int re_init(); int input(char *s,int len); int output(int &n,char ** &s_arr,int* &len_arr); }; diff --git a/misc.cpp b/misc.cpp index f02c021..2f188b6 100644 --- a/misc.cpp +++ b/misc.cpp @@ -472,7 +472,7 @@ int unit_test() int * len; fec_decode_manager.output(n,s_arr,len); - fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); + fec_encode_manager.reset_fec_parameter(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.output(n,s_arr,len); diff --git a/tun_dev.cpp b/tun_dev.cpp index f0a3aee..205d7d8 100644 --- a/tun_dev.cpp +++ b/tun_dev.cpp @@ -146,6 +146,8 @@ int tun_dev_client_event_loop() } + + ev.events = EPOLLIN; ev.data.u64 = delay_manager.get_timer_fd(); @@ -156,6 +158,32 @@ int tun_dev_client_event_loop() myexit(-1); } + + + + + conn_info_t *conn_info_p=new conn_info_t; + conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack + + u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fd64; + + mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + if (ret!= 0) { + mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); + myexit(-1); + } + + //my_timer_t timer; + conn_info.timer.add_fd_to_epoll(epoll_fd); + conn_info.timer.set_timer_repeat_us(timer_interval*1000); + + + + + int fifo_fd=-1; if(fifo_file[0]!=0) @@ -172,13 +200,18 @@ int tun_dev_client_event_loop() mylog(log_info,"fifo_file=%s\n",fifo_file); } - dest_t dest; - dest.type=type_fd64; - dest.inner.fd64=remote_fd64; + //dest.conv=conv; //dest.inner.ip_port=dest_ip_port; //dest.cook=1; + dest_t udp_dest; + udp_dest.type=type_fd64; + udp_dest.inner.fd64=remote_fd64; + + dest_t tun_dest; + tun_dest.type=type_fd; + tun_dest.inner.fd=tun_fd; int got_feed_back=0; @@ -203,7 +236,14 @@ int tun_dev_client_event_loop() int idx; for (idx = 0; idx < nfds; ++idx) { - if(events[idx].data.u64==(u64_t)tun_fd) + if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()) + { + uint64_t value; + read(conn_info.timer.get_timer_fd(), &value, 8); + mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); + conn_info.stat.report_as_client(); + } + else if(events[idx].data.u64==(u64_t)tun_fd) { len=read(tun_fd,data,max_data_len); assert(len>=0); @@ -217,7 +257,7 @@ int tun_dev_client_event_loop() do_cook(data,len); - delay_manager.add(0,dest,data,len); + delay_manager.add(0,udp_dest,data,len); } else if(events[idx].data.u64==(u64_t)remote_fd64) { @@ -265,6 +305,7 @@ int tun_dev_client_event_loop() mylog(log_trace,"Received packet from udp,len: %d\n",len); assert(len>=0); + //delay_manager.add(0,tun_dest,data,len); assert(write(tun_fd,data,len)>=0); } else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) @@ -354,6 +395,33 @@ int tun_dev_server_event_loop() myexit(-1); } + + + + conn_info_t *conn_info_p=new conn_info_t; + conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack + + u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fd64; + + mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + if (ret!= 0) { + mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); + myexit(-1); + } + + //my_timer_t timer; + conn_info.timer.add_fd_to_epoll(epoll_fd); + conn_info.timer.set_timer_repeat_us(timer_interval*1000); + + + + + + + int fifo_fd=-1; if(fifo_file[0]!=0) @@ -403,8 +471,19 @@ int tun_dev_server_event_loop() int idx; for (idx = 0; idx < nfds; ++idx) { + if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()) + { + uint64_t value; + read(conn_info.timer.get_timer_fd(), &value, 8); - if(events[idx].data.u64==(u64_t)local_listen_fd) + //mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); + if(dest.inner.fd64_ip_port.ip_port.to_u64()==0) + { + continue; + } + conn_info.stat.report_as_server(dest.inner.fd_ip_port.ip_port); + } + else if(events[idx].data.u64==(u64_t)local_listen_fd) { struct sockaddr_in udp_new_addr_in={0}; socklen_t udp_new_addr_len = sizeof(sockaddr_in); @@ -446,6 +525,10 @@ int tun_dev_server_event_loop() ntohs(udp_new_addr_in.sin_port)); dest.inner.fd_ip_port.ip_port.ip=udp_new_addr_in.sin_addr.s_addr; dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port); + conn_info.fec_decode_manager.clear(); + conn_info.fec_encode_manager.clear(); + memset(&conn_info.stat,0,sizeof(conn_info.stat)); + } else {