just commit

This commit is contained in:
wangyu- 2017-10-27 05:23:46 -05:00
parent d1c88bbc07
commit ba81aa14d4
7 changed files with 272 additions and 33 deletions

View File

@ -107,7 +107,7 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o
fec_encode_manager_t fec_encode_manager;
fec_decode_manager_t fec_decode_manager;
my_timer_t timer;
ip_port_t ip_port;
//ip_port_t ip_port;
u64_t last_active_time;
stat_t stat;
conn_info_t()

View File

@ -146,11 +146,11 @@ fec_encode_manager_t::fec_encode_manager_t()
}
timer_fd64=fd_manager.create(timer_fd);
re_init(fec_data_num,fec_redundant_num,fec_mtu,fec_queue_len,fec_timeout,fec_mode);
reset_fec_parameter(g_fec_data_num,g_fec_redundant_num,g_fec_mtu,g_fec_queue_len,g_fec_timeout,g_fec_mode);
seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
}
int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode)
int fec_encode_manager_t::reset_fec_parameter(int data_num,int redundant_num,int mtu,int queue_len,int timeout,int mode)
{
fec_data_num=data_num;
fec_redundant_num=redundant_num;
@ -160,15 +160,8 @@ int fec_encode_manager_t::re_init(int data_num,int redundant_num,int mtu,int que
fec_mode=mode;
assert(data_num+redundant_num<max_fec_packet_num);
counter=0;
blob_encode.clear();
ready_for_output=0;
//seq=0;
itimerspec zero_its;
memset(&zero_its, 0, sizeof(zero_its));
timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
clear();
return 0;
}
@ -489,15 +482,12 @@ int fec_encode_manager_t::output(int &n,char ** &s_arr,int *&len)
}
return 0;
}
/*
int fec_decode_manager_t::re_init()
{
for(int i=0;i<(int)fec_buff_num;i++)
fec_data[i].used=0;
ready_for_output=0;
index=0;
clear();
return 0;
}
}*/
int fec_decode_manager_t::input(char *s,int len)
{

View File

@ -37,10 +37,16 @@ struct anti_replay_t
unordered_set<u32_t> st;
int index;
anti_replay_t()
{
clear();
}
int clear()
{
memset(replay_buffer,-1,sizeof(replay_buffer));
st.clear();
st.rehash(anti_replay_buff_size*3);
index=0;
return 0;
}
void set_invaild(u32_t seq)
{
@ -140,6 +146,22 @@ public:
fec_encode_manager_t();
~fec_encode_manager_t();
int clear()
{
counter=0;
blob_encode.clear();
ready_for_output=0;
itimerspec zero_its;
memset(&zero_its, 0, sizeof(zero_its));
timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
seq=(u32_t)get_true_random_number(); //TODO temp solution for a bug.
return 0;
}
my_time_t get_first_packet_time()
{
return first_packet_time_for_output;
@ -155,7 +177,7 @@ public:
return fec_mode;
}
u64_t get_timer_fd64();
int re_init(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
int reset_fec_parameter(int data_num,int redundant_num,int mtu,int pending_num,int pending_time,int type);
int input(char *s,int len/*,int &is_first_packet*/);
int output(int &n,char ** &s_arr,int *&len);
};
@ -200,7 +222,7 @@ public:
fec_decode_manager_t()
{
fec_data=new fec_data_t[fec_buff_num+5];
re_init();
clear();
}
fec_decode_manager_t(const fec_decode_manager_t &b)
{
@ -210,7 +232,21 @@ public:
{
delete fec_data;
}
int re_init();
int clear()
{
anti_replay.clear();
mp.clear();
mp.rehash(fec_buff_num*3);
for(int i=0;i<(int)fec_buff_num;i++)
fec_data[i].used=0;
ready_for_output=0;
index=0;
return 0;
}
//int re_init();
int input(char *s,int len);
int output(int &n,char ** &s_arr,int* &len_arr);
};

View File

@ -472,7 +472,7 @@ int unit_test()
int * len;
fec_decode_manager.output(n,s_arr,len);
fec_encode_manager.re_init(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1);
fec_encode_manager.reset_fec_parameter(3,2,g_fec_mtu,g_fec_queue_len,g_fec_timeout,1);
fec_encode_manager.input((char *) a.c_str(), a.length());
fec_encode_manager.output(n,s_arr,len);

View File

@ -177,9 +177,7 @@ int my_send(const dest_t &dest,char *data,int len)
{
if(dest.cook)
{
put_crc32(data,len);
if(!disable_obscure)do_obscure(data,len);
if(!disable_xor)encrypt_0(data,len,key_string);
do_cook(data,len);
}
switch(dest.type)
{
@ -322,6 +320,14 @@ int put_crc32(char * s,int &len)
return 0;
}
int do_cook(char * data,int &len)
{
put_crc32(data,len);
if(!disable_obscure)do_obscure(data,len);
if(!disable_xor)encrypt_0(data,len,key_string);
return 0;
}
int de_cook(char * s,int &len)
{
if(!disable_xor)decrypt_0(s,len,key_string);

View File

@ -42,5 +42,6 @@ int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out
int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out );
int put_crc32(char * s,int &len);
int rm_crc32(char * s,int &len);
int do_cook(char * data,int &len);
int de_cook(char * s,int &len);
#endif /* PACKET_H_ */

View File

@ -145,13 +145,73 @@ int tun_dev_client_event_loop()
myexit(-1);
}
dest_t dest;
dest.type=type_fd64;
dest.inner.fd64=remote_fd64;
ev.events = EPOLLIN;
ev.data.u64 = delay_manager.get_timer_fd();
mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
conn_info_t *conn_info_p=new conn_info_t;
conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack
u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
ev.events = EPOLLIN;
ev.data.u64 = fd64;
mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
if (ret!= 0) {
mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
myexit(-1);
}
//my_timer_t timer;
conn_info.timer.add_fd_to_epoll(epoll_fd);
conn_info.timer.set_timer_repeat_us(timer_interval*1000);
int fifo_fd=-1;
if(fifo_file[0]!=0)
{
fifo_fd=create_fifo(fifo_file);
ev.events = EPOLLIN;
ev.data.u64 = fifo_fd;
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno));
myexit(-1);
}
mylog(log_info,"fifo_file=%s\n",fifo_file);
}
//dest.conv=conv;
//dest.inner.ip_port=dest_ip_port;
//dest.cook=1;
dest_t udp_dest;
udp_dest.type=type_fd64;
udp_dest.inner.fd64=remote_fd64;
dest_t tun_dest;
tun_dest.type=type_fd;
tun_dest.inner.fd=tun_fd;
int got_feed_back=0;
@ -176,7 +236,14 @@ int tun_dev_client_event_loop()
int idx;
for (idx = 0; idx < nfds; ++idx)
{
if(events[idx].data.u64==(u64_t)tun_fd)
if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
{
uint64_t value;
read(conn_info.timer.get_timer_fd(), &value, 8);
mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
conn_info.stat.report_as_client();
}
else if(events[idx].data.u64==(u64_t)tun_fd)
{
len=read(tun_fd,data,max_data_len);
assert(len>=0);
@ -188,7 +255,9 @@ int tun_dev_client_event_loop()
else
put_header(header_normal,data,len);
delay_manager.add(0,dest,data,len);;
do_cook(data,len);
delay_manager.add(0,udp_dest,data,len);
}
else if(events[idx].data.u64==(u64_t)remote_fd64)
{
@ -203,6 +272,14 @@ int tun_dev_client_event_loop()
continue;
}
if(de_cook(data,len)<0)
{
mylog(log_warn,"de_cook(data,len)failed \n");
continue;
}
char header=0;
if(get_header(header,data,len)!=0)
{
@ -228,8 +305,33 @@ int tun_dev_client_event_loop()
mylog(log_trace,"Received packet from udp,len: %d\n",len);
assert(len>=0);
//delay_manager.add(0,tun_dest,data,len);
assert(write(tun_fd,data,len)>=0);
}
else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd())
{
uint64_t value;
read(delay_manager.get_timer_fd(), &value, 8);
mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
//printf("<timerfd_triggered, %d>",delay_mp.size());
//fflush(stdout);
}
else if (events[idx].data.u64 == (u64_t)fifo_fd)
{
char buf[buf_len];
int len=read (fifo_fd, buf, sizeof (buf));
if(len<0)
{
mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
continue;
}
buf[len]=0;
handle_command(buf);
}
else
{
assert(0==1);
}
}
delay_manager.check();
}
@ -283,6 +385,59 @@ int tun_dev_server_event_loop()
myexit(-1);
}
ev.events = EPOLLIN;
ev.data.u64 = delay_manager.get_timer_fd();
mylog(log_debug,"delay_manager.get_timer_fd()=%d\n",delay_manager.get_timer_fd());
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, delay_manager.get_timer_fd(), &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
conn_info_t *conn_info_p=new conn_info_t;
conn_info_t &conn_info=*conn_info_p; //huge size of conn_info,do not allocate on stack
u64_t fd64=conn_info.fec_encode_manager.get_timer_fd64();
ev.events = EPOLLIN;
ev.data.u64 = fd64;
mylog(log_debug,"conn_info.fec_encode_manager.get_timer_fd64()=%llu\n",conn_info.fec_encode_manager.get_timer_fd64());
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd_manager.to_fd(fd64), &ev);
if (ret!= 0) {
mylog(log_fatal,"add fec_encode_manager.get_timer_fd64() error\n");
myexit(-1);
}
//my_timer_t timer;
conn_info.timer.add_fd_to_epoll(epoll_fd);
conn_info.timer.set_timer_repeat_us(timer_interval*1000);
int fifo_fd=-1;
if(fifo_file[0]!=0)
{
fifo_fd=create_fifo(fifo_file);
ev.events = EPOLLIN;
ev.data.u64 = fifo_fd;
ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fifo_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add fifo_fd to epoll error %s\n",strerror(errno));
myexit(-1);
}
mylog(log_info,"fifo_file=%s\n",fifo_file);
}
//ip_port_t dest_ip_port;
dest_t dest;
@ -316,17 +471,37 @@ int tun_dev_server_event_loop()
int idx;
for (idx = 0; idx < nfds; ++idx)
{
if(events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd())
{
uint64_t value;
read(conn_info.timer.get_timer_fd(), &value, 8);
if(events[idx].data.u64==(u64_t)local_listen_fd)
//mylog(log_trace,"events[idx].data.u64==(u64_t)conn_info.timer.get_timer_fd()\n");
if(dest.inner.fd64_ip_port.ip_port.to_u64()==0)
{
continue;
}
conn_info.stat.report_as_server(dest.inner.fd_ip_port.ip_port);
}
else if(events[idx].data.u64==(u64_t)local_listen_fd)
{
struct sockaddr_in udp_new_addr_in={0};
socklen_t udp_new_addr_len = sizeof(sockaddr_in);
if ((len = recvfrom(local_listen_fd, data, max_data_len, 0,
(struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) == -1) {
(struct sockaddr *) &udp_new_addr_in, &udp_new_addr_len)) < 0) {
mylog(log_error,"recv_from error,this shouldnt happen,err=%s,but we can try to continue\n",strerror(errno));
continue;
//myexit(1);
};
if(de_cook(data,len)<0)
{
mylog(log_warn,"de_cook(data,len)failed \n");
continue;
}
char header=0;
if(get_header(header,data,len)!=0)
{
@ -350,6 +525,10 @@ int tun_dev_server_event_loop()
ntohs(udp_new_addr_in.sin_port));
dest.inner.fd_ip_port.ip_port.ip=udp_new_addr_in.sin_addr.s_addr;
dest.inner.fd_ip_port.ip_port.port=ntohs(udp_new_addr_in.sin_port);
conn_info.fec_decode_manager.clear();
conn_info.fec_encode_manager.clear();
memset(&conn_info.stat,0,sizeof(conn_info.stat));
}
else
{
@ -359,6 +538,8 @@ int tun_dev_server_event_loop()
len=1;
data[0]=header_reject;
do_cook(data,len);
dest_t tmp_dest;
tmp_dest.type=type_fd_ip_port;
@ -398,9 +579,34 @@ int tun_dev_server_event_loop()
put_header(header_normal,data,len);
do_cook(data,len);
delay_manager.add(0,dest,data,len);;
}
else if (events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd())
{
uint64_t value;
read(delay_manager.get_timer_fd(), &value, 8);
mylog(log_trace,"events[idx].data.u64 == (u64_t)delay_manager.get_timer_fd()\n");
//printf("<timerfd_triggered, %d>",delay_mp.size());
//fflush(stdout);
}
else if (events[idx].data.u64 == (u64_t)fifo_fd)
{
char buf[buf_len];
int len=read (fifo_fd, buf, sizeof (buf));
if(len<0)
{
mylog(log_warn,"fifo read failed len=%d,errno=%s\n",len,strerror(errno));
continue;
}
buf[len]=0;
handle_command(buf);
}
else
{
assert(0==1);
}
}
delay_manager.check();