diff --git a/common.h b/common.h index 5661d39..459fdde 100644 --- a/common.h +++ b/common.h @@ -96,7 +96,7 @@ const i32_t max_fail_time=0;//disable const u32_t heartbeat_interval=1000; -const u32_t timer_interval=400;//this should be smaller than heartbeat_interval and retry interval; +const u32_t timer_interval=50;//this should be smaller than heartbeat_interval and retry interval; //const uint32_t conv_timeout=120000; //120 second //const u32_t conv_timeout=120000; //for test @@ -152,6 +152,7 @@ struct dest_t dest_type type; inner_t inner; u32_t conv; + int cook=0; }; struct fd_info_t diff --git a/connection.cpp b/connection.cpp index 3c6b11c..63f9710 100644 --- a/connection.cpp +++ b/connection.cpp @@ -18,9 +18,9 @@ conn_manager_t conn_manager; void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), //so we have to close the fd when conv expires { - int fd64=u64; + fd64_t fd64=u64; assert(fd_manager.exist(fd64)); - fd_manager.close(fd64); + fd_manager.fd64_close(fd64); } conv_manager_t::conv_manager_t() @@ -103,12 +103,14 @@ conv_manager_t::~conv_manager_t() } int conv_manager_t::erase_conv(u32_t conv) { - if(disable_conv_clear) return 0; + //if(disable_conv_clear) return 0; + assert(conv_last_active_time.find(conv)!=conv_last_active_time.end()); u64_t u64=conv_to_u64[conv]; if(program_mode==server_mode) { server_clear_function(u64); } + assert(conv_to_u64.find(conv)!=conv_to_u64.end()); conv_to_u64.erase(conv); u64_to_conv.erase(u64); conv_last_active_time.erase(conv); @@ -152,7 +154,7 @@ conv_manager_t::~conv_manager_t() old_it=it; it++; u32_t conv= old_it->first; - erase_conv(old_it->first); + erase_conv(conv); if(ip_port==0) { mylog(log_info,"conv %x cleared\n",conv); diff --git a/connection.h b/connection.h index f8a9076..289b807 100644 --- a/connection.h +++ b/connection.h @@ -48,6 +48,10 @@ struct conv_manager_t // manage the udp connections long long last_clear_time; conv_manager_t(); + conv_manager_t(const conv_manager_t &b) + { + assert(0==1); + } ~conv_manager_t(); int get_size(); void reserve(); @@ -70,30 +74,41 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o conv_manager_t conv_manager; fec_encode_manager_t fec_encode_manager; fec_decode_manager_t fec_decode_manager; - fd64_t timer_fd; + my_timer_t timer; ip_port_t ip_port; + conn_info_t() + { + } + conn_info_t(const conn_info_t &b) + { + assert(0==1); + } };//g_conn_info; struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections { - unordered_map mp;// to conn_info_t; - unordered_map::iterator clear_it; - long long last_clear_time; + unordered_map mp;// to conn_info_t; + unordered_map::iterator clear_it; + long long last_clear_time; - conn_manager_t(); - int exist(ip_port_t); - conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash - conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash - int insert(ip_port_t); - /* - int exist_fd64(fd64_t fd64); - void insert_fd64(fd64_t fd64,ip_port_t); - ip_port_t find_by_fd64(fd64_t fd64);*/ + conn_manager_t(); + conn_manager_t(const conn_info_t &b) + { + assert(0==1); + } + int exist(ip_port_t); + conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash + conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash + int insert(ip_port_t); + /* + int exist_fd64(fd64_t fd64); + void insert_fd64(fd64_t fd64,ip_port_t); + ip_port_t find_by_fd64(fd64_t fd64);*/ -int erase(unordered_map::iterator erase_it); -int clear_inactive(); -int clear_inactive0(); + int erase(unordered_map::iterator erase_it); + int clear_inactive(); + int clear_inactive0(); }; diff --git a/delay_manager.cpp b/delay_manager.cpp index 1be0500..711a446 100644 --- a/delay_manager.cpp +++ b/delay_manager.cpp @@ -62,7 +62,7 @@ int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len) } delay_data_t tmp=delay_data; - tmp.data=(char *)malloc(delay_data.len); + tmp.data=(char *)malloc(delay_data.len+100); memcpy(tmp.data,delay_data.data,delay_data.len); diff --git a/delay_manager.h b/delay_manager.h index d137395..628f061 100644 --- a/delay_manager.h +++ b/delay_manager.h @@ -10,6 +10,7 @@ #include "common.h" #include "packet.h" +#include "log.h" //enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote}; @@ -27,6 +28,80 @@ union dest_t u64_t u64; }; */ + +struct my_timer_t +{ + int timer_fd; + fd64_t timer_fd64; + my_timer_t() + { + if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0) + { + mylog(log_fatal,"timer_fd create error"); + myexit(1); + } + timer_fd64=fd_manager.create(timer_fd); + } + my_timer_t(const my_timer_t &b) + { + assert(0==1); + } + ~my_timer_t() + { + fd_manager.fd64_close(timer_fd64); + } + int add_fd_to_epoll(int epoll_fd) + { + epoll_event ev;; + ev.events = EPOLLIN; + ev.data.u64 = timer_fd; + int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } + return 0; + } + int add_fd64_to_epoll(int epoll_fd) + { + epoll_event ev;; + ev.events = EPOLLIN; + ev.data.u64 = timer_fd64; + int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); + myexit(-1); + } + return 0; + } + int get_timer_fd() + { + return timer_fd; + } + fd64_t get_timer_fd64() + { + return timer_fd64; + } + int set_timer_repeat_us(my_time_t my_time) + { + itimerspec its; + memset(&its,0,sizeof(its)); + its.it_interval.tv_sec=my_time/1000000llu; + its.it_interval.tv_nsec=my_time%1000000llu*1000llu; + its.it_value.tv_nsec=1; //imidiately + timerfd_settime(timer_fd,0,&its,0); + return 0; + } + int set_timer_abs_us(my_time_t my_time) + { + itimerspec its; + memset(&its,0,sizeof(its)); + its.it_value.tv_sec=my_time/1000000llu; + its.it_value.tv_nsec=my_time%1000000llu*1000llu; + timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); + return 0; + } +}; struct delay_data_t { dest_t dest; diff --git a/fd_manager.cpp b/fd_manager.cpp index cf48d44..3c90d12 100644 --- a/fd_manager.cpp +++ b/fd_manager.cpp @@ -35,7 +35,7 @@ void fd_manager_t::remove_fd(int fd) fd64_to_fd_mp.erase(fd64); //return 0; }*/ -void fd_manager_t::close(fd64_t fd64) +void fd_manager_t::fd64_close(fd64_t fd64) { assert(exist(fd64)); int fd=fd64_to_fd_mp[fd64]; diff --git a/fd_manager.h b/fd_manager.h index 33df80e..c48db26 100644 --- a/fd_manager.h +++ b/fd_manager.h @@ -20,7 +20,7 @@ struct fd_manager_t //conver fd to a uniq 64bit number,avoid fd value conflict int exist_info(fd64_t); int exist(fd64_t fd64); int to_fd(fd64_t); - void close(fd64_t fd64); + void fd64_close(fd64_t fd64); void reserve(int n); u64_t create(int fd); fd_manager_t(); diff --git a/fec_manager.cpp b/fec_manager.cpp index 30d0b1b..d571688 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -124,7 +124,7 @@ fec_encode_manager_t::fec_encode_manager_t() } fec_encode_manager_t::~fec_encode_manager_t() { - fd_manager.close(timer_fd64); + fd_manager.fd64_close(timer_fd64); } u64_t fec_encode_manager_t::get_timer_fd64() { diff --git a/main.cpp b/main.cpp index 14403a7..f4676fb 100644 --- a/main.cpp +++ b/main.cpp @@ -27,8 +27,8 @@ int jitter_max=0; int mtu_warn=1350; -int fec_data_num=30; -int fec_redundant_num=20; +int fec_data_num=20; +int fec_redundant_num=8; int fec_mtu=1200; int fec_pending_num=200; int fec_pending_time=50000; @@ -56,7 +56,6 @@ int init_listen_socket() local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - int yes = 1; //setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); @@ -114,7 +113,7 @@ int delay_send(my_time_t delay,const dest_t &dest,char *data,int len) //mylog(log_info,"rand = %d\n",rand); if(rand>=80) { - return 0; + //return 0; //mylog(log_info,"dropped!\n"); } return delay_manager.add(delay,dest,data,len);; @@ -157,7 +156,7 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch for(int i=0;i%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } + if(rm_crc32(data,data_len)!=0) + { + mylog(log_debug,"crc32 check error"); + } int out_n;char **out_arr;int *out_len;int *out_delay; from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); @@ -498,6 +513,11 @@ int server_event_loop() } mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port); + + my_timer_t timer; + timer.add_fd_to_epoll(epoll_fd); + timer.set_timer_repeat_us(timer_interval*1000); + while(1)//////////////////////// { @@ -527,7 +547,15 @@ int server_event_loop() read(timer_fd, &dummy, 8); //current_time_rough=get_current_time(); } - else */if (events[idx].data.u64 == (u64_t)local_listen_fd) + else */ + if(events[idx].data.u64==(u64_t)timer.get_timer_fd()) + { + uint64_t value; + read(timer.get_timer_fd(), &value, 8); + conn_manager.clear_inactive(); + //conn_info.conv_manager.clear_inactive(); + } + else if (events[idx].data.u64 == (u64_t)local_listen_fd) { //int recv_len; char data[buf_len]; @@ -544,6 +572,11 @@ int server_event_loop() { mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } + + if(rm_crc32(data,data_len)!=0) + { + mylog(log_debug,"crc32 check error"); + } mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), ntohs(udp_new_addr_in.sin_port),data_len); @@ -557,12 +590,21 @@ int server_event_loop() conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time); conn_info.conv_manager.reserve(); - u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); - ev.events = EPOLLIN; - ev.data.u64 = fd64; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); - fd_manager.get_info(fd64).ip_port=ip_port; + u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64(); + ev.events = EPOLLIN; + ev.data.u64 = fec_fd64; + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev); + + fd_manager.get_info(fec_fd64).ip_port=ip_port; + + + + conn_info.timer.add_fd64_to_epoll(epoll_fd); + conn_info.timer.set_timer_repeat_us(timer_interval*1000); + u64_t timer_fd64=conn_info.timer.get_timer_fd64(); + fd_manager.get_info(timer_fd64).ip_port=ip_port; + } conn_info_t &conn_info=conn_manager.find(ip_port); @@ -683,6 +725,7 @@ int server_event_loop() dest.type=type_ip_port; //dest.conv=conv; dest.inner.ip_port=ip_port; + dest.cook=1; if(fd64==conn_info.fec_encode_manager.get_timer_fd64()) { @@ -699,9 +742,16 @@ int server_event_loop() assert(value==1); from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); } + else if(fd64==conn_info.timer.get_timer_fd64()) + { + uint64_t value; + read(conn_info.timer.get_timer_fd(), &value, 8); + conn_info.conv_manager.clear_inactive(); + } else { + assert(conn_info.conv_manager.is_u64_used(fd64)); conv=conn_info.conv_manager.find_conv_by_u64(fd64); @@ -724,8 +774,6 @@ int server_event_loop() mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } - - char * new_data; int new_len; put_conv(conv,data,data_len,new_data,new_len); diff --git a/packet.cpp b/packet.cpp index 38755ad..2c361b7 100644 --- a/packet.cpp +++ b/packet.cpp @@ -147,6 +147,7 @@ int send_fd (int fd,char * buf, int len,int flags) int my_send(const dest_t &dest,char *data,int len) { + if(dest.cook)put_crc32(data,len); switch(dest.type) { case type_ip_port: @@ -164,6 +165,7 @@ int my_send(const dest_t &dest,char *data,int len) } case type_fd64: { + if(!fd_manager.exist(dest.inner.fd64)) return -1; int fd=fd_manager.to_fd(dest.inner.fd64); return send_fd(fd,data,len,0); @@ -251,7 +253,23 @@ int get_conv0(u32_t &conv,const char *input,int len_in,char *&output,int &len_ou } return 0; } - +int put_crc32(char * s,int &len) +{ + if(len<0) return -1; + u32_t crc32=crc32h((unsigned char *)s,len); + write_u32(s+len,crc32); + len+=sizeof(u32_t); + return 0; +} +int rm_crc32(char * s,int &len) +{ + len-=sizeof(u32_t); + if(len<0) return -1; + u32_t crc32_in=read_u32(s+len); + u32_t crc32=crc32h((unsigned char *)s,len); + if(crc32!=crc32_in) return -1; + return 0; +} int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out) { static char buf[buf_len]; diff --git a/packet.h b/packet.h index 1f7d0a5..b628e73 100644 --- a/packet.h +++ b/packet.h @@ -39,5 +39,6 @@ int send_fd (int fd,char * buf, int len,int flags); int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out); int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out ); - +int put_crc32(char * s,int &len); +int rm_crc32(char * s,int &len); #endif /* PACKET_H_ */