aloww len==fec_mtu

This commit is contained in:
wangyu- 2017-10-27 21:16:59 -05:00
parent ba81aa14d4
commit a3b5187b1f
7 changed files with 190 additions and 80 deletions

View File

@ -135,7 +135,7 @@ typedef u64_t anti_replay_seq_t;
typedef u64_t fd64_t;
//enum dest_type{none=0,type_fd64_ip_port,type_fd64,type_fd64_ip_port_conv,type_fd64_conv/*,type_fd*/};
enum dest_type{none=0,type_fd64_ip_port,type_fd64,type_fd,type_fd_ip_port/*,type_fd*/};
enum dest_type{none=0,type_fd64_ip_port,type_fd64,type_fd,type_write_fd,type_fd_ip_port/*,type_fd*/};
struct ip_port_t
{

View File

@ -218,12 +218,12 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
//int counter_back=counter;
assert(fec_mode==0||fec_mode==1);
if(fec_mode==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
if(fec_mode==0&& s!=0 &&counter==0&&blob_encode.get_shard_len(fec_data_num,len)>fec_mtu)
{
mylog(log_warn,"message too long len=%d,ignored\n",len);
return -1;
}
if(fec_mode==1&&s!=0&&len>=fec_mtu)
if(fec_mode==1&&s!=0&&len>fec_mtu)
{
mylog(log_warn,"message too long len=%d fec_mtu=%d,ignored\n",len,fec_mtu);
return -1;
@ -235,7 +235,7 @@ int fec_encode_manager_t::input(char *s,int len/*,int &is_first_packet*/)
}
if(s==0) about_to_fec=1;//now
if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(fec_mode==0&& blob_encode.get_shard_len(fec_data_num,len)>fec_mtu) {about_to_fec=1; delayed_append=1;}//fec then add packet
if(fec_mode==0) assert(counter<fec_queue_len);//counter will never equal fec_pending_num,if that happens fec should already been done.
if(fec_mode==1) assert(counter<fec_data_num);

View File

@ -30,6 +30,12 @@ int main(int argc, char *argv[])
delay_manager.set_capacity(delay_capacity);
local_ip_uint32=inet_addr(local_ip);
remote_ip_uint32=inet_addr(remote_ip);
sub_net_uint32=inet_addr(sub_net);
if(strlen(tun_dev)==0)
{
sprintf(tun_dev,"tun%u",get_true_random_number()%1000);
}
if(working_mode==tunnel_mode)
{
@ -44,6 +50,7 @@ int main(int argc, char *argv[])
}
else
{
//disable_fec=1;
if(client_or_server==client_mode)
{
tun_dev_client_event_loop();

View File

@ -40,7 +40,10 @@ int time_mono_test=0;
int delay_capacity=0;
char sub_net[100]="10.0.0.0";
u32_t sub_net_uint32=0;
char tun_dev[100]="";
@ -55,13 +58,17 @@ int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,ch
inner_stat_t &inner_stat=conn_info.stat.normal_to_fec;
if(disable_fec)
{
assert(data!=0);
if(data==0)
{
out_n=0;
return 0;
}
//assert(data!=0);
inner_stat.input_packet_num++;
inner_stat.input_packet_size+=len;
inner_stat.output_packet_num++;
inner_stat.output_packet_size+=len;
if(data==0) return 0;
out_n=1;
static char *data_static;
data_static=data;
@ -554,6 +561,7 @@ void process_arg(int argc, char *argv[])
{"fec", required_argument, 0,'f'},
{"jitter", required_argument, 0,'j'},
{"fifo", required_argument, 0, 1},
{"sub-net", required_argument, 0, 1},
{"tun-dev", optional_argument, 0, 1},
{NULL, 0, 0, 0}
};
@ -878,6 +886,15 @@ void process_arg(int argc, char *argv[])
mylog(log_info,"enabled tun-dev mode\n");
working_mode=tun_dev_mode;
}
else if(strcmp(long_options[option_index].name,"tun-dev")==0)
{
if(optarg!=0)
{
sscanf(optarg,"%s",tun_dev);
mylog(log_info,"tun_dev =%s \n",tun_dev);
}
mylog(log_info,"running at tun-dev mode\n");
}
else
{
mylog(log_fatal,"unknown option\n");

5
misc.h
View File

@ -47,8 +47,6 @@ extern int time_mono_test;
extern int delay_capacity;
int from_normal_to_fec(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay);
int from_fec_to_normal(conn_info_t & conn_info,char *data,int len,int & out_n,char **&out_arr,int *&out_len,my_time_t *&out_delay);
@ -62,5 +60,8 @@ void print_help();
void process_arg(int argc, char *argv[]);
extern char sub_net[100];
extern u32_t sub_net_uint32;
extern char tun_dev[100];
#endif /* MISC_H_ */

View File

@ -199,6 +199,11 @@ int my_send(const dest_t &dest,char *data,int len)
return send_fd(dest.inner.fd,data,len,0);
break;
}
case type_write_fd:
{
return write(dest.inner.fd,data,len);
break;
}
case type_fd64:
{

View File

@ -34,7 +34,7 @@ int get_tun_fd(char * dev_name)
return tun_fd;
}
int set_if(char *if_name,char * local_ip,char * remote_ip,int mtu)
int set_if(char *if_name,u32_t local_ip,u32_t remote_ip,int mtu)
{
//printf("i m here1\n");
struct ifreq ifr;
@ -47,19 +47,22 @@ int set_if(char *if_name,char * local_ip,char * remote_ip,int mtu)
sai.sin_family = AF_INET;
sai.sin_port = 0;
sai.sin_addr.s_addr = inet_addr(local_ip);
sai.sin_addr.s_addr = local_ip;
memcpy(&ifr.ifr_addr,&sai, sizeof(struct sockaddr));
assert(ioctl(sockfd, SIOCSIFADDR, &ifr)==0);
sai.sin_addr.s_addr = inet_addr(local_ip);
memcpy(&ifr.ifr_addr,&sai, sizeof(struct sockaddr));
assert(ioctl(sockfd, SIOCSIFADDR, &ifr)==0);
//sai.sin_addr.s_addr = local_ip;
//memcpy(&ifr.ifr_addr,&sai, sizeof(struct sockaddr));
//assert(ioctl(sockfd, SIOCSIFADDR, &ifr)==0);
sai.sin_addr.s_addr = inet_addr(remote_ip);
sai.sin_addr.s_addr = remote_ip;
memcpy(&ifr.ifr_addr,&sai, sizeof(struct sockaddr));
assert(ioctl(sockfd, SIOCSIFDSTADDR, &ifr)==0);
@ -81,24 +84,59 @@ const char header_normal=1;
const char header_new_connect=2;
const char header_reject=3;
int put_header(char header,char *& data,int &len)
int put_header(char header,char * data,int &len)
{
assert(len>=0);
data=data-1;
data[0]=header;
data[len]=header;
len+=1;
//data=data-1;
//data[0]=header;
//len+=1;
return 0;
}
int get_header(char &header,char *& data,int &len)
int get_header(char &header,char * data,int &len)
{
assert(len>=0);
if(len<1) return -1;
header=data[0];
data=data+1;
len-=1;
header=data[len];
return 0;
}
int from_normal_to_fec2(conn_info_t & conn_info,dest_t &dest,char * data,int len,char header)
{
int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
from_normal_to_fec(conn_info,data,len,out_n,out_arr,out_len,out_delay);
for(int i=0;i<out_n;i++)
{
//put_header(header,out_arr[i],out_len[i]);
char tmp_buf[buf_len];
int tmp_len=out_len[i];
memcpy(tmp_buf,out_arr[i],out_len[i]);
put_header(header,tmp_buf,tmp_len);
delay_send(out_delay[i],dest,tmp_buf,tmp_len);
//put_header(header,out_arr[i],out_len[i]);
//delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
}
return 0;
}
int from_fec_to_normal2(conn_info_t & conn_info,dest_t &dest,char * data,int len)
{
int out_n;char **out_arr;int *out_len;my_time_t *out_delay;
from_fec_to_normal(conn_info,data,len,out_n,out_arr,out_len,out_delay);
for(int i=0;i<out_n;i++)
{
delay_send(out_delay[i],dest,out_arr[i],out_len[i]);
}
return 0;
}
int tun_dev_client_event_loop()
{
@ -111,13 +149,13 @@ int tun_dev_client_event_loop()
int remote_fd;
fd64_t remote_fd64;
tun_fd=get_tun_fd("tun11");
tun_fd=get_tun_fd(tun_dev);
assert(tun_fd>0);
assert(new_connected_socket(remote_fd,remote_ip_uint32,remote_port)==0);
remote_fd64=fd_manager.create(remote_fd);
assert(set_if("tun11","10.0.0.2","10.0.0.1",1000)==0);
assert(set_if(tun_dev,htonl((ntohl(sub_net_uint32)&0xFFFFFF00)|2),htonl((ntohl(sub_net_uint32)&0xFFFFFF00 )|1),g_fec_mtu)==0);
epoll_fd = epoll_create1(0);
assert(epoll_fd>0);
@ -206,11 +244,12 @@ int tun_dev_client_event_loop()
//dest.cook=1;
dest_t udp_dest;
udp_dest.cook=1;
udp_dest.type=type_fd64;
udp_dest.inner.fd64=remote_fd64;
dest_t tun_dest;
tun_dest.type=type_fd;
tun_dest.type=type_write_fd;
tun_dest.inner.fd=tun_fd;
int got_feed_back=0;
@ -243,21 +282,49 @@ int tun_dev_client_event_loop()
mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
conn_info.stat.report_as_client();
}
else if(events[idx].data.u64==conn_info.fec_encode_manager.get_timer_fd64())
{
mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
//mylog(log_info,"timer!!!\n");
uint64_t value;
if(!fd_manager.exist(fd64)) //fd64 has been closed
{
mylog(log_trace,"!fd_manager.exist(fd64)");
continue;
}
if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
{
mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
continue;
}
if(value==0)
{
mylog(log_debug,"value==0\n");
continue;
}
assert(value==1);
char header=(got_feed_back==0?header_new_connect:header_normal);
from_normal_to_fec2(conn_info,udp_dest,0,0,header);
}
else if(events[idx].data.u64==(u64_t)tun_fd)
{
len=read(tun_fd,data,max_data_len);
assert(len>=0);
if(len<0)
{
mylog(log_warn,"read from tun_fd return %d,errno=%s\n",len,strerror(errno));
continue;
}
mylog(log_trace,"Received packet from tun,len: %d\n",len);
if(got_feed_back==0)
put_header(header_new_connect,data,len);
else
put_header(header_normal,data,len);
char header=(got_feed_back==0?header_new_connect:header_normal);
from_normal_to_fec2(conn_info,udp_dest,data,len,header);
do_cook(data,len);
delay_manager.add(0,udp_dest,data,len);
}
else if(events[idx].data.u64==(u64_t)remote_fd64)
{
@ -279,42 +346,42 @@ int tun_dev_client_event_loop()
}
char header=0;
if(get_header(header,data,len)!=0)
{
mylog(log_warn,"get_header failed\n");
continue;
}
if(header==header_reject)
{
mylog(log_fatal,"server switched to handle another client,exit\n");
mylog(log_fatal,"server restarted or switched to handle another client,exited\n");
myexit(-1);
continue;
}
else if(header==header_normal)
{
if(got_feed_back==0)
mylog(log_info,"connect accepted by server %d\n",int(header));
got_feed_back=1;
}
else
{
mylog(log_warn,"invalid header\n");
mylog(log_warn,"invalid header %d %d\n",int(header),len);
continue;
}
mylog(log_trace,"Received packet from udp,len: %d\n",len);
assert(len>=0);
//delay_manager.add(0,tun_dest,data,len);
assert(write(tun_fd,data,len)>=0);
from_fec_to_normal2(conn_info,tun_dest,data,len);
}
else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd())
{
uint64_t value;
read(delay_manager.get_timer_fd(), &value, 8);
mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
//printf("<timerfd_triggered, %d>",delay_mp.size());
//fflush(stdout);
}
else if (events[idx].data.u64 == (u64_t)fifo_fd)
{
@ -330,7 +397,7 @@ int tun_dev_client_event_loop()
}
else
{
assert(0==1);
//assert(0==1);
}
}
delay_manager.check();
@ -351,13 +418,14 @@ int tun_dev_server_event_loop()
int local_listen_fd;
//fd64_t local_listen_fd64;
tun_fd=get_tun_fd("tun11");
tun_fd=get_tun_fd(tun_dev);
assert(tun_fd>0);
assert(new_listen_socket(local_listen_fd,local_ip_uint32,local_port)==0);
// local_listen_fd64=fd_manager.create(local_listen_fd);
assert(set_if("tun11","10.0.0.1","10.0.0.2",1000)==0);
//assert(set_if("tun11","10.0.0.1","10.0.0.2",1000)==0);
assert(set_if(tun_dev,htonl((ntohl(sub_net_uint32)&0xFFFFFF00)|1),htonl((ntohl(sub_net_uint32)&0xFFFFFF00 )|2),g_fec_mtu)==0);
epoll_fd = epoll_create1(0);
assert(epoll_fd>0);
@ -419,9 +487,6 @@ int tun_dev_server_event_loop()
int fifo_fd=-1;
if(fifo_file[0]!=0)
@ -438,17 +503,17 @@ int tun_dev_server_event_loop()
mylog(log_info,"fifo_file=%s\n",fifo_file);
}
//ip_port_t dest_ip_port;
dest_t udp_dest;
udp_dest.cook=1;
udp_dest.type=type_fd_ip_port;
dest_t dest;
dest.type=type_fd_ip_port;
udp_dest.inner.fd_ip_port.fd=local_listen_fd;
udp_dest.inner.fd_ip_port.ip_port.ip=0;
udp_dest.inner.fd_ip_port.ip_port.port=0;
dest.inner.fd_ip_port.fd=local_listen_fd;
dest.inner.fd_ip_port.ip_port.ip=0;
dest.inner.fd_ip_port.ip_port.port=0;
//dest.conv=conv;
//dest.inner.ip_port=dest_ip_port;
//dest.cook=1;
dest_t tun_dest;
tun_dest.type=type_write_fd;
tun_dest.inner.fd=tun_fd;
while(1)////////////////////////
{
@ -476,12 +541,36 @@ int tun_dev_server_event_loop()
uint64_t value;
read(conn_info.timer.get_timer_fd(), &value, 8);
//mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
if(dest.inner.fd64_ip_port.ip_port.to_u64()==0)
if(udp_dest.inner.fd64_ip_port.ip_port.to_u64()==0)
{
continue;
}
conn_info.stat.report_as_server(dest.inner.fd_ip_port.ip_port);
conn_info.stat.report_as_server(udp_dest.inner.fd_ip_port.ip_port);
}
else if(events[idx].data.u64==conn_info.fec_encode_manager.get_timer_fd64())
{
mylog(log_trace,"events[idx].data.u64 == conn_info.fec_encode_manager.get_timer_fd64()\n");
//mylog(log_info,"timer!!!\n");
uint64_t value;
if(!fd_manager.exist(fd64)) //fd64 has been closed
{
mylog(log_trace,"!fd_manager.exist(fd64)");
continue;
}
if((ret=read(fd_manager.to_fd(fd64), &value, 8))!=8)
{
mylog(log_trace,"(ret=read(fd_manager.to_fd(fd64), &value, 8))!=8,ret=%d\n",ret);
continue;
}
if(value==0)
{
mylog(log_debug,"value==0\n");
continue;
}
assert(value==1);
from_normal_to_fec2(conn_info,udp_dest,0,0,header_normal);
}
else if(events[idx].data.u64==(u64_t)local_listen_fd)
{
@ -494,7 +583,6 @@ int tun_dev_server_event_loop()
//myexit(1);
};
if(de_cook(data,len)<0)
{
mylog(log_warn,"de_cook(data,len)failed \n");
@ -509,7 +597,7 @@ int tun_dev_server_event_loop()
continue;
}
if((dest.inner.fd_ip_port.ip_port.ip==udp_new_addr_in.sin_addr.s_addr) && (dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port)))
if((udp_dest.inner.fd_ip_port.ip_port.ip==udp_new_addr_in.sin_addr.s_addr) && (udp_dest.inner.fd_ip_port.ip_port.port==ntohs(udp_new_addr_in.sin_port)))
{
if(header!=header_new_connect&& header!=header_normal)
{
@ -523,23 +611,23 @@ int tun_dev_server_event_loop()
{
mylog(log_info,"new connection from %s:%d \n", inet_ntoa(udp_new_addr_in.sin_addr),
ntohs(udp_new_addr_in.sin_port));
dest.inner.fd_ip_port.ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port);
udp_dest.inner.fd_ip_port.ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
udp_dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port);
conn_info.fec_decode_manager.clear();
conn_info.fec_encode_manager.clear();
memset(&conn_info.stat,0,sizeof(conn_info.stat));
}
else
else if(header==header_normal)
{
mylog(log_info,"rejected connection from %s:%d\n", inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port));
len=1;
data[0]=header_reject;
do_cook(data,len);
dest_t tmp_dest;
tmp_dest.type=type_fd_ip_port;
@ -550,18 +638,16 @@ int tun_dev_server_event_loop()
delay_manager.add(0,tmp_dest,data,len);;
continue;
}
else
{
mylog(log_warn,"invalid header\n");
}
}
mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr),
ntohs(udp_new_addr_in.sin_port),len);
ret=write(tun_fd,data,len);
if( ret<0 )
{
mylog(log_warn,"write to tun failed len=%d ret=%d\n errno=%s\n",len,ret,strerror(errno));
}
from_fec_to_normal2(conn_info,tun_dest,data,len);
}
else if(events[idx].data.u64==(u64_t)tun_fd)
@ -571,17 +657,13 @@ int tun_dev_server_event_loop()
mylog(log_trace,"Received packet from tun,len: %d\n",len);
if(dest.inner.fd64_ip_port.ip_port.to_u64()==0)
if(udp_dest.inner.fd64_ip_port.ip_port.to_u64()==0)
{
mylog(log_warn,"there is no client yet\n");
mylog(log_debug,"received packet from tun,but there is no client yet,dropped packet\n");
continue;
}
put_header(header_normal,data,len);
do_cook(data,len);
delay_manager.add(0,dest,data,len);;
from_normal_to_fec2(conn_info,udp_dest,data,len,header_normal);
}
else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd())
@ -589,8 +671,6 @@ int tun_dev_server_event_loop()
uint64_t value;
read(delay_manager.get_timer_fd(), &value, 8);
mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
//printf("<timerfd_triggered, %d>",delay_mp.size());
//fflush(stdout);
}
else if (events[idx].data.u64 == (u64_t)fifo_fd)
{
@ -606,7 +686,7 @@ int tun_dev_server_event_loop()
}
else
{
assert(0==1);
//assert(0==1);
}
}
delay_manager.check();