From 44e7fb94f45eaaa3ffb2394b3d7458969d7b0eef Mon Sep 17 00:00:00 2001 From: wangyu- Date: Fri, 13 Oct 2017 07:56:53 -0500 Subject: [PATCH] add log --- common.cpp | 43 +++++++---- common.h | 5 +- connection.cpp | 1 + connection.h | 5 +- fec_manager.cpp | 2 +- main.cpp | 192 ++++++++++++++++++++++++------------------------ packet.cpp | 6 ++ 7 files changed, 138 insertions(+), 116 deletions(-) diff --git a/common.cpp b/common.cpp index f23c4c1..c2cdf7f 100644 --- a/common.cpp +++ b/common.cpp @@ -16,12 +16,34 @@ raw_mode_t raw_mode=mode_faketcp; unordered_map raw_mode_tostring = {{mode_faketcp, "faketcp"}, {mode_udp, "udp"}, {mode_icmp, "icmp"}}; int max_pending_packet=0; -static int random_number_fd=-1; +//static int random_number_fd=-1; char iptables_rule[200]=""; //int is_client = 0, is_server = 0; program_mode_t program_mode=unset_mode;//0 unset; 1client 2server + + +struct random_fd_t +{ + int random_number_fd; + random_fd_t() + { + random_number_fd=open("/dev/urandom",O_RDONLY); + + if(random_number_fd==-1) + { + mylog(log_fatal,"error open /dev/urandom\n"); + myexit(-1); + } + setnonblocking(random_number_fd); + } + int get_fd() + { + return random_number_fd; + } +}random_fd; + u64_t get_current_time()//ms { timespec tmp_time; @@ -138,22 +160,11 @@ int clear_iptables_rule() } -void init_random_number_fd() -{ - random_number_fd=open("/dev/urandom",O_RDONLY); - - if(random_number_fd==-1) - { - mylog(log_fatal,"error open /dev/urandom\n"); - myexit(-1); - } - setnonblocking(random_number_fd); -} u64_t get_true_random_number_64() { u64_t ret; - int size=read(random_number_fd,&ret,sizeof(ret)); + int size=read(random_fd.get_fd(),&ret,sizeof(ret)); if(size!=sizeof(ret)) { mylog(log_fatal,"get random number failed %d\n",size); @@ -166,7 +177,7 @@ u64_t get_true_random_number_64() u32_t get_true_random_number() { u32_t ret; - int size=read(random_number_fd,&ret,sizeof(ret)); + int size=read(random_fd.get_fd(),&ret,sizeof(ret)); if(size!=sizeof(ret)) { mylog(log_fatal,"get random number failed %d\n",size); @@ -280,7 +291,7 @@ void myexit(int a) { if(enable_log_color) printf("%s\n",RESET); - clear_iptables_rule(); + // clear_iptables_rule(); exit(a); } void signal_handler(int sig) @@ -380,7 +391,7 @@ bool larger_than_u16(uint16_t a,uint16_t b) void get_true_random_chars(char * s,int len) { - int size=read(random_number_fd,s,len); + int size=read(random_fd.get_fd(),s,len); if(size!=len) { printf("get random number failed\n"); diff --git a/common.h b/common.h index e12f48e..0911165 100644 --- a/common.h +++ b/common.h @@ -63,7 +63,7 @@ typedef short i16_t; typedef u64_t my_time_t; -const int max_data_len=1600; +const int max_data_len=2000; const int buf_len=max_data_len+200; const u32_t conv_clear_interval=200; @@ -71,6 +71,7 @@ const u32_t conv_clear_interval=200; ////const u32_t conv_timeout=180000; const u32_t conv_timeout=40000;//for test const int max_conv_num=10000; +const int max_conn_num=200; /* const u32_t max_handshake_conn_num=10000; @@ -96,7 +97,7 @@ const i32_t max_fail_time=0;//disable const u32_t heartbeat_interval=1000; -const u32_t timer_interval=50;//this should be smaller than heartbeat_interval and retry interval; +const u32_t timer_interval=500;//this should be smaller than heartbeat_interval and retry interval; //const uint32_t conv_timeout=120000; //120 second //const u32_t conv_timeout=120000; //for test diff --git a/connection.cpp b/connection.cpp index 7ac7519..fee6b60 100644 --- a/connection.cpp +++ b/connection.cpp @@ -27,6 +27,7 @@ conv_manager_t::conv_manager_t() { clear_it=conv_last_active_time.begin(); long long last_clear_time=0; + reserve(); } conv_manager_t::~conv_manager_t() { diff --git a/connection.h b/connection.h index a68e029..3344c13 100644 --- a/connection.h +++ b/connection.h @@ -66,8 +66,7 @@ struct conv_manager_t // manage the udp connections int erase_conv(u32_t conv); int clear_inactive(char * ip_port=0); int clear_inactive0(char * ip_port); -};//g_conv_manager; - +}; struct conn_info_t //stores info for a raw connection.for client ,there is only one connection,for server there can be thousand of connection since server can //handle multiple clients { @@ -88,7 +87,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o { assert(0==1); } -};//g_conn_info; +}; struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections { diff --git a/fec_manager.cpp b/fec_manager.cpp index 45066e2..8de3594 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -301,7 +301,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) //output_len=blob_len+sizeof(u32_t)+4*sizeof(char);/////remember to change this 4,if modified the protocol rs_encode2(actual_data_num,actual_data_num+actual_redundant_num,tmp_output_buf,fec_len); - mylog(log_trace,"!!! s= %d\n"); + //mylog(log_trace,"!!! s= %d\n"); ready_for_output=1; seq++; diff --git a/main.cpp b/main.cpp index e2a972a..5ff198c 100644 --- a/main.cpp +++ b/main.cpp @@ -27,6 +27,8 @@ int jitter_max=0; int mtu_warn=1350; +int disable_mtu_warn=0; + int fec_data_num=20; int fec_redundant_num=8; int fec_mtu=1300; @@ -76,6 +78,8 @@ int init_listen_socket() setnonblocking(local_listen_fd); set_buf_size(local_listen_fd,socket_buf_size); + mylog(log_debug,"local_listen_fd=%d\n,",local_listen_fd); + return 0; } int new_connected_socket(int &fd,u32_t ip,int port) @@ -219,12 +223,13 @@ int client_event_loop() conn_info_t *conn_info_p=new conn_info_t; conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack - conn_info.conv_manager.reserve(); + //conn_info.conv_manager.reserve(); conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); init_listen_socket(); epoll_fd = epoll_create1(0); + assert(epoll_fd>0); const int max_events = 4096; struct epoll_event ev, events[max_events]; @@ -244,6 +249,8 @@ int client_event_loop() assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0); remote_fd64=fd_manager.create(remote_fd); + mylog(log_debug,"remote_fd64=%llu\n",remote_fd64); + ev.events = EPOLLIN; ev.data.u64 = remote_fd64; @@ -255,6 +262,8 @@ int client_event_loop() ev.events = EPOLLIN; ev.data.u64 = delay_manager.get_timer_fd(); + + mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd()); ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev); if (ret!= 0) { mylog(log_fatal,"add delay_manager.get_timer_fd() error\n"); @@ -264,6 +273,8 @@ int client_event_loop() u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64(); ev.events = EPOLLIN; ev.data.u64 = fd64; + + mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64()); ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); if (ret!= 0) { mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); @@ -274,6 +285,10 @@ int client_event_loop() conn_info.timer.add_fd_to_epoll(epoll_fd); conn_info.timer.set_timer_repeat_us(timer_interval*1000); + mylog(log_debug,"conn_info.timer.get_timer_fd()=%d\n",conn_info.timer.get_timer_fd()); + + + while(1)//////////////////////// { if(about_to_exit) myexit(0); @@ -298,6 +313,7 @@ int client_event_loop() uint64_t value; read(conn_info.timer.get_timer_fd(), &value, 8); conn_info.conv_manager.clear_inactive(); + mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n"); } else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) { @@ -313,28 +329,32 @@ int client_event_loop() if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) { + mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n"); + //mylog(log_info,"timer!!!\n"); uint64_t value; if(!fd_manager.exist(fd64)) //fd64 has been closed { + mylog(log_trace,"!fd_manager.exist(fd64)"); continue; } - if(read(fd_manager.to_fd(fd64), &value, 8)!=8) + if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8) { - mylog(log_info,"unknow!!!\n"); + mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret); continue; } if(value==0) { - mylog(log_info,"cancel!!!\n"); + mylog(log_debug,"value==0\n"); continue; } assert(value==1); from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); //from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); } - else + else//events[idx].data.u64 == (u64_t)local_listen_fd { + mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n"); struct sockaddr_in udp_new_addr_in={0}; socklen_t udp_new_addr_len = sizeof(sockaddr_in); if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0, @@ -343,18 +363,18 @@ int client_event_loop() myexit(1); }; - if(data_len>=mtu_warn) + if(!disable_mtu_warn&&data_len>=mtu_warn) { mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), ntohs(udp_new_addr_in.sin_port),data_len); + ip_port.ip=udp_new_addr_in.sin_addr.s_addr; ip_port.port=ntohs(udp_new_addr_in.sin_port); u64_t u64=ip_port.to_u64(); - if(!conn_info.conv_manager.is_u64_used(u64)) { if(conn_info.conv_manager.get_size() >=max_conv_num) @@ -369,6 +389,7 @@ int client_event_loop() else { conv=conn_info.conv_manager.find_conv_by_u64(u64); + mylog(log_trace,"conv=%d\n",conv); } conn_info.conv_manager.update_active_time(conv); char * new_data; @@ -380,11 +401,7 @@ int client_event_loop() from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); } - - if(out_n!=-1) - { - mylog(log_debug,"n=%d\n",out_n); - } + mylog(log_trace,"out_n=%d\n",out_n); for(int i=0;i",delay_mp.size()); //fflush(stdout); } @@ -402,6 +420,7 @@ int client_event_loop() char data[buf_len]; if(!fd_manager.exist(events[idx].data.u64)) //fd64 has been closed { + mylog(log_trace,"!fd_manager.exist(events[idx].data.u64)"); continue; } assert(events[idx].data.u64==remote_fd64); @@ -419,7 +438,7 @@ int client_event_loop() mylog(log_warn, "recv failed %d ,udp_fd%d,errno:%s\n", data_len,remote_fd,strerror(errno)); continue; } - if(data_len>mtu_warn) + if(!disable_mtu_warn&&data_len>mtu_warn) { mylog(log_warn,"huge packet,data len=%d (>%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } @@ -432,14 +451,22 @@ int client_event_loop() int out_n;char **out_arr;int *out_len;int *out_delay; from_fec_to_normal(conn_info,data,data_len,out_n,out_arr,out_len,out_delay); + mylog(log_trace,"out_n=%d\n",out_n); + for(int i=0;i0); const int max_events = 4096; struct epoll_event ev, events[max_events]; @@ -513,12 +529,18 @@ int server_event_loop() myexit(-1); } + mylog(log_debug," delay_manager.get_timer_fd() =%d\n", delay_manager.get_timer_fd()); + mylog(log_info,"now listening at %s:%d\n",my_ntoa(local_ip_uint32),local_port); my_timer_t timer; timer.add_fd_to_epoll(epoll_fd); timer.set_timer_repeat_us(timer_interval*1000); + + + mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd()); + while(1)//////////////////////// { @@ -554,10 +576,13 @@ int server_event_loop() uint64_t value; read(timer.get_timer_fd(), &value, 8); conn_manager.clear_inactive(); + mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n"); //conn_info.conv_manager.clear_inactive(); } else if (events[idx].data.u64 == (u64_t)local_listen_fd) { + + mylog(log_trace,"events[idx].data.u64 == (u64_t)local_listen_fd\n"); //int recv_len; char data[buf_len]; int data_len; @@ -568,45 +593,56 @@ int server_event_loop() mylog(log_error,"recv_from error,this shouldnt happen at client\n"); myexit(1); }; + mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), + ntohs(udp_new_addr_in.sin_port),data_len); - if(data_len>=mtu_warn) + if(!disable_mtu_warn&&data_len>=mtu_warn)///////////////////////delete this for type 0 in furture { mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } + if(rm_crc32(data,data_len)!=0) { mylog(log_debug,"crc32 check error"); + continue; } - mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), - ntohs(udp_new_addr_in.sin_port),data_len); + ip_port_t ip_port; ip_port.ip=udp_new_addr_in.sin_addr.s_addr; ip_port.port=ntohs(udp_new_addr_in.sin_port); + mylog(log_trace,"ip_port= %s\n",ip_port.to_s()); if(!conn_manager.exist(ip_port)) { + if(conn_manager.mp.size() >=max_conn_num) + { + mylog(log_warn,"new connection %s ignored bc max_conn_num exceed\n",ip_port.to_s()); + continue; + } + conn_manager.insert(ip_port); conn_info_t &conn_info=conn_manager.find(ip_port); conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); - conn_info.conv_manager.reserve(); - + //conn_info.conv_manager.reserve(); //already reserved in constructor u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64(); + mylog(log_debug,"fec_fd64=%llu\n",fec_fd64); ev.events = EPOLLIN; ev.data.u64 = fec_fd64; ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fec_fd64), &ev); fd_manager.get_info(fec_fd64).ip_port=ip_port; - - conn_info.timer.add_fd64_to_epoll(epoll_fd); - conn_info.timer.set_timer_repeat_us(timer_interval*1000); + conn_info.timer.set_timer_repeat_us(timer_interval*100); + + mylog(log_debug,"conn_info.timer.get_timer_fd64()=%llu\n",conn_info.timer.get_timer_fd64()); + u64_t timer_fd64=conn_info.timer.get_timer_fd64(); fd_manager.get_info(timer_fd64).ip_port=ip_port; - mylog(log_info,"new connection from %s\n",ip_port.to_s()); + mylog(log_info,"new connection from %s,conn_info.timer_fd %lld,\n",ip_port.to_s(),timer.get_timer_fd64()); } conn_info_t &conn_info=conn_manager.find(ip_port); @@ -622,7 +658,10 @@ int server_event_loop() char *new_data; int new_len; if(get_conv(conv,out_arr[i],out_len[i],new_data,new_len)!=0) + { + mylog(log_debug,"get_conv failed"); continue; + } /* id_t tmp_conv_id; @@ -631,11 +670,17 @@ int server_event_loop() if (!conn_info.conv_manager.is_conv_used(conv)) { + if(conn_info.conv_manager.get_size() >=max_conv_num) + { + mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); + continue; + } + int new_udp_fd; ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port); if (ret != 0) { - mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port); + mylog(log_warn, "[%s]new_connected_socket failed\n",ip_port.to_s()); continue; } @@ -648,7 +693,7 @@ int server_event_loop() fd_manager.get_info(fd64).ip_port=ip_port; - mylog(log_info,"[%s]new conv %x,fd %d created\n",ip_port.to_s(),conv,new_udp_fd); + mylog(log_info,"[%s]new conv %x,fd %d created,fd64=%llu\n",ip_port.to_s(),conv,new_udp_fd,fd64); //assert(!conn_manager.exist_fd64(fd64)); //conn_manager.insert_fd64(fd64,ip_port); @@ -659,53 +704,14 @@ int server_event_loop() dest_t dest; dest.type=type_fd64; dest.inner.fd64=fd64; - //dest.conv=conv; delay_send(out_delay[i],dest,new_data,new_len); } - //int fd = int((u64 << 32u) >> 32u); - //////////////////////////////todo - - //u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); } - /* - else if ((events[idx].data.u64 >>32u) == 2u) - { - if(debug_flag)begin_time=get_current_time(); - int fd=get_u64_l(events[idx].data.u64); - u64_t dummy; - read(fd, &dummy, 8); - - if(conn_manager.timer_fd_mp.find(fd)==conn_manager.timer_fd_mp.end()) //this can happen,when fd is a just closed fd - { - mylog(log_info,"timer_fd no longer exits\n"); - continue; - } - conn_info_t* p_conn_info=conn_manager.timer_fd_mp[fd]; - u32_t ip=p_conn_info->raw_info.recv_info.src_ip; - u32_t port=p_conn_info->raw_info.recv_info.src_port; - assert(conn_manager.exist(ip,port));//TODO remove this for peformance - - assert(p_conn_info->state.server_current_state == server_ready); //TODO remove this for peformance - - //conn_info_t &conn_info=conn_manager.find(ip,port); - char ip_port[40]; - - sprintf(ip_port,"%s:%d",my_ntoa(ip),port); - - server_on_timer_multi(*p_conn_info,ip_port); - - if(debug_flag) - { - end_time=get_current_time(); - mylog(log_debug,"(events[idx].data.u64 >>32u) == 2u ,%llu,%llu,%llu \n",begin_time,end_time,end_time-begin_time); - } - }*/ else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()) { uint64_t value; read(delay_manager.get_timer_fd(), &value, 8); - //printf("",delay_mp.size()); - //fflush(stdout); + mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n"); } else if (events[idx].data.u64 >u32_t(-1)) { @@ -713,24 +719,21 @@ int server_event_loop() int data_len; u32_t conv; fd64_t fd64=events[idx].data.u64; + mylog(log_trace,"events[idx].data.u64 >u32_t(-1),%llu\n",(u64_t)events[idx].data.u64); if(!fd_manager.exist(fd64)) //fd64 has been closed { + mylog(log_trace,"!fd_manager.exist(fd64)\n"); continue; } - //assert(conn_manager.exist_fd64(fd64)); - assert(fd_manager.exist_info(fd64)); ip_port_t ip_port=fd_manager.get_info(fd64).ip_port; - assert(conn_manager.exist(ip_port)); - //conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port); - conn_info_t &conn_info=conn_manager.find(ip_port); - conn_info.update_active_time(); + //conn_info.update_active_time(); //cant put it here - int out_n;char **out_arr;int *out_len;int *out_delay; + int out_n=-2;char **out_arr;int *out_len;int *out_delay; dest_t dest; dest.type=type_ip_port; //dest.conv=conv; @@ -739,14 +742,16 @@ int server_event_loop() if(fd64==conn_info.fec_encode_manager.get_timer_fd64()) { - //mylog(log_info,"timer!!!\n"); + //mylog(log_infol,"timer!!!\n"); uint64_t value; - if(read(fd_manager.to_fd(fd64), &value, 8)!=8) + if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8) { + mylog(log_trace,"fd_manager.to_fd(fd64), &value, 8)!=8 ,%d\n",ret); continue; } if(value==0) { + mylog(log_trace,"value==0\n"); continue; } assert(value==1); @@ -757,20 +762,20 @@ int server_event_loop() uint64_t value; read(conn_info.timer.get_timer_fd(), &value, 8); conn_info.conv_manager.clear_inactive(); + continue; } else { - - assert(conn_info.conv_manager.is_u64_used(fd64)); conv=conn_info.conv_manager.find_conv_by_u64(fd64); conn_info.conv_manager.update_active_time(conv); + conn_info.update_active_time(); int fd=fd_manager.to_fd(fd64); data_len=recv(fd,data,max_data_len,0); - mylog(log_trace,"received a packet from udp_fd,len:%d\n",data_len); + mylog(log_trace,"received a packet from udp_fd,len:%d,conv=%d\n",data_len,conv); if(data_len<0) { @@ -779,8 +784,7 @@ int server_event_loop() continue; } - - if(data_len>=mtu_warn) + if(!disable_mtu_warn&&data_len>=mtu_warn) { mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } @@ -789,11 +793,10 @@ int server_event_loop() int new_len; put_conv(conv,data,data_len,new_data,new_len); - - from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); } + mylog(log_trace,"out_n=%d\n",out_n); for(int i=0;i