diff --git a/common.cpp b/common.cpp index d7648fd..160e5e7 100644 --- a/common.cpp +++ b/common.cpp @@ -488,3 +488,41 @@ int round_up_div(int a,int b) { return (a+b-1)/b; } + +int create_fifo(char * file) +{ + if(mkfifo (file, 0666)!=0) + { + if(errno==EEXIST) + { + mylog(log_warn,"warning fifo file %s exist\n",file); + } + else + { + mylog(log_fatal,"create fifo file %s failed\n",file); + myexit(-1); + } + } + int fifo_fd=open (file, O_RDWR); + if(fifo_fd<0) + { + mylog(log_fatal,"create fifo file %s failed\n",file); + myexit(-1); + } + struct stat st; + if (fstat(fifo_fd, &st)!=0) + { + mylog(log_fatal,"fstat failed for fifo file %s\n",file); + myexit(-1); + } + + if(!S_ISFIFO(st.st_mode)) + { + mylog(log_fatal,"%s is not a fifo\n",file); + myexit(-1); + } + + setnonblocking(fifo_fd); + return fifo_fd; +} + diff --git a/common.h b/common.h index 5994d69..8eb69af 100644 --- a/common.h +++ b/common.h @@ -21,6 +21,7 @@ #include #include //for socket ofcourse #include +#include #include //for exit(0); #include //For errno - the error number #include //Provides declarations for tcp header @@ -211,6 +212,7 @@ int set_timer_ms(int epollfd,int &timer_fd,u32_t timer_interval); int round_up_div(int a,int b); +int create_fifo(char * file); /* int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port); */ diff --git a/fec_manager.cpp b/fec_manager.cpp index 2e2686b..aeed512 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -11,6 +11,15 @@ #include "lib/rs.h" #include "fd_manager.h" +int g_fec_data_num=20; +int g_fec_redundant_num=10; +int g_fec_mtu=1250; +int g_fec_pending_num=200; +int g_fec_pending_time=8*1000; //8ms +int g_fec_type=1; + +int dynamic_update_fec=1; + const int encode_fast_send=1; const int decode_fast_send=1; @@ -128,7 +137,8 @@ fec_encode_manager_t::fec_encode_manager_t() } timer_fd64=fd_manager.create(timer_fd); - re_init(4,2,1200,100,10000,0); + re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); + seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug. } fec_encode_manager_t::~fec_encode_manager_t() @@ -146,7 +156,7 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen fec_mtu=mtu; fec_pending_num=pending_num; fec_pending_time=pending_time; - this->type=type; + fec_type=type; assert(data_num+redundant_num=0); @@ -199,17 +209,27 @@ int fec_encode_manager_t::append(char *s,int len/*,int &is_first_packet*/) } int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) { + if(counter==0&&dynamic_update_fec) + { + fec_data_num=g_fec_data_num; + fec_redundant_num=g_fec_redundant_num; + fec_mtu=g_fec_mtu; + fec_pending_num=g_fec_pending_num; + fec_pending_time=g_fec_pending_time; + fec_type=g_fec_type; + } + int about_to_fec=0; int delayed_append=0; //int counter_back=counter; - assert(type==0||type==1); + assert(fec_type==0||fec_type==1); - if(type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) + if(fec_type==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) { mylog(log_warn,"message too long len=%d,ignored\n",len); return -1; } - if(type==1&&s!=0&&len>=fec_mtu) + if(fec_type==1&&s!=0&&len>=fec_mtu) { mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu); return -1; @@ -221,10 +241,10 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) } if(s==0) about_to_fec=1;//now - if(type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet + if(fec_type==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet - if(type==0) assert(counter=1); assert(counter<=255); @@ -422,7 +442,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) write_u32(input_buf[input_buf_idx]+tmp_idx,seq); tmp_idx+=sizeof(u32_t); - input_buf[input_buf_idx][tmp_idx++]=(unsigned char)type; + input_buf[input_buf_idx][tmp_idx++]=(unsigned char)fec_type; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)0; input_buf[input_buf_idx][tmp_idx++]=(unsigned char)((u32_t)input_buf_idx); @@ -444,7 +464,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) if(s!=0&&delayed_append) { - assert(type!=1); + assert(fec_type!=1); append(s,len); } diff --git a/fec_manager.h b/fec_manager.h index f0f8fbc..cfbdd91 100644 --- a/fec_manager.h +++ b/fec_manager.h @@ -19,6 +19,17 @@ const int max_fec_packet_num=255;// this is the limitation of the rs lib extern u32_t fec_buff_num; +/*begin for first time init or dynamic update*/ +extern int g_fec_data_num; +extern int g_fec_redundant_num; +extern int g_fec_mtu; +extern int g_fec_pending_num; +extern int g_fec_pending_time; //8ms +extern int g_fec_type; +extern int dynamic_update_fec; +/*end for first time init or dynamic update*/ + + struct anti_replay_t { @@ -98,7 +109,7 @@ class fec_encode_manager_t private: u32_t seq; - int type; + int fec_type; int fec_data_num,fec_redundant_num; int fec_mtu; int fec_pending_num; @@ -134,9 +145,14 @@ public: return first_packet_time_for_output; } + int get_pending_time() + { + return fec_pending_time; + } + int get_type() { - return type; + return fec_type; } u64_t get_timer_fd64(); int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type); diff --git a/main.cpp b/main.cpp index 50b182e..2df0500 100644 --- a/main.cpp +++ b/main.cpp @@ -24,6 +24,8 @@ typedef int i32_t; //int random_number_fd=-1; +char fifo_file[1000]=""; + int mtu_warn=1350; int disable_mtu_warn=1; @@ -31,12 +33,6 @@ int disable_fec=0; int debug_force_flush_fec=0; -int fec_data_num=20; -int fec_redundant_num=10; -int fec_mtu=1250; -int fec_pending_num=200; -int fec_pending_time=8*1000; //8ms -int fec_type=1; int jitter_min=0*1000; int jitter_max=0*1000; @@ -196,9 +192,9 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch my_time_t tmp; assert(first_packet_time!=0); //mylog(log_info,"current_time=%llu first_packlet_time=%llu fec_pending_time=%llu\n",current_time,first_packet_time,(my_time_t)fec_pending_time); - if((my_time_t)fec_pending_time >=(current_time - first_packet_time)) + if((my_time_t)conn_info.fec_encode_manager.get_pending_time() >=(current_time - first_packet_time)) { - tmp=(my_time_t)fec_pending_time-(current_time - first_packet_time); + tmp=(my_time_t)conn_info.fec_encode_manager.get_pending_time()-(current_time - first_packet_time); //mylog(log_info,"tmp=%llu\n",tmp); } else @@ -318,7 +314,7 @@ int client_event_loop() conn_info_t *conn_info_p=new conn_info_t; conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack //conn_info.conv_manager.reserve(); - conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); + //conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); init_listen_socket(); @@ -383,6 +379,22 @@ int client_event_loop() + int fifo_fd=-1; + + if(fifo_file[0]!=0) + { + fifo_fd=create_fifo(fifo_file); + ev.events = EPOLLIN; + ev.data.u64 = fifo_fd; + + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + myexit(-1); + } + mylog(log_info,"fifo_file=%s\n",fifo_file); + } + while(1)//////////////////////// { if(about_to_exit) myexit(0); @@ -425,6 +437,24 @@ int client_event_loop() } } } + else if (events[idx].data.u64 == (u64_t)fifo_fd) + { + char buf[buf_len]; + int len=read (fifo_fd, buf, sizeof (buf)); + assert(len>=0); + buf[len]=0; + while(len>=1&&buf[len-1]=='\n') + buf[len-1]=0; + mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); + if(0) + { + } + else + { + mylog(log_info,"unknown command\n"); + } + + } else if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) { char data[buf_len]; @@ -651,10 +681,24 @@ int server_event_loop() timer.add_fd_to_epoll(epoll_fd); timer.set_timer_repeat_us(timer_interval*1000); - - mylog(log_debug," timer.get_timer_fd() =%d\n",timer.get_timer_fd()); + int fifo_fd=-1; + + if(fifo_file[0]!=0) + { + fifo_fd=create_fifo(fifo_file); + ev.events = EPOLLIN; + ev.data.u64 = fifo_fd; + + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev); + if (ret!= 0) { + mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno)); + myexit(-1); + } + mylog(log_info,"fifo_file=%s\n",fifo_file); + } + while(1)//////////////////////// { @@ -693,6 +737,26 @@ int server_event_loop() mylog(log_trace,"events[idx].data.u64==(u64_t)timer.get_timer_fd()\n"); //conn_info.conv_manager.clear_inactive(); } + + else if (events[idx].data.u64 == (u64_t)fifo_fd) + { + char buf[buf_len]; + int len=read (fifo_fd, buf, sizeof (buf)); + assert(len>=0); + buf[len]=0; + while(len>=1&&buf[len-1]=='\n') + buf[len-1]=0; + mylog(log_info,"got data from fifo,len=%d,s=[%s]\n",len,buf); + if(0) + { + } + else + { + mylog(log_info,"unknown command\n"); + } + + } + else if (events[idx].data.u64 == (u64_t)local_listen_fd) { @@ -738,7 +802,7 @@ int server_event_loop() conn_manager.insert(ip_port); conn_info_t &conn_info=conn_manager.find(ip_port); - conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); + //conn_info.fec_encode_manager.re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_pending_time,fec_type); //conn_info.conv_manager.reserve(); //already reserved in constructor u64_t fec_fd64=conn_info.fec_encode_manager.get_timer_fd64(); @@ -1101,7 +1165,7 @@ int unit_test() int * len; fec_decode_manager.output(n,s_arr,len); - fec_encode_manager.re_init(3,2,fec_mtu,fec_pending_num,fec_pending_time,1); + fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_pending_num,g_fec_pending_time,1); fec_encode_manager.input((char *) a.c_str(), a.length()); fec_encode_manager.output(n,s_arr,len); @@ -1233,7 +1297,7 @@ void process_arg(int argc, char *argv[]) {"queue-len", required_argument, 0,'q'}, {"fec", required_argument, 0,'f'}, {"jitter", required_argument, 0,'j'}, - + {"fifo", required_argument, 0, 1}, {NULL, 0, 0, 0} }; int option_index = 0; @@ -1369,8 +1433,8 @@ void process_arg(int argc, char *argv[]) } else { - sscanf(optarg,"%d:%d\n",&fec_data_num,&fec_redundant_num); - if(fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254) + sscanf(optarg,"%d:%d\n",&g_fec_data_num,&g_fec_redundant_num); + if(g_fec_data_num<1 ||g_fec_redundant_num<0||g_fec_data_num+g_fec_redundant_num>254) { mylog(log_fatal,"fec_data_num<1 ||fec_redundant_num<0||fec_data_num+fec_redundant_num>254\n"); myexit(-1); @@ -1378,8 +1442,8 @@ void process_arg(int argc, char *argv[]) } break; case 'q': - sscanf(optarg,"%d",&fec_pending_num); - if(fec_pending_num<1||fec_pending_num>10000) + sscanf(optarg,"%d",&g_fec_pending_num); + if(g_fec_pending_num<1||g_fec_pending_num>10000) { mylog(log_fatal,"fec_pending_num should be between 1 and 10000\n"); @@ -1518,8 +1582,8 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"mode")==0) { - sscanf(optarg,"%d",&fec_type); - if(fec_type!=0&&fec_type!=1) + sscanf(optarg,"%d",&g_fec_type); + if(g_fec_type!=0&&g_fec_type!=1) { mylog(log_fatal,"mode should be 0 or 1\n"); myexit(-1); @@ -1527,8 +1591,8 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"mtu")==0) { - sscanf(optarg,"%d",&fec_mtu); - if(fec_mtu<100||fec_mtu>2000) + sscanf(optarg,"%d",&g_fec_mtu); + if(g_fec_mtu<100||g_fec_mtu>2000) { mylog(log_fatal,"fec_mtu should be between 100 and 2000\n"); myexit(-1); @@ -1536,14 +1600,20 @@ void process_arg(int argc, char *argv[]) } else if(strcmp(long_options[option_index].name,"timeout")==0) { - sscanf(optarg,"%d",&fec_pending_time); - if(fec_pending_time<0||fec_pending_time>1000) + sscanf(optarg,"%d",&g_fec_pending_time); + if(g_fec_pending_time<0||g_fec_pending_time>1000) { mylog(log_fatal,"fec_pending_time should be between 0 and 1000(1s)\n"); myexit(-1); } - fec_pending_time*=1000; + g_fec_pending_time*=1000; + } + else if(strcmp(long_options[option_index].name,"fifo")==0) + { + sscanf(optarg,"%s",fifo_file); + + mylog(log_info,"fifo_file =%s \n",fifo_file); } else { @@ -1583,8 +1653,8 @@ void process_arg(int argc, char *argv[]) } mylog(log_info,"jitter_min=%d jitter_max=%d output_interval_min=%d output_interval_max=%d fec_pending_time=%d fec_data_num=%d fec_redundant_num=%d fec_mtu=%d fec_pending_num=%d fec_type=%d\n", - jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,fec_pending_time/1000, - fec_data_num,fec_redundant_num,fec_mtu,fec_pending_num,fec_type); + jitter_min/1000,jitter_max/1000,output_interval_min/1000,output_interval_max/1000,g_fec_pending_time/1000, + g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_pending_num,g_fec_type); } int main(int argc, char *argv[])