From ba81aa14d484101d1e4aa63e22b42eb7fafe1bda Mon Sep 17 00:00:00 2001 From: wangyu- Date: Fri, 27 Oct 2017 05:23:46 -0500 Subject: [PATCH] just commit --- connection.h | 2 +- fec_manager.cpp | 24 ++---- fec_manager.h | 42 ++++++++- misc.cpp | 2 +- packet.cpp | 12 ++- packet.h | 1 + tun_dev.cpp | 222 ++++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 272 insertions(+), 33 deletions(-) diff --git a/connection.h b/connection.h index 2c326ae..04cf1c9 100644 --- a/connection.h +++ b/connection.h @@ -107,7 +107,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o fec_encode_manager_t fec_encode_manager; fec_decode_manager_t fec_decode_manager; my_timer_t timer; - ip_port_t ip_port; + //ip_port_t ip_port; u64_t last_active_time; stat_t stat; conn_info_t() diff --git a/fec_manager.cpp b/fec_manager.cpp index 6f19e5c..1b628c2 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -146,11 +146,11 @@ fec_encode_manager_t::fec_encode_manager_t() } timer_fd64=fd_manager.create(timer_fd); - re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_queue_len,fec_timeout,fec_mode); + reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode); + - seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. } -int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) +int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode) { fec_data_num=data_num; fec_redundant_num=redundant_num; @@ -160,15 +160,8 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int que fec_mode=mode; assert(data_num+redundant_num st; int index; anti_replay_t() + { + clear(); + } + int clear() { memset(replay_buffer,-1,sizeof(replay_buffer)); + st.clear(); st.rehash(anti_replay_buff_size*3); index=0; + return 0; } void set_invaild(u32_t seq) { @@ -140,6 +146,22 @@ public: fec_encode_manager_t(); ~fec_encode_manager_t(); + int clear() + { + counter=0; + blob_encode.clear(); + ready_for_output=0; + + itimerspec zero_its; + memset(&zero_its, 0, sizeof(zero_its)); + + timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0); + + seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. + + return 0; + } + my_time_t get_first_packet_time() { return first_packet_time_for_output; @@ -155,7 +177,7 @@ public: return fec_mode; } u64_t get_timer_fd64(); - int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); + int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); int input(char *s,int len/*,int &is_first_packet*/); int output(int &n,char ** &s_arr,int *&len); }; @@ -200,7 +222,7 @@ public: fec_decode_manager_t() { fec_data=new fec_data_t[fec_buff_num+5]; - re_init(); + clear(); } fec_decode_manager_t(const fec_decode_manager_t &b) { @@ -210,7 +232,21 @@ public: { delete fec_data; } - int re_init(); + int clear() + { + anti_replay.clear(); + mp.clear(); + mp.rehash(fec_buff_num*3); + + for(int i=0;i<(int)fec_buff_num;i++) + fec_data[i].used=0; + ready_for_output=0; + index=0; + + return 0; + } + + //int re_init(); int input(char *s,int len); int output(int &n,char ** &s_arr,int* &len_arr); }; diff --git a/misc.cpp b/misc.cpp index f02c021..2f188b6 100644 --- a/misc.cpp +++ b/misc.cpp @@ -472,7 +472,7 @@ int unit_test() int * len; fec_decode_manager.output(n,s_arr,len); - fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); + fec_encode_manager.reset_fec_parameter(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1); fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.output(n,s_arr,len); diff --git a/packet.cpp b/packet.cpp index d6c0599..6b133d1 100644 --- a/packet.cpp +++ b/packet.cpp @@ -177,9 +177,7 @@ int my_send(const dest_t &dest,char *data,int len) { if(dest.cook) { - put_crc32(data,len); - if(!disable_obscure)do_obscure(data,len); - if(!disable_xor)encrypt_0(data,len,key_string); + do_cook(data,len); } switch(dest.type) { @@ -322,6 +320,14 @@ int put_crc32(char * s,int &len) return 0; } +int do_cook(char * data,int &len) +{ + put_crc32(data,len); + if(!disable_obscure)do_obscure(data,len); + if(!disable_xor)encrypt_0(data,len,key_string); + return 0; +} + int de_cook(char * s,int &len) { if(!disable_xor)decrypt_0(s,len,key_string); diff --git a/packet.h b/packet.h index 99fb288..3693660 100644 --- a/packet.h +++ b/packet.h @@ -42,5 +42,6 @@ int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out ); int put_crc32(char * s,int &len); int rm_crc32(char * s,int &len); +int do_cook(char * data,int &len); int de_cook(char * s,int &len); #endif /* PACKET_H_ */ diff --git a/tun_dev.cpp b/tun_dev.cpp index d2bbb30..205d7d8 100644 --- a/tun_dev.cpp +++ b/tun_dev.cpp @@ -145,13 +145,73 @@ int tun_dev_client_event_loop() myexit(-1); } - dest_t dest; - dest.type=type_fd64; - dest.inner.fd64=remote_fd64; + + + + ev.events = EPOLLIN; + ev.data.u64 = delay_manager.get_timer_fd(); + + mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } + + + + + + conn_info_t *conn_info_p=new conn_info_t; + conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack + + u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fd64; + + mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + if (ret!= 0) { + mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); + myexit(-1); + } + + //my_timer_t timer; + conn_info.timer.add_fd_to_epoll(epoll_fd); + conn_info.timer.set_timer_repeat_us(timer_interval*1000); + + + + + + int fifo_fd=-1; + + if(fifo_file[0]!=0) + { + fifo_fd=create_fifo(fifo_file); + ev.events = EPOLLIN; + ev.data.u64 = fifo_fd; + + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + myexit(-1); + } + mylog(log_info,"fifo_file=%s\n",fifo_file); + } + + //dest.conv=conv; //dest.inner.ip_port=dest_ip_port; //dest.cook=1; + dest_t udp_dest; + udp_dest.type=type_fd64; + udp_dest.inner.fd64=remote_fd64; + + dest_t tun_dest; + tun_dest.type=type_fd; + tun_dest.inner.fd=tun_fd; int got_feed_back=0; @@ -176,7 +236,14 @@ int tun_dev_client_event_loop() int idx; for (idx = 0; idx < nfds; ++idx) { - if(events[idx].data.u64==(u64_t)tun_fd) + if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()) + { + uint64_t value; + read(conn_info.timer.get_timer_fd(), &value, 8); + mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); + conn_info.stat.report_as_client(); + } + else if(events[idx].data.u64==(u64_t)tun_fd) { len=read(tun_fd,data,max_data_len); assert(len>=0); @@ -188,7 +255,9 @@ int tun_dev_client_event_loop() else put_header(header_normal,data,len); - delay_manager.add(0,dest,data,len);; + do_cook(data,len); + + delay_manager.add(0,udp_dest,data,len); } else if(events[idx].data.u64==(u64_t)remote_fd64) { @@ -203,6 +272,14 @@ int tun_dev_client_event_loop() continue; } + if(de_cook(data,len)<0) + { + mylog(log_warn,"de_cook(data,len)failed \n"); + continue; + + } + + char header=0; if(get_header(header,data,len)!=0) { @@ -228,8 +305,33 @@ int tun_dev_client_event_loop() mylog(log_trace,"Received packet from udp,len: %d\n",len); assert(len>=0); + //delay_manager.add(0,tun_dest,data,len); assert(write(tun_fd,data,len)>=0); } + else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) + { + uint64_t value; + read(delay_manager.get_timer_fd(), &value, 8); + mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + //printf("",delay_mp.size()); + //fflush(stdout); + } + else if (events[idx].data.u64 == (u64_t)fifo_fd) + { + char buf[buf_len]; + int len=read (fifo_fd, buf, sizeof (buf)); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + continue; + } + buf[len]=0; + handle_command(buf); + } + else + { + assert(0==1); + } } delay_manager.check(); } @@ -283,6 +385,59 @@ int tun_dev_server_event_loop() myexit(-1); } + ev.events = EPOLLIN; + ev.data.u64 = delay_manager.get_timer_fd(); + + mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } + + + + + conn_info_t *conn_info_p=new conn_info_t; + conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack + + u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fd64; + + mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64()); + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + if (ret!= 0) { + mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); + myexit(-1); + } + + //my_timer_t timer; + conn_info.timer.add_fd_to_epoll(epoll_fd); + conn_info.timer.set_timer_repeat_us(timer_interval*1000); + + + + + + + + int fifo_fd=-1; + + if(fifo_file[0]!=0) + { + fifo_fd=create_fifo(fifo_file); + ev.events = EPOLLIN; + ev.data.u64 = fifo_fd; + + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + myexit(-1); + } + mylog(log_info,"fifo_file=%s\n",fifo_file); + } + //ip_port_t dest_ip_port; dest_t dest; @@ -316,17 +471,37 @@ int tun_dev_server_event_loop() int idx; for (idx = 0; idx < nfds; ++idx) { + if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()) + { + uint64_t value; + read(conn_info.timer.get_timer_fd(), &value, 8); - if(events[idx].data.u64==(u64_t)local_listen_fd) + //mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); + if(dest.inner.fd64_ip_port.ip_port.to_u64()==0) + { + continue; + } + conn_info.stat.report_as_server(dest.inner.fd_ip_port.ip_port); + } + else if(events[idx].data.u64==(u64_t)local_listen_fd) { struct sockaddr_in udp_new_addr_in={0}; socklen_t udp_new_addr_len = sizeof(sockaddr_in); if ((len = recvfrom(local_listen_fd, data, max_data_len, 0, - (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { + (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) < 0) { mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno)); continue; //myexit(1); }; + + + if(de_cook(data,len)<0) + { + mylog(log_warn,"de_cook(data,len)failed \n"); + continue; + + } + char header=0; if(get_header(header,data,len)!=0) { @@ -350,6 +525,10 @@ int tun_dev_server_event_loop() ntohs(udp_new_addr_in.sin_port)); dest.inner.fd_ip_port.ip_port.ip=udp_new_addr_in.sin_addr.s_addr; dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port); + conn_info.fec_decode_manager.clear(); + conn_info.fec_encode_manager.clear(); + memset(&conn_info.stat,0,sizeof(conn_info.stat)); + } else { @@ -359,6 +538,8 @@ int tun_dev_server_event_loop() len=1; data[0]=header_reject; + do_cook(data,len); + dest_t tmp_dest; tmp_dest.type=type_fd_ip_port; @@ -398,9 +579,34 @@ int tun_dev_server_event_loop() put_header(header_normal,data,len); + do_cook(data,len); + delay_manager.add(0,dest,data,len);; - + } + else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) + { + uint64_t value; + read(delay_manager.get_timer_fd(), &value, 8); + mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); + //printf("",delay_mp.size()); + //fflush(stdout); + } + else if (events[idx].data.u64 == (u64_t)fifo_fd) + { + char buf[buf_len]; + int len=read (fifo_fd, buf, sizeof (buf)); + if(len<0) + { + mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno)); + continue; + } + buf[len]=0; + handle_command(buf); + } + else + { + assert(0==1); } } delay_manager.check();