mainly function done

This commit is contained in:
wangyu- 2017-10-05 12:21:06 -05:00
parent ee4e5ad4c7
commit 844eac0d3d
8 changed files with 522 additions and 113 deletions

View File

@ -52,6 +52,47 @@ u32_t get_u64_l(u64_t a)
return (a<<32u)>>32u;
}
void write_u16(char * p,u16_t w)
{
*(unsigned char*)(p + 1) = (w & 0xff);
*(unsigned char*)(p + 0) = (w >> 8);
}
u16_t read_u16(char * p)
{
u16_t res;
res = *(const unsigned char*)(p + 0);
res = *(const unsigned char*)(p + 1) + (res << 8);
return res;
}
void write_u32(char * p,u32_t l)
{
*(unsigned char*)(p + 3) = (unsigned char)((l >> 0) & 0xff);
*(unsigned char*)(p + 2) = (unsigned char)((l >> 8) & 0xff);
*(unsigned char*)(p + 1) = (unsigned char)((l >> 16) & 0xff);
*(unsigned char*)(p + 0) = (unsigned char)((l >> 24) & 0xff);
}
u32_t read_u32(char * p)
{
u32_t res;
res = *(const unsigned char*)(p + 0);
res = *(const unsigned char*)(p + 1) + (res << 8);
res = *(const unsigned char*)(p + 2) + (res << 8);
res = *(const unsigned char*)(p + 3) + (res << 8);
return res;
}
void write_u64(char * s,u64_t a)
{
assert(0==1);
}
u64_t read_u64(char * s)
{
assert(0==1);
return 0;
}
char * my_ntoa(u32_t ip)
{
in_addr a;
@ -431,3 +472,8 @@ char * ip_port_t::to_s()
sprintf(res,"%s:%d",my_ntoa(ip),port);
return res;
}
int round_up_div(int a,int b)
{
return (a+b-1)/b;
}

View File

@ -58,6 +58,9 @@ typedef long long i64_t;
typedef unsigned int u32_t;
typedef int i32_t;
typedef unsigned short u16_t;
typedef short i16_t;
typedef u64_t my_time_t;
const int max_data_len=1600;
@ -165,6 +168,15 @@ u32_t get_u64_h(u64_t a);
u32_t get_u64_l(u64_t a);
void write_u16(char *,u16_t a);
u16_t read_u16(char *);
void write_u32(char *,u32_t a);
u32_t read_u32(char *);
void write_u64(char *,u64_t a);
u64_t read_uu64(char *);
char * my_ntoa(u32_t ip);
void myexit(int a);
@ -195,6 +207,8 @@ int random_between(u32_t a,u32_t b);
int set_timer_ms(int epollfd,int &timer_fd,u32_t timer_interval);
int round_up_div(int a,int b);
/*
int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port);
*/

View File

@ -5,6 +5,207 @@
* Author: root
*/
#include "fec_manager.h"
#include "log.h"
#include "common.h"
#include "lib/rs.h"
u32_t seq=0;
blob_encode_t::blob_encode_t()
{
clear();
}
int blob_encode_t::clear()
{
counter=0;
current_len=(int)sizeof(u32_t);
return 0;
}
int blob_encode_t::get_num()
{
return counter;
}
int blob_encode_t::get_shard_len(int n)
{
return round_up_div(current_len,n);
}
int blob_encode_t::get_shard_len(int n,int next_packet_len)
{
return round_up_div(current_len+(int)sizeof(u16_t)+next_packet_len,n);
}
int blob_encode_t::input(char *s,int len)
{
assert(current_len+len+sizeof(u16_t) <=256*buf_len);
assert(len<=65535&&len>=0);
counter++;
assert(counter<=max_packet_num);
write_u16(buf+current_len,len);
current_len+=sizeof(u16_t);
memcpy(buf+current_len,s,len);
current_len+=len;
return 0;
}
int blob_encode_t::output(int n,char ** &s_arr,int & len)
{
static char *output_arr[256+100];
len=round_up_div(current_len,n);
write_u32(buf,counter);
for(int i=0;i<n;i++)
{
output_arr[i]=buf+len*i;
}
s_arr=output_arr;
return 0;
}
blob_decode_t::blob_decode_t()
{
clear();
}
int blob_decode_t::clear()
{
current_len=0;
last_len=-1;
counter=0;
return 0;
}
int blob_decode_t::input(char *s,int len)
{
if(last_len!=-1)
{
assert(last_len==len);
}
counter++;
assert(counter<=256);
last_len=len;
assert(current_len+len+100<(int)sizeof(buf));
memcpy(buf+current_len,s,len);
current_len+=len;
return 0;
}
int blob_decode_t::output(int &n,char ** &s_arr,int *&len_arr)
{
static char *s_buf[max_packet_num+100];
static int len_buf[max_packet_num+100];
int parser_pos=0;
if(parser_pos+(int)sizeof(u32_t)>current_len) return -1;
n=(int)read_u32(buf+parser_pos);
if(n>max_packet_num) {mylog(log_info,"failed 1\n");return -1;}
s_arr=s_buf;
len_arr=len_buf;
parser_pos+=sizeof(u32_t);
for(int i=0;i<n;i++)
{
if(parser_pos+(int)sizeof(u16_t)>current_len) {mylog(log_info,"failed2 \n");return -1;}
len_arr[i]=(int)read_u16(buf+parser_pos);
parser_pos+=(int)sizeof(u16_t);
if(parser_pos+len_arr[i]>current_len) {mylog(log_info,"failed 3 %d %d %d\n",parser_pos,len_arr[i],current_len);return -1;}
s_arr[i]=buf+parser_pos;
parser_pos+=len_arr[i];
}
return 0;
}
fec_encode_manager_t::fec_encode_manager_t()
{
re_init();
}
int fec_encode_manager_t::re_init()
{
fec_data_num=4;
fec_redundant_num=2;
fec_mtu=1200;
counter=0;
blob_encode.clear();
ready_for_output=0;
return 0;
}
int fec_encode_manager_t::input(char *s,int len,int &is_first_packet)
{
is_first_packet=0;
if(s==0 ||blob_encode.get_shard_len(fec_data_num,len)>=fec_mtu)
{
char ** blob_output;
int blob_len;
assert(counter!=0);
blob_encode.output(fec_data_num,blob_output,blob_len);
for(int i=0;i<fec_data_num+fec_redundant_num;i++)
{
int tmp_idx=0;
write_u32(buf[i]+tmp_idx,seq);
tmp_idx+=sizeof(u32_t);
buf[i][tmp_idx++]=(unsigned char)fec_data_num;
buf[i][tmp_idx++]=(unsigned char)fec_redundant_num;
buf[i][tmp_idx++]=(unsigned char)i;
buf[i][tmp_idx++]=(unsigned char)0;
if(i<fec_data_num)
{
memcpy(buf[i]+tmp_idx,blob_output[i],blob_len);
tmp_idx+=blob_len;
}
output_buf[i]=buf[i]+sizeof(u32_t)+3*sizeof(char);
}
output_len=blob_len+sizeof(u32_t)+3*sizeof(char);
rs_encode2(fec_data_num,fec_data_num+fec_redundant_num,output_buf,blob_len);
for(int i=0;i<fec_data_num+fec_redundant_num;i++)
{
output_buf[i]=buf[i];
}
ready_for_output=1;
seq++;
counter=0;
blob_encode.clear();
}
if(s!=0)
{
if(counter==0) is_first_packet=1;
blob_encode.input(s,len);
counter++;
}
return 0;
}
int fec_encode_manager_t::output(int &n,char ** &s_arr,int &len)
{
if(!ready_for_output)
{
n=-1;
len=-1;
s_arr=0;
}
else
{
n=fec_data_num+fec_redundant_num;
len=output_len;
s_arr=output_buf;
ready_for_output=0;
}
return 0;
}
/*
int fec_decode_manager_t::input(char *s,int l)
{
return 0;
}
int fec_decode_manager_t::output(int &n,char ** &s_arr,int* &l_arr)
{
return 0;
}*/

View File

@ -8,34 +8,229 @@
#ifndef FEC_MANAGER_H_
#define FEC_MANAGER_H_
#include "common.h"
#include "log.h"
#include "lib/rs.h"
const int max_packet_num=1000;
const u32_t anti_replay_buff_size=30000;
const u32_t fec_buff_size=3000;
struct fec_encode_manager_t
struct anti_replay_t
{
int input(char *,int l);
int ready_for_output();
int output(char ** &,int l,int &n);
};
struct fec_decode_manager_t
{
int input(char *,int l);
int ready_for_output();
int output(char ** &,int l,int &n);
};
u64_t replay_buffer[anti_replay_buff_size];
unordered_set<u32_t> st;
int index;
anti_replay_t()
{
memset(replay_buffer,-1,sizeof(replay_buffer));
st.rehash(anti_replay_buff_size*10);
index=0;
}
void set_invaild(u32_t seq)
{
if(st.find(seq)!=st.end() )
{
mylog(log_trace,"seq %llx exist\n",seq);
return;
//return 0;
}
if(replay_buffer[index]!=u64_t(i64_t(-1)))
{
assert(st.find(replay_buffer[index])!=st.end());
st.erase(replay_buffer[index]);
}
replay_buffer[index]=seq;
st.insert(seq);
index++;
if(index==int(anti_replay_buff_size)) index=0;
//return 1; //for complier check
}
int is_vaild(u32_t seq)
{
return st.find(seq)==st.end();
}
};
struct blob_encode_t
{
int input(char *,int l);
int ready_for_output();
int output(char ** &,int l,int &n);
char buf[(256+5)*buf_len];
int current_len;
int counter;
blob_encode_t();
int clear();
int get_num();
int get_shard_len(int n);
int get_shard_len(int n,int next_packet_len);
int input(char *s,int len); //len=use len=0 for second and following packet
int output(int n,char ** &s_arr,int & len);
};
struct blob_decode_t
{
int input(char *,int l);
int ready_for_output();
int output(char ** &,int l,int &n);
char buf[(256+5)*buf_len];
int current_len;
int last_len;
int counter;
blob_decode_t();
int clear();
int input(char *input,int len);
int output(int &n,char ** &output,int *&len_arr);
};
struct fec_encode_manager_t
{
int fec_data_num,fec_redundant_num;
int fec_mtu;
char buf[256+5][buf_len+100];
char *output_buf[256+5];
int output_len;
int ready_for_output;
int counter;
blob_encode_t blob_encode;
fec_encode_manager_t();
int re_init();
int input(char *s,int len,int &is_first_packet);
int output(int &n,char ** &s_arr,int &len);
};
struct fec_data_t
{
int used;
u32_t seq;
int data_num;
int redundant_num;
int idx;
int type;
char buf[buf_len];
int len;
};
struct fec_decode_manager_t
{
anti_replay_t anti_replay;
fec_data_t fec_data[fec_buff_size];
int index;
unordered_map<u32_t, map<int,int> > mp;
blob_decode_t blob_decode;
fec_decode_manager_t()
{
for(int i=0;i<(int)fec_buff_size;i++)
fec_data[i].used=0;
ready_for_output=0;
}
int output_n;
char ** output_s_arr;
int * output_len_arr;
int ready_for_output;
int input(char *s,int len)
{
char *ori_s=s;
u32_t seq=read_u32(s);
s+=sizeof(u32_t);
int data_num=(unsigned char)*(s++);
int redundant_num=(unsigned char)*(s++);
int innder_index=(unsigned char)*(s++);
int type=(unsigned char)*(s++);
len=len-int(s-ori_s);
if(len<0)
{
return -1;
}
if(!anti_replay.is_vaild(seq))
{
return 0;
}
if(!mp[seq].empty())
{
int tmp_idx=mp[seq].begin()->second;
int ok=1;
if(data_num+redundant_num>255)
ok=0;
if(fec_data[tmp_idx].data_num!=data_num||fec_data[tmp_idx].redundant_num!=redundant_num||fec_data[tmp_idx].len!=len)
{
ok=0;
}
if(ok==0)
{
return 0;
}
}
if(fec_data[index].used!=0)
{
int tmp_seq=fec_data[index].seq;
anti_replay.set_invaild(tmp_seq);
if(mp.find(tmp_seq)!=mp.end())
{
mp.erase(tmp_seq);
}
}
fec_data[index].used=1;
fec_data[index].seq=seq;
fec_data[index].data_num=data_num;
fec_data[index].redundant_num=redundant_num;
fec_data[index].idx=innder_index;
fec_data[index].type=type;
fec_data[index].len=len;
mp[seq][innder_index]=index;
map<int,int> &inner_mp=mp[seq];
if((int)inner_mp.size()>=data_num)
{
anti_replay.set_invaild(seq);
char *fec_tmp_arr[256+5]={0};
for(auto it=inner_mp.begin();it!=inner_mp.end();it++)
{
fec_tmp_arr[it->first]=fec_data[it->second].buf;
}
rs_decode2(data_num,redundant_num,fec_tmp_arr,len);
blob_decode.clear();
for(int i=0;i<data_num;i++)
{
blob_decode.input(fec_tmp_arr[i],len);
}
blob_decode.output(output_n,output_s_arr,output_len_arr);
ready_for_output=1;
}
index++;
if(index==int(anti_replay_buff_size)) index=0;
return 0;
}
int output(int &n,char ** &s_arr,int* &len_arr)
{
if(!ready_for_output)
{
n=-1;
s_arr=0;
len_arr=0;
}
else
{
ready_for_output=0;
n=output_n;
s_arr=output_s_arr;
len_arr=output_len_arr;
}
return 0;
}
};
#endif /* FEC_MANAGER_H_ */

View File

@ -6,6 +6,7 @@
#include "connection.h"
#include "fd_manager.h"
#include "delay_manager.h"
#include "fec_manager.h"
using namespace std;
@ -539,7 +540,6 @@ int server_event_loop()
mylog(log_fatal,"unknown fd,this should never happen\n");
myexit(-1);
}
}
delay_manager.check();
}
@ -581,6 +581,48 @@ int unit_test()
printf("<%s>",data[i]);
}
fec_free(code);
char arr2[6][100]=
{
"aaa11111","","ccc333333333"
,"ddd444","eee5555","ff6666"
};
blob_encode_t blob_encode;
for(int i=0;i<6;i++)
blob_encode.input(arr2[i],strlen(arr2[i]));
char **output;
int shard_len;
blob_encode.output(7,output,shard_len);
printf("<shard_len:%d>",shard_len);
blob_decode_t blob_decode;
for(int i=0;i<7;i++)
{
blob_decode.input(output[i],shard_len);
}
char **decode_output;
int * len_arr;
int num;
ret=blob_decode.output(num,decode_output,len_arr);
printf("<num:%d,ret:%d>\n",num,ret);
for(int i=0;i<num;i++)
{
char buf[1000]={0};
memcpy(buf,decode_output[i],len_arr[i]);
printf("<%d:%s>",len_arr[i],buf);
}
printf("\n");
fec_encode_manager_t fec_encode_manager;
fec_decode_manager_t fec_decode_manager;
return 0;
}
void print_help()

View File

@ -8,7 +8,7 @@ cc_arm= /toolchains/arm-2014.05/bin/arm-none-linux-gnueabi-g++
#cc_bcm2708=/home/wangyu/raspberry/tools/arm-bcm2708/gcc-linaro-arm-linux-gnueabihf-raspbian/bin/arm-linux-gnueabihf-g++
FLAGS= -std=c++11 -Wall -Wextra -Wno-unused-variable -Wno-unused-parameter -Wno-missing-field-initializers
SOURCES=main.cpp log.cpp common.cpp lib/fec.c lib/rs.c packet.cpp delay_manager.cpp fd_manager.cpp connection.cpp
SOURCES=main.cpp log.cpp common.cpp lib/fec.c lib/rs.c packet.cpp delay_manager.cpp fd_manager.cpp connection.cpp fec_manager.cpp
NAME=speeder
TARGETS=amd64 arm mips24kc_be x86 mips24kc_le

View File

@ -17,7 +17,6 @@ u64_t dup_packet_send_count=0;
u64_t packet_recv_count=0;
u64_t dup_packet_recv_count=0;
typedef u64_t anti_replay_seq_t;
const u32_t anti_replay_buff_size=10000;
int disable_replay_filter=0;
int random_drop=0;
@ -27,68 +26,6 @@ char key_string[1000]= "secret key";
int local_listen_fd=-1;
struct anti_replay_t
{
u64_t max_packet_received;
u64_t replay_buffer[anti_replay_buff_size];
unordered_set<u64_t> st;
u32_t const_id;
u32_t anti_replay_seq;
int index;
anti_replay_seq_t get_new_seq_for_send()
{
if(const_id==0) prepare();
anti_replay_seq_t res=const_id;
res<<=32u;
anti_replay_seq++;
res|=anti_replay_seq;
const_id=0;
return res;
}
void prepare()
{
anti_replay_seq=get_true_random_number();//random first seq
const_id=get_true_random_number_nz();
}
anti_replay_t()
{
memset(replay_buffer,0,sizeof(replay_buffer));
st.rehash(anti_replay_buff_size*10);
max_packet_received=0;
index=0;
}
int is_vaild(u64_t seq)
{
if(const_id==0) prepare();
//if(disable_replay_filter) return 1;
if(seq==0)
{
mylog(log_debug,"seq=0\n");
return 0;
}
if(st.find(seq)!=st.end() )
{
mylog(log_trace,"seq %llx exist\n",seq);
return 0;
}
if(replay_buffer[index]!=0)
{
assert(st.find(replay_buffer[index])!=st.end());
st.erase(replay_buffer[index]);
}
replay_buffer[index]=seq;
st.insert(seq);
index++;
if(index==int(anti_replay_buff_size)) index=0;
return 1; //for complier check
}
}anti_replay;
void encrypt_0(char * input,int &len,char *key)
{
int i,j;
@ -110,32 +47,6 @@ void decrypt_0(char * input,int &len,char *key)
input[i]^=key[j];
}
}
int add_seq(char * data,int &data_len )
{
if(data_len<0) return -1;
anti_replay_seq_t seq=anti_replay.get_new_seq_for_send();
seq=hton64(seq);
memcpy(data+data_len,&seq,sizeof(seq));
data_len+=sizeof(seq);
return 0;
}
int remove_seq(char * data,int &data_len)
{
anti_replay_seq_t seq;
if(data_len<int(sizeof(seq))) return -1;
data_len-=sizeof(seq);
memcpy(&seq,data+data_len,sizeof(seq));
seq=ntoh64(seq);
if(anti_replay.is_vaild(seq)==0)
{
if(disable_replay_filter==1) //todo inefficient code,why did i put it here???
return 0;
mylog(log_trace,"seq %llx dropped bc of replay-filter\n ",seq);
return -1;
}
packet_recv_count++;
return 0;
}
int do_obscure(const char * input, int in_len,char *output,int &out_len)
{
//memcpy(output,input,in_len);
@ -305,7 +216,7 @@ unsigned int crc32h(unsigned char *message,int len) {
return ~crc;
}
int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out)
int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out)
{
static char buf[buf_len];
output=buf;
@ -318,12 +229,12 @@ int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out)
memcpy(output+len_in+(int)(sizeof(n_conv)),&crc32_n,sizeof(crc32_n));
return 0;
}
int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out )
int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out )
{
u32_t n_conv;
memcpy(&n_conv,input,sizeof(n_conv));
conv=ntohl(n_conv);
output=input+sizeof(n_conv);
output=(char *)input+sizeof(n_conv);
u32_t crc32_n;
len_out=len_in-(int)sizeof(n_conv)-(int)sizeof(crc32_n);
if(len_out<0)

View File

@ -37,7 +37,7 @@ int de_obscure(const char * input, int in_len,char *output,int &out_len);
int sendto_ip_port (u32_t ip,int port,char * buf, int len,int flags);
int send_fd (int fd,char * buf, int len,int flags);
int put_conv(u32_t conv,char * input,int len_in,char *&output,int &len_out);
int get_conv(u32_t &conv,char *input,int len_in,char *&output,int &len_out );
int put_conv(u32_t conv,const char * input,int len_in,char *&output,int &len_out);
int get_conv(u32_t &conv,const char *input,int len_in,char *&output,int &len_out );
#endif /* PACKET_H_ */