diff --git a/fec_manager.cpp b/fec_manager.cpp index 2515c65..dc47aad 100644 --- a/fec_manager.cpp +++ b/fec_manager.cpp @@ -136,6 +136,7 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int pen fec_redundant_num=redundant_num; fec_mtu=mtu; fec_pending_num=pending_num; + fec_pending_time=pending_time; counter=0; blob_encode.clear(); @@ -180,6 +181,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) output_len=blob_len+sizeof(u32_t)+4*sizeof(char);/////remember to change this 4,if modified the protocol rs_encode2(fec_data_num,fec_data_num+fec_redundant_num,output_buf,blob_len); + for(int i=0;i<fec_data_num+fec_redundant_num;i++) { output_buf[i]=buf[i]; @@ -192,7 +194,15 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/) } if(s!=0) { - //if(counter==0) is_first_packet=1; + if(counter==0) + { + itimerspec its; + memset(&its.it_interval,0,sizeof(its.it_interval)); + my_time_t tmp_time=fec_pending_time+get_current_time_us(); + its.it_value.tv_sec=tmp_time/1000000llu; + its.it_value.tv_nsec=(tmp_time%1000000llu)*1000llu; + timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0); + } blob_encode.input(s,len); counter++; } diff --git a/fec_manager.h b/fec_manager.h index b064d9c..656d358 100644 --- a/fec_manager.h +++ b/fec_manager.h @@ -111,6 +111,7 @@ class fec_encode_manager_t blob_encode_t blob_encode; public: fec_encode_manager_t(); + ~fec_encode_manager_t(); u64_t get_timer_fd64(); int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time); diff --git a/main.cpp b/main.cpp index 7bc1b59..4a6491b 100644 --- a/main.cpp +++ b/main.cpp @@ -31,7 +31,7 @@ int fec_data_num=3; int fec_redundant_num=2; int fec_mtu=30; int fec_pending_num=5; -int fec_pending_time=10000; +int fec_pending_time=2000000; u32_t local_ip_uint32,remote_ip_uint32=0; char local_ip[100], remote_ip[100]; int local_port = -1, remote_port = -1; @@ -259,6 +259,10 @@ int client_event_loop() ev.events = EPOLLIN; ev.data.u64 = fd64; ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev); + if (ret!= 0) { + mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n"); + myexit(-1); + } while(1)//////////////////////// { @@ -279,30 +283,60 @@ int client_event_loop() } int idx; for (idx = 0; idx < nfds; ++idx) { - if (events[idx].data.u64 == (u64_t)local_listen_fd) + if (events[idx].data.u64 == (u64_t)local_listen_fd||events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) { char data[buf_len]; int data_len; - struct sockaddr_in udp_new_addr_in={0}; - socklen_t udp_new_addr_len = sizeof(sockaddr_in); - if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0, - (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { - mylog(log_error,"recv_from error,this shouldnt happen at client\n"); - myexit(1); - }; - - if(data_len>=mtu_warn) - { - mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); - } - mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), - ntohs(udp_new_addr_in.sin_port),data_len); - ip_port_t ip_port; - ip_port.ip=udp_new_addr_in.sin_addr.s_addr; - ip_port.port=ntohs(udp_new_addr_in.sin_port); - u64_t u64=ip_port.to_u64(); u32_t conv; + int out_n;char **out_arr;int *out_len;int *out_delay; + dest_t dest; + dest.type=type_fd64; + dest.inner.fd64=remote_fd64; + + if(events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()) + { + mylog(log_info,"timer!!!\n"); + uint64_t value; + if(!fd_manager.exist(fd64)) //fd64 has been closed + { + continue; + } + if(read(fd_manager.to_fd(fd64), &value, 8)!=8) + { + mylog(log_info,"unknow!!!\n"); + continue; + } + if(value==0) + { + mylog(log_info,"cancel!!!\n"); + continue; + } + assert(value==1); + from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); + //from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); + } + else + { + struct sockaddr_in udp_new_addr_in={0}; + socklen_t udp_new_addr_len = sizeof(sockaddr_in); + if ((data_len = recvfrom(local_listen_fd, data, max_data_len, 0, + (struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) { + mylog(log_error,"recv_from error,this shouldnt happen at client\n"); + myexit(1); + }; + + if(data_len>=mtu_warn) + { + mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); + } + mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), + ntohs(udp_new_addr_in.sin_port),data_len); + ip_port.ip=udp_new_addr_in.sin_addr.s_addr; + ip_port.port=ntohs(udp_new_addr_in.sin_port); + + u64_t u64=ip_port.to_u64(); + if(!conn_info.conv_manager.is_u64_used(u64)) { @@ -320,19 +354,16 @@ int client_event_loop() conv=conn_info.conv_manager.find_conv_by_u64(u64); } conn_info.conv_manager.update_active_time(conv); - - - dest_t dest; - dest.type=type_fd64; - dest.inner.fd64=remote_fd64; - char * new_data; int new_len; put_conv(conv,data,data_len,new_data,new_len); - int out_n;char **out_arr;int *out_len;int *out_delay; + //dest.conv=conv; from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); + + } + for(int i=0;i<out_n;i++) { delay_send(out_delay[i],dest,out_arr[i],out_len[i]); @@ -635,6 +666,30 @@ int server_event_loop() conn_info_t &conn_info=conn_manager.find(ip_port); + int out_n;char **out_arr;int *out_len;int *out_delay; + dest_t dest; + dest.type=type_ip_port; + //dest.conv=conv; + dest.inner.ip_port=ip_port; + + if(fd64==conn_info.fec_encode_manager.get_timer_fd64()) + { + mylog(log_info,"timer!!!\n"); + uint64_t value; + if(read(fd_manager.to_fd(fd64), &value, 8)!=8) + { + continue; + } + if(value==0) + { + continue; + } + assert(value==1); + from_normal_to_fec(conn_info,0,0,out_n,out_arr,out_len,out_delay); + } + else + { + assert(conn_info.conv_manager.is_u64_used(fd64)); conv=conn_info.conv_manager.find_conv_by_u64(fd64); @@ -657,18 +712,17 @@ int server_event_loop() mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } - dest_t dest; - dest.type=type_ip_port; - //dest.conv=conv; - dest.inner.ip_port=ip_port; + char * new_data; int new_len; put_conv(conv,data,data_len,new_data,new_len); - int out_n;char **out_arr;int *out_len;int *out_delay; + from_normal_to_fec(conn_info,new_data,new_len,out_n,out_arr,out_len,out_delay); + } + for(int i=0;i<out_n;i++) { delay_send(out_delay[i],dest,out_arr[i],out_len[i]);