implemented connection recovery

This commit is contained in:
wangyu 2017-07-29 02:13:42 +08:00 committed by wangyu
parent b4d9549c0b
commit 74b60b2007

202
main.cpp
View File

@ -111,6 +111,7 @@ program_mode_t program_mode=unset_mode;//0 unset; 1client 2server
int disable_bpf_filter=0; //for test only,most time no need to disable this
const int disable_conv_clear=0;
const int disable_conn_clear=0;
int seq_mode=2; //0 dont increase /1 increase //increase randomly,about every 5 packet
@ -123,7 +124,11 @@ uint64_t epoll_udp_fd_sn=epoll_udp_fd_sn_begin; //all udp_fd_sn > max uint32
enum server_current_state_t {server_nothing=0,server_syn_ack_sent,server_handshake_sent,server_ready};
enum client_current_state_t {client_nothing=0,client_syn_sent,client_ack_sent,client_handshake_sent,client_ready};
union current_state_t
{
server_current_state_t server_current_state;
client_current_state_t client_current_state;
};
int udp_fd=-1; //for client only
int raw_recv_fd=-1;
@ -133,8 +138,7 @@ int epollfd=-1;
int random_number_fd=-1;
int timer_fd=-1;
unordered_map<uint32_t,uint64_t> udp_fd_mp(100007);
unordered_map<uint32_t,uint64_t> timer_fd_mp(100007);
char key_string[1000]= "secret key";
@ -547,36 +551,47 @@ struct raw_info_t
};//g_raw_info;
struct conn_info_t
{
server_current_state_t server_current_state;
client_current_state_t client_current_state;
current_state_t state;
raw_info_t raw_info;
//packet_info_t recv_info;
//packet_info_t send_info;
long long last_state_time;
long long last_hb_recv_time;
long long last_hb_sent_time;
id_t my_id;
id_t oppsite_id;
id_t oppsite_const_id;
int retry_counter;
conv_manager_t conv_manager;
anti_replay_t anti_replay;
int timer_fd;
int retry_counter;
id_t oppsite_const_id;
conn_info_t()
{
//send_packet_info.protocol=g_packet_info_send.protocol;
server_current_state=server_nothing;
client_current_state=client_nothing;
if(program_mode==server_mode)
state.server_current_state=server_nothing;
else
state.client_current_state=client_nothing;
last_state_time=0;
oppsite_const_id=0;
}
~conn_info_t()
{
//send_packet_info.protocol=g_packet_info_send.protocol;
}
};//g_conn_info;
struct conn_manager_t
{
unordered_map<uint64_t,conn_info_t> mp;
unordered_map<uint32_t,uint64_t> const_id_mp;
unordered_map<uint64_t,conn_info_t>::iterator clear_it;
unordered_map<int,conn_info_t *> udp_fd_mp; //a bit dirty to used pointer,but can void unordered_map search
unordered_map<int,conn_info_t *> timer_fd_mp;//we can use pointer here since unordered_map.rehash() uses shallow copy
unordered_map<id_t,conn_info_t *> const_id_mp;
unordered_map<uint64_t,conn_info_t>::iterator clear_it;
unordered_map<uint64_t,conn_info_t>::iterator it; //moved out from function,for easier to change unordered_map to map
unordered_map<uint64_t,conn_info_t>::iterator old_it;//
@ -585,6 +600,8 @@ struct conn_manager_t
conn_manager_t()
{
mp.reserve(10007);
//timer_fd_mp.reserve(10007);
//udp_fd_mp.reserve(100007);
//current_ready_ip=0;
// current_ready_port=0;
}
@ -619,13 +636,13 @@ struct conn_manager_t
}
int clear_inactive()
{
if(disable_conv_clear) return 0;
if(disable_conn_clear) return 0;
//map<uint32_t,uint64_t>::iterator it;
int cnt=0;
it=clear_it;
int size=mp.size();
int num_to_clean=size/conv_clear_ratio+1; //clear 1/10 each time,to avoid latency glitch
int num_to_clean=size/conn_clear_ratio+1; //clear 1/10 each time,to avoid latency glitch
uint64_t current_time=get_current_time();
for(;;)
@ -638,18 +655,26 @@ int clear_inactive()
it=mp.begin();
}
if( current_time - it->second.last_hb_recv_time >conv_timeout )
if(it->second.state.server_current_state==server_ready&& current_time - it->second.last_hb_recv_time <=conn_timeout )
{
it++;
}
else if(it->second.state.server_current_state!=server_ready&& current_time - it->second.last_state_time <=server_handshake_timeout )
{
it++;
}
else
{
//mylog(log_info,"inactive conv %u cleared \n",it->first);
old_it=it;
it++;
timer_fd_mp.erase(old_it->second.timer_fd);
close(old_it->second.timer_fd);// close will auto delte it from epoll
if(old_it->second.oppsite_const_id!=0)
{
const_id_mp.erase(old_it->second.oppsite_const_id);
mp.erase(old_it->first);
}
else
{
it++;
}
cnt++;
}
@ -1038,14 +1063,14 @@ void server_clear_function(uint64_t u64)
mylog(log_fatal,"fd:%d epoll delete failed!!!!\n",fd);
myexit(-1); //this shouldnt happen
}*/ //no need
ret= close(fd);
ret= close(fd); //closed fd should be auto removed from epoll
if (ret!=0)
{
mylog(log_fatal,"close fd %d failed !!!!\n",fd);
myexit(-1); //this shouldnt happen
}
udp_fd_mp.erase(udp_fd);
conn_manager.udp_fd_mp.erase(udp_fd);
}
@ -2424,7 +2449,7 @@ int keep_connection_client(conn_info_t &conn_info) //for client
mylog(log_trace,"timer!\n");
begin:
if(conn_info.client_current_state==client_nothing)
if(conn_info.state.client_current_state==client_nothing)
{
conn_info.anti_replay.re_init(); // this is not safe
@ -2453,9 +2478,9 @@ int keep_connection_client(conn_info_t &conn_info) //for client
if(raw_mode==mode_faketcp)
{
conn_info.client_current_state = client_syn_sent;
conn_info.state.client_current_state = client_syn_sent;
conn_info.last_state_time = get_current_time();
mylog(log_info,"state changed from nothing to syn_sent %d\n",conn_info.client_current_state);
mylog(log_info,"state changed from nothing to syn_sent %d\n",conn_info.state.client_current_state);
conn_info.retry_counter = RETRY_TIME;
send_info.seq = get_true_random_number_nz();
@ -2469,7 +2494,7 @@ int keep_connection_client(conn_info_t &conn_info) //for client
}
else if(raw_mode==mode_udp||raw_mode==mode_icmp)
{
conn_info.client_current_state = client_ack_sent;
conn_info.state.client_current_state = client_ack_sent;
conn_info.last_state_time = get_current_time();
mylog(log_info,"state changed from nothing to ack_sent\n");
conn_info.retry_counter = RETRY_TIME;
@ -2481,11 +2506,11 @@ int keep_connection_client(conn_info_t &conn_info) //for client
}
return 0;
}
if(conn_info.client_current_state==client_syn_sent &&get_current_time()-conn_info.last_state_time>handshake_timeout)
if(conn_info.state.client_current_state==client_syn_sent &&get_current_time()-conn_info.last_state_time>handshake_timeout)
{
if(conn_info.retry_counter==0)
{
conn_info.client_current_state=client_nothing;
conn_info.state.client_current_state=client_nothing;
mylog(log_info,"state back to nothing\n");
return 0;
//goto begin;
@ -2498,11 +2523,11 @@ int keep_connection_client(conn_info_t &conn_info) //for client
conn_info.last_state_time=get_current_time();
}
}
if(conn_info.client_current_state==client_ack_sent &&get_current_time()-conn_info.last_state_time>handshake_timeout)
if(conn_info.state.client_current_state==client_ack_sent &&get_current_time()-conn_info.last_state_time>handshake_timeout)
{
if(conn_info.retry_counter==0)
{
conn_info.client_current_state=client_nothing;
conn_info.state.client_current_state=client_nothing;
mylog(log_info,"state back to nothing\n");
return 0;
//goto begin;
@ -2523,11 +2548,11 @@ int keep_connection_client(conn_info_t &conn_info) //for client
}
}
if(conn_info.client_current_state==client_handshake_sent&&get_current_time()-conn_info.last_state_time>handshake_timeout)
if(conn_info.state.client_current_state==client_handshake_sent&&get_current_time()-conn_info.last_state_time>handshake_timeout)
{
if(conn_info.retry_counter==0)
{
conn_info.client_current_state=client_nothing;
conn_info.state.client_current_state=client_nothing;
mylog(log_info,"state back to nothing\n");
return 0;
//goto begin;
@ -2545,12 +2570,12 @@ int keep_connection_client(conn_info_t &conn_info) //for client
}
if(conn_info.client_current_state==client_ready)
if(conn_info.state.client_current_state==client_ready)
{
mylog(log_trace,"time %lld %lld\n",get_current_time(),conn_info.last_state_time);
if(get_current_time()-conn_info.last_hb_recv_time>heartbeat_timeout)
{
conn_info.client_current_state=client_nothing;
conn_info.state.client_current_state=client_nothing;
conn_info.my_id=get_true_random_number_nz();
mylog(log_info,"state back to nothing\n");
return 0;
@ -2577,8 +2602,9 @@ int keep_connection_server_multi(conn_info_t &conn_info)
current_time_rough=get_current_time();
conn_info.conv_manager.clear_inactive();
if(conn_info.server_current_state==server_ready)
if(conn_info.state.server_current_state==server_ready)
{
/*
if( get_current_time()-conn_info.last_hb_recv_time>heartbeat_timeout )
{
mylog(log_trace,"%lld %lld\n",get_current_time(),conn_info.last_state_time);
@ -2589,7 +2615,7 @@ int keep_connection_server_multi(conn_info_t &conn_info)
mylog(log_info,"changed state to server_nothing\n");
return 0;
}
}*/ //dont need to do this at server,conn_manger will clear expired connections
if(get_current_time()-conn_info.last_hb_sent_time<heartbeat_interval)
{
@ -2743,7 +2769,7 @@ int client_on_raw_recv(conn_info_t &conn_info)
raw_info_t &raw_info=conn_info.raw_info;
mylog(log_debug,"i m here\n");
if(conn_info.client_current_state==client_syn_sent )
if(conn_info.state.client_current_state==client_syn_sent )
{
mylog(log_debug,"i m here3\n");
@ -2774,13 +2800,13 @@ int client_on_raw_recv(conn_info_t &conn_info)
send_raw(raw_info,0,0);
conn_info.client_current_state=client_ack_sent;
conn_info.state.client_current_state=client_ack_sent;
conn_info.last_state_time=get_current_time();
conn_info.retry_counter=RETRY_TIME;
mylog(log_info,"changed state to client_ack_sent\n");
}
if(conn_info.client_current_state==client_ack_sent )
if(conn_info.state.client_current_state==client_ack_sent )
{
mylog(log_debug,"i m here2\n");
@ -2817,11 +2843,11 @@ int client_on_raw_recv(conn_info_t &conn_info)
mylog(log_info,"<<handshake sent %x %d>>\n",conn_info.my_id,conn_info.oppsite_id);
conn_info.client_current_state=client_handshake_sent;
conn_info.state.client_current_state=client_handshake_sent;
conn_info.last_state_time=get_current_time();
conn_info.retry_counter=RETRY_TIME;
}
if(conn_info.client_current_state==client_handshake_sent)
if(conn_info.state.client_current_state==client_handshake_sent)
{
@ -2863,12 +2889,12 @@ int client_on_raw_recv(conn_info_t &conn_info)
}*/
mylog(log_info,"changed state to client_ready\n");
conn_info.client_current_state=client_ready;
conn_info.state.client_current_state=client_ready;
conn_info.last_state_time=get_current_time();
conn_info.last_hb_recv_time=get_current_time();
}
if(conn_info.client_current_state==client_ready )
if(conn_info.state.client_current_state==client_ready )
{
@ -3014,7 +3040,9 @@ int server_on_raw_ready(conn_info_t &conn_info)
}
conn_info.conv_manager.insert_conv(tmp_conv_id, new_udp_fd);
udp_fd_mp[new_udp_fd]=pack_u64(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port);
conn_manager.udp_fd_mp[new_udp_fd] = &conn_info;
//pack_u64(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port);
mylog(log_info, "new conv conv_id=%x, assigned fd=%d\n",
tmp_conv_id, new_udp_fd);
@ -3111,7 +3139,7 @@ int server_on_raw_recv_multi()
mylog(log_info,"changed state to server_syn_ack_sent\n");
conn_info.server_current_state = server_syn_ack_sent;
conn_info.state.server_current_state = server_syn_ack_sent;
conn_info.last_state_time = get_current_time();
}
else if(raw_mode==mode_udp||raw_mode==mode_icmp)
@ -3125,7 +3153,7 @@ int server_on_raw_recv_multi()
mylog(log_info,"changed state to server_heartbeat_sent_sent\n");
conn_info.server_current_state = server_handshake_sent;
conn_info.state.server_current_state = server_handshake_sent;
conn_info.last_state_time = get_current_time();
}
return 0;
@ -3137,7 +3165,7 @@ int server_on_raw_recv_multi()
packet_info_t &recv_info=conn_info.raw_info.recv_info;
raw_info_t &raw_info=conn_info.raw_info;
if(conn_info.server_current_state==server_ready)
if(conn_info.state.server_current_state==server_ready)
{
return server_on_raw_ready(conn_info);
}
@ -3146,7 +3174,7 @@ int server_on_raw_recv_multi()
return -1;
if(conn_info.server_current_state==server_syn_ack_sent)
if(conn_info.state.server_current_state==server_syn_ack_sent)
{
if(raw_mode==mode_faketcp&&!( recv_info.syn==0&&recv_info.ack==1 &&data_len==0)) return 0;
if(recv_info.src_ip!=send_info.dst_ip||recv_info.src_port!=send_info.dst_port)
@ -3164,10 +3192,10 @@ int server_on_raw_recv_multi()
mylog(log_info,"changed state to server_handshake_sent\n");
conn_info.server_current_state=server_handshake_sent;
conn_info.state.server_current_state=server_handshake_sent;
conn_info.last_state_time=get_current_time();
}
else if(conn_info.server_current_state==server_handshake_sent)//heart beat received
else if(conn_info.state.server_current_state==server_handshake_sent)//heart beat received
{
if(( raw_mode==mode_faketcp&& (recv_info.syn==1||recv_info.ack!=1)) ||data_len==0)
{
@ -3209,11 +3237,15 @@ int server_on_raw_recv_multi()
mylog(log_info,"received handshake %x %x\n",conn_info.oppsite_id,conn_info.my_id);
if(conn_manager.const_id_mp.find(tmp_oppsite_const_id)==conn_manager.const_id_mp.end())
{
//conn_manager.const_id_mp=
conn_info.server_current_state=server_ready;
conn_info.state.server_current_state = server_ready;
conn_info.oppsite_const_id=tmp_oppsite_const_id;
//conn_info.last_state_time=get_current_time(); //dont change this
//conn_info.last_state_time=get_current_time(); //dont change this!!!!!!!!!!!!!!!!!!!!!!!!!
//conn_manager.current_ready_ip=ip;
//conn_manager.current_ready_port=port;
@ -3232,7 +3264,7 @@ int server_on_raw_recv_multi()
//g_conn_info=conn_info;
int new_timer_fd;
set_timer_server(epollfd, new_timer_fd);
timer_fd_mp[new_timer_fd]=pack_u64(ip,port);
conn_manager.timer_fd_mp[new_timer_fd] = &conn_info;//pack_u64(ip,port);
//timer_fd_mp[new_timer_fd]
/*
if(oppsite_const_id!=0&&tmp_oppsite_const_id!=oppsite_const_id) //TODO MOVE TO READY
@ -3241,6 +3273,40 @@ int server_on_raw_recv_multi()
conv_manager.clear();
}*/
//oppsite_const_id=tmp_oppsite_const_id;
}
else
{
conn_info_t &ori_conn_info=*conn_manager.const_id_mp[tmp_oppsite_const_id];
if(ori_conn_info.state.server_current_state==server_ready)
{
if(conn_info.last_state_time<ori_conn_info.last_state_time)
{
mylog(log_info,"conn_info.last_state_time<ori_conn_info.last_state_time. ignored new handshake\n");
conn_info.state.server_current_state=server_nothing;
return 0;
}
mylog(log_info,"grabbed a connection\n");
//ori_conn_info.anti_replay
ori_conn_info.state.server_current_state=server_ready;
ori_conn_info.raw_info=conn_info.raw_info;
ori_conn_info.last_state_time=conn_info.last_state_time;
ori_conn_info.last_hb_recv_time=conn_info.last_hb_recv_time;
ori_conn_info.last_hb_sent_time=conn_info.last_hb_sent_time;
ori_conn_info.my_id=conn_info.my_id;
ori_conn_info.oppsite_id=conn_info.oppsite_id;
conn_info.state.server_current_state=server_nothing;
}
else
{
mylog(log_fatal,"this should never happen\n");
myexit(-1);
}
}
}
return 0;
@ -3684,7 +3750,7 @@ int client_event_loop()
if ((recv_len = recvfrom(udp_fd, buf, buf_len, 0,
(struct sockaddr *) &udp_new_addr_in, &slen)) == -1) {
mylog(log_error,"recv_from error,this shouldnt happen at client\n");
exit(1);
myexit(1);
};
mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
@ -3733,7 +3799,7 @@ int client_event_loop()
conn_info.conv_manager.update_active_time(conv);
if(conn_info.client_current_state==client_ready)
if(conn_info.state.client_current_state==client_ready)
{
send_data_safer(conn_info,buf,recv_len,conv);
}
@ -3831,21 +3897,21 @@ int server_event_loop()
uint64_t dummy;
read(fd, &dummy, 8);
if(timer_fd_mp.find(fd)==timer_fd_mp.end())
if(conn_manager.timer_fd_mp.find(fd)==conn_manager.timer_fd_mp.end())
{
mylog(log_info,"timer_fd no longer exits\n", nfds);
continue;
}
uint64_t u64=timer_fd_mp[fd];
uint32_t ip=get_u64_h(u64);
uint32_t port=get_u64_l(u64);
conn_info_t* p_conn_info=conn_manager.timer_fd_mp[fd];
uint32_t ip=p_conn_info->raw_info.recv_info.src_ip;
uint32_t port=p_conn_info->raw_info.recv_info.src_port;
if(!conn_manager.exist(ip,port))
{
mylog(log_info,"ip port no longer exits\n", nfds);
continue;
}
conn_info_t &conn_info=conn_manager.find(ip,port);
keep_connection_server_multi(conn_info);
//conn_info_t &conn_info=conn_manager.find(ip,port);
keep_connection_server_multi(*p_conn_info);
}
else if (events[n].data.u64 == raw_recv_fd)
{
@ -3859,23 +3925,23 @@ int server_event_loop()
int fd=int((events[n].data.u64<<32u)>>32u);
if(udp_fd_mp.find(fd)==udp_fd_mp.end())
if(conn_manager.udp_fd_mp.find(fd)==conn_manager.udp_fd_mp.end())
{
mylog(log_debug,"fd no longer exists in udp_fd_mp,udp fd %d\n",fd);
recv(fd,0,0,0);
continue;
}
uint64_t u64=udp_fd_mp[fd];
uint32_t ip=get_u64_h(u64);
uint32_t port=get_u64_l(u64);
conn_info_t* p_conn_info=conn_manager.udp_fd_mp[fd];
uint32_t ip=p_conn_info->raw_info.recv_info.src_ip;
uint32_t port=p_conn_info->raw_info.recv_info.src_port;
/*
if(conn_manager.exist(ip,port)==0)
{
mylog(log_debug,"conn_info no longer exists,udp fd %d\n",fd);
recv(fd,0,0,0);
continue;
}
conn_info_t &conn_info=conn_manager.find(ip,port);
}*/
conn_info_t &conn_info=*p_conn_info;
if(!conn_info.conv_manager.is_u64_used(fd))
{
@ -3901,7 +3967,7 @@ int server_event_loop()
conn_info.conv_manager.update_active_time(conv_id);
if(conn_info.server_current_state==server_ready)
if(conn_info.state.server_current_state==server_ready)
{
send_data_safer(conn_info,buf,recv_len,conv_id);
//send_data(g_packet_info_send,buf,recv_len,my_id,oppsite_id,conv_id);