fixed conv clear bug caused by rehash

This commit is contained in:
wangyu- 2017-10-08 01:50:48 -05:00
parent a3d3cf9577
commit fb3edca8e4
11 changed files with 203 additions and 43 deletions

View File

@ -96,7 +96,7 @@ const i32_t max_fail_time=0;//disable
const u32_t heartbeat_interval=1000; const u32_t heartbeat_interval=1000;
const u32_t timer_interval=400;//this should be smaller than heartbeat_interval and retry interval; const u32_t timer_interval=50;//this should be smaller than heartbeat_interval and retry interval;
//const uint32_t conv_timeout=120000; //120 second //const uint32_t conv_timeout=120000; //120 second
//const u32_t conv_timeout=120000; //for test //const u32_t conv_timeout=120000; //for test
@ -152,6 +152,7 @@ struct dest_t
dest_type type; dest_type type;
inner_t inner; inner_t inner;
u32_t conv; u32_t conv;
int cook=0;
}; };
struct fd_info_t struct fd_info_t

View File

@ -18,9 +18,9 @@ conn_manager_t conn_manager;
void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection), void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
//so we have to close the fd when conv expires //so we have to close the fd when conv expires
{ {
int fd64=u64; fd64_t fd64=u64;
assert(fd_manager.exist(fd64)); assert(fd_manager.exist(fd64));
fd_manager.close(fd64); fd_manager.fd64_close(fd64);
} }
conv_manager_t::conv_manager_t() conv_manager_t::conv_manager_t()
@ -103,12 +103,14 @@ conv_manager_t::~conv_manager_t()
} }
int conv_manager_t::erase_conv(u32_t conv) int conv_manager_t::erase_conv(u32_t conv)
{ {
if(disable_conv_clear) return 0; //if(disable_conv_clear) return 0;
assert(conv_last_active_time.find(conv)!=conv_last_active_time.end());
u64_t u64=conv_to_u64[conv]; u64_t u64=conv_to_u64[conv];
if(program_mode==server_mode) if(program_mode==server_mode)
{ {
server_clear_function(u64); server_clear_function(u64);
} }
assert(conv_to_u64.find(conv)!=conv_to_u64.end());
conv_to_u64.erase(conv); conv_to_u64.erase(conv);
u64_to_conv.erase(u64); u64_to_conv.erase(u64);
conv_last_active_time.erase(conv); conv_last_active_time.erase(conv);
@ -152,7 +154,7 @@ conv_manager_t::~conv_manager_t()
old_it=it; old_it=it;
it++; it++;
u32_t conv= old_it->first; u32_t conv= old_it->first;
erase_conv(old_it->first); erase_conv(conv);
if(ip_port==0) if(ip_port==0)
{ {
mylog(log_info,"conv %x cleared\n",conv); mylog(log_info,"conv %x cleared\n",conv);

View File

@ -48,6 +48,10 @@ struct conv_manager_t // manage the udp connections
long long last_clear_time; long long last_clear_time;
conv_manager_t(); conv_manager_t();
conv_manager_t(const conv_manager_t &b)
{
assert(0==1);
}
~conv_manager_t(); ~conv_manager_t();
int get_size(); int get_size();
void reserve(); void reserve();
@ -70,8 +74,15 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o
conv_manager_t conv_manager; conv_manager_t conv_manager;
fec_encode_manager_t fec_encode_manager; fec_encode_manager_t fec_encode_manager;
fec_decode_manager_t fec_decode_manager; fec_decode_manager_t fec_decode_manager;
fd64_t timer_fd; my_timer_t timer;
ip_port_t ip_port; ip_port_t ip_port;
conn_info_t()
{
}
conn_info_t(const conn_info_t &b)
{
assert(0==1);
}
};//g_conn_info; };//g_conn_info;
struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
@ -82,6 +93,10 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m
long long last_clear_time; long long last_clear_time;
conn_manager_t(); conn_manager_t();
conn_manager_t(const conn_info_t &b)
{
assert(0==1);
}
int exist(ip_port_t); int exist(ip_port_t);
conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash
conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash

View File

@ -62,7 +62,7 @@ int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
} }
delay_data_t tmp=delay_data; delay_data_t tmp=delay_data;
tmp.data=(char *)malloc(delay_data.len); tmp.data=(char *)malloc(delay_data.len+100);
memcpy(tmp.data,delay_data.data,delay_data.len); memcpy(tmp.data,delay_data.data,delay_data.len);

View File

@ -10,6 +10,7 @@
#include "common.h" #include "common.h"
#include "packet.h" #include "packet.h"
#include "log.h"
//enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote}; //enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote};
@ -27,6 +28,80 @@ union dest_t
u64_t u64; u64_t u64;
}; };
*/ */
struct my_timer_t
{
int timer_fd;
fd64_t timer_fd64;
my_timer_t()
{
if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
{
mylog(log_fatal,"timer_fd create error");
myexit(1);
}
timer_fd64=fd_manager.create(timer_fd);
}
my_timer_t(const my_timer_t &b)
{
assert(0==1);
}
~my_timer_t()
{
fd_manager.fd64_close(timer_fd64);
}
int add_fd_to_epoll(int epoll_fd)
{
epoll_event ev;;
ev.events = EPOLLIN;
ev.data.u64 = timer_fd;
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
return 0;
}
int add_fd64_to_epoll(int epoll_fd)
{
epoll_event ev;;
ev.events = EPOLLIN;
ev.data.u64 = timer_fd64;
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
return 0;
}
int get_timer_fd()
{
return timer_fd;
}
fd64_t get_timer_fd64()
{
return timer_fd64;
}
int set_timer_repeat_us(my_time_t my_time)
{
itimerspec its;
memset(&its,0,sizeof(its));
its.it_interval.tv_sec=my_time/1000000llu;
its.it_interval.tv_nsec=my_time%1000000llu*1000llu;
its.it_value.tv_nsec=1; //imidiately
timerfd_settime(timer_fd,0,&its,0);
return 0;
}
int set_timer_abs_us(my_time_t my_time)
{
itimerspec its;
memset(&its,0,sizeof(its));
its.it_value.tv_sec=my_time/1000000llu;
its.it_value.tv_nsec=my_time%1000000llu*1000llu;
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
return 0;
}
};
struct delay_data_t struct delay_data_t
{ {
dest_t dest; dest_t dest;

View File

@ -35,7 +35,7 @@ void fd_manager_t::remove_fd(int fd)
fd64_to_fd_mp.erase(fd64); fd64_to_fd_mp.erase(fd64);
//return 0; //return 0;
}*/ }*/
void fd_manager_t::close(fd64_t fd64) void fd_manager_t::fd64_close(fd64_t fd64)
{ {
assert(exist(fd64)); assert(exist(fd64));
int fd=fd64_to_fd_mp[fd64]; int fd=fd64_to_fd_mp[fd64];

View File

@ -20,7 +20,7 @@ struct fd_manager_t //conver fd to a uniq 64bit number,avoid fd value conflict
int exist_info(fd64_t); int exist_info(fd64_t);
int exist(fd64_t fd64); int exist(fd64_t fd64);
int to_fd(fd64_t); int to_fd(fd64_t);
void close(fd64_t fd64); void fd64_close(fd64_t fd64);
void reserve(int n); void reserve(int n);
u64_t create(int fd); u64_t create(int fd);
fd_manager_t(); fd_manager_t();

View File

@ -124,7 +124,7 @@ fec_encode_manager_t::fec_encode_manager_t()
} }
fec_encode_manager_t::~fec_encode_manager_t() fec_encode_manager_t::~fec_encode_manager_t()
{ {
fd_manager.close(timer_fd64); fd_manager.fd64_close(timer_fd64);
} }
u64_t fec_encode_manager_t::get_timer_fd64() u64_t fec_encode_manager_t::get_timer_fd64()
{ {

View File

@ -27,8 +27,8 @@ int jitter_max=0;
int mtu_warn=1350; int mtu_warn=1350;
int fec_data_num=30; int fec_data_num=20;
int fec_redundant_num=20; int fec_redundant_num=8;
int fec_mtu=1200; int fec_mtu=1200;
int fec_pending_num=200; int fec_pending_num=200;
int fec_pending_time=50000; int fec_pending_time=50000;
@ -56,7 +56,6 @@ int init_listen_socket()
local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); local_listen_fd =socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
int yes = 1; int yes = 1;
//setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); //setsockopt(udp_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
@ -114,7 +113,7 @@ int delay_send(my_time_t delay,const dest_t &dest,char *data,int len)
//mylog(log_info,"rand = %d\n",rand); //mylog(log_info,"rand = %d\n",rand);
if(rand>=80) if(rand>=80)
{ {
return 0; //return 0;
//mylog(log_info,"dropped!\n"); //mylog(log_info,"dropped!\n");
} }
return delay_manager.add(delay,dest,data,len);; return delay_manager.add(delay,dest,data,len);;
@ -222,10 +221,11 @@ int client_event_loop()
conn_info_t *conn_info_p=new conn_info_t; conn_info_t *conn_info_p=new conn_info_t;
conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack
init_listen_socket(); conn_info.conv_manager.reserve();
conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time); conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time);
init_listen_socket();
epoll_fd = epoll_create1(0); epoll_fd = epoll_create1(0);
const int max_events = 4096; const int max_events = 4096;
@ -272,6 +272,10 @@ int client_event_loop()
myexit(-1); myexit(-1);
} }
//my_timer_t timer;
conn_info.timer.add_fd_to_epoll(epoll_fd);
conn_info.timer.set_timer_repeat_us(timer_interval*1000);
while(1)//////////////////////// while(1)////////////////////////
{ {
if(about_to_exit) myexit(0); if(about_to_exit) myexit(0);
@ -291,7 +295,13 @@ int client_event_loop()
} }
int idx; int idx;
for (idx = 0; idx < nfds; ++idx) { for (idx = 0; idx < nfds; ++idx) {
if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
{
uint64_t value;
read(conn_info.timer.get_timer_fd(), &value, 8);
conn_info.conv_manager.clear_inactive();
}
else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
{ {
char data[buf_len]; char data[buf_len];
int data_len; int data_len;
@ -301,6 +311,7 @@ int client_event_loop()
dest_t dest; dest_t dest;
dest.type=type_fd64; dest.type=type_fd64;
dest.inner.fd64=remote_fd64; dest.inner.fd64=remote_fd64;
dest.cook=1;
if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64())
{ {
@ -415,6 +426,10 @@ int client_event_loop()
mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
} }
if(rm_crc32(data,data_len)!=0)
{
mylog(log_debug,"crc32 check error");
}
int out_n;char **out_arr;int *out_len;int *out_delay; int out_n;char **out_arr;int *out_len;int *out_delay;
from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay);
@ -498,6 +513,11 @@ int server_event_loop()
} }
mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port); mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port);
my_timer_t timer;
timer.add_fd_to_epoll(epoll_fd);
timer.set_timer_repeat_us(timer_interval*1000);
while(1)//////////////////////// while(1)////////////////////////
{ {
@ -527,7 +547,15 @@ int server_event_loop()
read(timer_fd, &dummy, 8); read(timer_fd, &dummy, 8);
//current_time_rough=get_current_time(); //current_time_rough=get_current_time();
} }
else */if (events[idx].data.u64 == (u64_t)local_listen_fd) else */
if(events[idx].data.u64==(u64_t)timer.get_timer_fd())
{
uint64_t value;
read(timer.get_timer_fd(), &value, 8);
conn_manager.clear_inactive();
//conn_info.conv_manager.clear_inactive();
}
else if (events[idx].data.u64 == (u64_t)local_listen_fd)
{ {
//int recv_len; //int recv_len;
char data[buf_len]; char data[buf_len];
@ -544,6 +572,11 @@ int server_event_loop()
{ {
mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
} }
if(rm_crc32(data,data_len)!=0)
{
mylog(log_debug,"crc32 check error");
}
mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
ntohs(udp_new_addr_in.sin_port),data_len); ntohs(udp_new_addr_in.sin_port),data_len);
@ -557,12 +590,21 @@ int server_event_loop()
conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time); conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time);
conn_info.conv_manager.reserve(); conn_info.conv_manager.reserve();
u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
ev.events = EPOLLIN;
ev.data.u64 = fd64;
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
fd_manager.get_info(fd64).ip_port=ip_port; u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64();
ev.events = EPOLLIN;
ev.data.u64 = fec_fd64;
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev);
fd_manager.get_info(fec_fd64).ip_port=ip_port;
conn_info.timer.add_fd64_to_epoll(epoll_fd);
conn_info.timer.set_timer_repeat_us(timer_interval*1000);
u64_t timer_fd64=conn_info.timer.get_timer_fd64();
fd_manager.get_info(timer_fd64).ip_port=ip_port;
} }
conn_info_t &conn_info=conn_manager.find(ip_port); conn_info_t &conn_info=conn_manager.find(ip_port);
@ -683,6 +725,7 @@ int server_event_loop()
dest.type=type_ip_port; dest.type=type_ip_port;
//dest.conv=conv; //dest.conv=conv;
dest.inner.ip_port=ip_port; dest.inner.ip_port=ip_port;
dest.cook=1;
if(fd64==conn_info.fec_encode_manager.get_timer_fd64()) if(fd64==conn_info.fec_encode_manager.get_timer_fd64())
{ {
@ -699,9 +742,16 @@ int server_event_loop()
assert(value==1); assert(value==1);
from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay);
} }
else if(fd64==conn_info.timer.get_timer_fd64())
{
uint64_t value;
read(conn_info.timer.get_timer_fd(), &value, 8);
conn_info.conv_manager.clear_inactive();
}
else else
{ {
assert(conn_info.conv_manager.is_u64_used(fd64)); assert(conn_info.conv_manager.is_u64_used(fd64));
conv=conn_info.conv_manager.find_conv_by_u64(fd64); conv=conn_info.conv_manager.find_conv_by_u64(fd64);
@ -724,8 +774,6 @@ int server_event_loop()
mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn);
} }
char * new_data; char * new_data;
int new_len; int new_len;
put_conv(conv,data,data_len,new_data,new_len); put_conv(conv,data,data_len,new_data,new_len);

View File

@ -147,6 +147,7 @@ int send_fd (int fd,char * buf, int len,int flags)
int my_send(const dest_t &dest,char *data,int len) int my_send(const dest_t &dest,char *data,int len)
{ {
if(dest.cook)put_crc32(data,len);
switch(dest.type) switch(dest.type)
{ {
case type_ip_port: case type_ip_port:
@ -164,6 +165,7 @@ int my_send(const dest_t &dest,char *data,int len)
} }
case type_fd64: case type_fd64:
{ {
if(!fd_manager.exist(dest.inner.fd64)) return -1; if(!fd_manager.exist(dest.inner.fd64)) return -1;
int fd=fd_manager.to_fd(dest.inner.fd64); int fd=fd_manager.to_fd(dest.inner.fd64);
return send_fd(fd,data,len,0); return send_fd(fd,data,len,0);
@ -251,7 +253,23 @@ int get_conv0(u32_t &conv,const char *input,int len_in,char *&output,int &len_ou
} }
return 0; return 0;
} }
int put_crc32(char * s,int &len)
{
if(len<0) return -1;
u32_t crc32=crc32h((unsigned char *)s,len);
write_u32(s+len,crc32);
len+=sizeof(u32_t);
return 0;
}
int rm_crc32(char * s,int &len)
{
len-=sizeof(u32_t);
if(len<0) return -1;
u32_t crc32_in=read_u32(s+len);
u32_t crc32=crc32h((unsigned char *)s,len);
if(crc32!=crc32_in) return -1;
return 0;
}
int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out) int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out)
{ {
static char buf[buf_len]; static char buf[buf_len];

View File

@ -39,5 +39,6 @@ int send_fd (int fd,char * buf, int len,int flags);
int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out); int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out);
int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out ); int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out );
int put_crc32(char * s,int &len);
int rm_crc32(char * s,int &len);
#endif /* PACKET_H_ */ #endif /* PACKET_H_ */