diff --git a/client.go b/client.go new file mode 100644 index 0000000..386ac87 --- /dev/null +++ b/client.go @@ -0,0 +1,140 @@ +package gotdx + +import ( + "bytes" + "compress/zlib" + "encoding/binary" + "gotdx/proto" + "io" + "log" + "net" + "strconv" + "strings" +) + +type Client struct { + conn net.Conn + opt *Opt + complete chan bool + sending chan bool +} + +type Opt struct { + Host string + Port int + MaxRetryTimes int +} + +func NewClient(opt *Opt) *Client { + client := &Client{} + if opt.MaxRetryTimes <= 0 { + opt.MaxRetryTimes = DefaultRetryTimes + } + + client.opt = opt + client.sending = make(chan bool, 1) + client.complete = make(chan bool, 1) + + return client +} + +func (client *Client) connect() error { + addr := strings.Join([]string{client.opt.Host, strconv.Itoa(client.opt.Port)}, ":") + conn, err := net.Dial("tcp", addr) + if err != nil { + return err + } + client.conn = conn + return err +} + +func (client *Client) do(msg proto.Msg) error { + sendData, err := msg.Serialize() + if err != nil { + return err + } + + retryTimes := 0 + + for { + n, err := client.conn.Write(sendData) + if n < len(sendData) { + retryTimes++ + if retryTimes <= client.opt.MaxRetryTimes { + log.Printf("第%d次重试\n", retryTimes) + } else { + return err + } + } else { + if err != nil { + return err + } + break + } + } + + headerBytes := make([]byte, proto.MessageHeaderBytes) + _, err = io.ReadFull(client.conn, headerBytes) + if err != nil { + return err + } + + headerBuf := bytes.NewReader(headerBytes) + var header proto.RespHeader + if err := binary.Read(headerBuf, binary.LittleEndian, &header); err != nil { + return err + } + + if header.ZipSize > proto.MessageMaxBytes { + log.Printf("msgData has bytes(%d) beyond max %d\n", header.ZipSize, proto.MessageMaxBytes) + return ErrBadData + } + + msgData := make([]byte, header.ZipSize) + _, err = io.ReadFull(client.conn, msgData) + if err != nil { + return err + } + + var out bytes.Buffer + if header.ZipSize != header.UnZipSize { + b := bytes.NewReader(msgData) + r, _ := zlib.NewReader(b) + io.Copy(&out, r) + err = msg.UnSerialize(header, out.Bytes()) + } else { + err = msg.UnSerialize(header, msgData) + } + + return err +} + +// Connect 连接券商行情服务器 +func (client *Client) Connect() (*proto.Hello1Reply, error) { + err := client.connect() + if err != nil { + return nil, err + } + obj := proto.NewHello1() + err = client.do(obj) + if err != nil { + return nil, err + } + return obj.Reply, err +} + +// Disconnect 断开服务器 +func (client *Client) Disconnect() error { + return client.conn.Close() +} + +// GetSecurityCount 获取指定市场内的证券数目 +func (client *Client) GetSecurityCount(market uint16) (*proto.SecurityCountReply, error) { + obj := proto.NewSecurityCount() + obj.SetParams(market) + err := client.do(obj) + if err != nil { + return nil, err + } + return obj.Reply, err +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..7bada05 --- /dev/null +++ b/client_test.go @@ -0,0 +1,54 @@ +package gotdx + +import ( + "fmt" + "testing" +) + +var opt = &Opt{ + Host: "119.147.212.81", + //Host: "58.63.254.191", + //Host: "218.16.117.138", + //Host: "222.85.139.177", + Port: 7709, +} + +func prepare() *Client { + api := NewClient(opt) + r, err := api.Connect() + if err != nil { + fmt.Println(err) + } + fmt.Println(r) + return api +} + +func Test_tdx_Connect(t *testing.T) { + api := NewClient(opt) + reply, err := api.Connect() + if err != nil { + fmt.Println(err) + } + fmt.Println(reply) + + _ = api.Disconnect() + +} + +func Test_tdx_GetSecurityCount(t *testing.T) { + api := prepare() + reply, err := api.GetSecurityCount(MarketSh) + if err != nil { + fmt.Println(err) + } + fmt.Println(reply) + + reply, err = api.GetSecurityCount(MarketSz) + if err != nil { + fmt.Println(err) + } + fmt.Println(reply) + + _ = api.Disconnect() + +} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..0d9fc76 --- /dev/null +++ b/constants.go @@ -0,0 +1,32 @@ +package gotdx + +import "errors" + +const ( + MarketSz = 0 // 深圳 + MarketSh = 1 // 上海 + MarketBj = 2 // 北京 +) + +const ( + KLINE_TYPE_5MIN = 0 // 5 分钟K 线 + KLINE_TYPE_15MIN = 1 // 15 分钟K 线 + KLINE_TYPE_30MIN = 2 // 30 分钟K 线 + KLINE_TYPE_1HOUR = 3 // 1 小时K 线 + KLINE_TYPE_DAILY = 4 // 日K 线 + KLINE_TYPE_WEEKLY = 5 // 周K 线 + KLINE_TYPE_MONTHLY = 6 // 月K 线 + KLINE_TYPE_EXHQ_1MIN = 7 // 1 分钟 + KLINE_TYPE_1MIN = 8 // 1 分钟K 线 + KLINE_TYPE_RI_K = 9 // 日K 线 + KLINE_TYPE_3MONTH = 10 // 季K 线 + KLINE_TYPE_YEARLY = 11 // 年K 线 +) + +const ( + DefaultRetryTimes = 3 // 重试次数 +) + +var ( + ErrBadData = errors.New("more than 8M data") +) diff --git a/docs/TdxProtocol.md b/docs/TdxProtocol.md new file mode 100644 index 0000000..214239b --- /dev/null +++ b/docs/TdxProtocol.md @@ -0,0 +1,52 @@ + +API + +``` +头部数据包含 流水号、命令字、包类型、压缩包类型、包长度、数据长度、数据内容 +响应数据包含 流水号、命令字、包类型、压缩包类型、包长度、数据长度、数据内容 +``` + +解析 +``` +通过协议头的解析,获取长度、获取数据,数据解压成标准的byte数据,二次封装为标准对象。 +数据的格式是 小端在前的GBK格式。 +根据 命令字 以及流水号 实现多线程异步处理,命令字可知道是什么请求,流水号可以进行业务处理。 +压缩包的解压方式为 Inflater 类解压响应内容会携带通达信标准协议字段,用来区分协议的类型。 + +``` + +连接 +``` +socket连接上后需要进行2次连接 +发送内容为监听招商证券的连接的二进制数据 +连接成功后需要发送心跳连接(用来判断连接是否正常) +``` + + +通信 +``` +正式建立连接后可以通信,可以建立多个socket同时通信 +socket的端口和地址 在通达信的主站行情中可以获取命令字 +``` + + +``` +public int LOGIN_ONE = 0x000d;//第一次登录 +public int LOGIN_TWO = 0x0fdb;//第二次登录 +public int HEART = 0x0004;//心跳维持 +public int STOCK_COUNT = 0x044e;//股票数目 +public int STOCK_LIST = 0x0450;//股票列表 +public int KMINUTE = 0x0537;//当天分时K线 +public int KMINUTE_OLD = 0x0fb4;//指定日期分时K线 +public int KLINE = 0x052d;//股票K线 +public int BIDD = 0x056a;//当日的竞价 +public int QUOTE = 0x053e;//实时五笔报价 +public int QUOTE_SORT = 0x053e;//沪深排序 +public int TRANSACTION = 0x0fc5;//分笔成交明细 +public int TRANSACTION_OLD = 0x0fb5;//历史分笔成交明细 +public int FINANCE = 0x0010;//财务数据 +public int COMPANY = 0x02d0;//公司数据 F10 +public int EXDIVIDEND = 0x000f;//除权除息 +public int FILE_DIRECTORY = 0x02cf;//公司文件目录 +public int FILE_CONTENT = 0x02d0;//公司文件内容 +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..70ff4d0 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module gotdx + +go 1.18 + +require golang.org/x/text v0.3.7 diff --git a/proto/get_security_count.go b/proto/get_security_count.go new file mode 100644 index 0000000..97b0360 --- /dev/null +++ b/proto/get_security_count.go @@ -0,0 +1,61 @@ +package proto + +import ( + "bytes" + "encoding/binary" +) + +// SecurityCount +type SecurityCount struct { + ReqHeader + content []byte + Reply *SecurityCountReply + + market uint16 +} + +type SecurityCountReply struct { + Count uint16 +} + +func NewSecurityCount() *SecurityCount { + obj := &SecurityCount{} + obj.Reply = new(SecurityCountReply) + obj.Zip = 0x0c + obj.SeqID = seqID() + obj.PacketType = 0x01 + obj.Method = KMSG_SECURITYCOUNT + obj.content = []byte{0x75, 0xc7, 0x33, 0x01} + return obj +} +func (obj *SecurityCount) SetParams(market uint16) { + obj.market = market +} + +func (obj *SecurityCount) Serialize() ([]byte, error) { + obj.PkgLen1 = 2 + uint16(len(obj.content)) + 2 + obj.PkgLen2 = 2 + uint16(len(obj.content)) + 2 + + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, obj.ReqHeader) + err = binary.Write(buf, binary.LittleEndian, obj.market) + buf.Write(obj.content) + return buf.Bytes(), err +} + +/* +0100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000011f85e34068747470733a2f2f626967352e6e65776f6e652e636f6d2e636e2f7a797968742f7a645f7a737a712e7a6970000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004150503a414c4c0d0a54494d453a303a30312d31353a30352c31353a30362d32333a35390d0a20202020c4facab9d3c3b5c4b0e6b1bebcb4bdabcda3d3c3a3acceaac1cbc4fab5c4d5fdb3a3cab9d3c32cc7ebbea1bfecc9fdd6c1d5d0c9ccd6a4c8af5043b0e6a1a30d0a20202020c8e7b9fbb2bbc4dcd7d4b6afc9fdbcb6a3acc7ebb5bdb9d9cdf868747470733a2f2f7777772e636d736368696e612e636f6d2fcfc2d4d8b0b2d7b0a3acd0bbd0bbc4fab5c4d6a7b3d6a3a100 年月日 年月日 +*/ +func (obj *SecurityCount) UnSerialize(header interface{}, data []byte) error { + //serverInfo := utils.Utf8ToGbk(data[58:]) + //fmt.Println(fmt.Sprintf("服务器:%s;", serverInfo)) + //fmt.Println(hex.EncodeToString(data)) + //obj.Reply.Info = serverInfo + + //var tmp uint16 + //bytesBuffer := bytes.NewBuffer(data) + //err := binary.Write(bytesBuffer, binary.LittleEndian, &tmp) + //binary.LittleEndian.Uint16(data) + obj.Reply.Count = binary.LittleEndian.Uint16(data[:2]) + return nil +} diff --git a/proto/hello1.go b/proto/hello1.go new file mode 100644 index 0000000..094a430 --- /dev/null +++ b/proto/hello1.go @@ -0,0 +1,51 @@ +package proto + +import ( + "bytes" + "encoding/binary" + "gotdx/utils" +) + +// Hello1 创建握手消息1 +type Hello1 struct { + ReqHeader + content []byte + Reply *Hello1Reply +} +type Hello1Reply struct { + Info string + serverTime string +} + +func NewHello1() *Hello1 { + obj := &Hello1{} + obj.Reply = new(Hello1Reply) + obj.Zip = 0x0c + obj.SeqID = seqID() + obj.PacketType = 0x01 + obj.Method = KMSG_CMD1 + obj.content = []byte{0x01} + return obj +} + +func (obj *Hello1) Serialize() ([]byte, error) { + obj.PkgLen1 = 2 + uint16(len(obj.content)) + obj.PkgLen2 = 2 + uint16(len(obj.content)) + + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, obj.ReqHeader) + buf.Write(obj.content) + return buf.Bytes(), err +} + +/* +00e60708051 50 f0 00 d3 a02b2020c03840384038403840384033a02b2020c0384038403840384038403 00 5a8a3401 f94a0100 5a8a3401 fd4a0100ff00e 700000101013f + 分 时 秒 日期 +*/ +func (obj *Hello1) UnSerialize(header interface{}, data []byte) error { + serverInfo := utils.Utf8ToGbk(data[68:]) + //fmt.Println(fmt.Sprintf("服务器:%s;", serverInfo)) + //fmt.Println(hex.EncodeToString(data)) + obj.Reply.Info = serverInfo + return nil +} diff --git a/proto/hello2.go b/proto/hello2.go new file mode 100644 index 0000000..5e57347 --- /dev/null +++ b/proto/hello2.go @@ -0,0 +1,50 @@ +package proto + +import ( + "bytes" + "encoding/binary" + "gotdx/utils" +) + +// Hello2 创建握手消息2 +type Hello2 struct { + ReqHeader + content []byte + Reply *Hello2Reply +} +type Hello2Reply struct { + Info string + serverTime string +} + +func NewHello2() *Hello2 { + obj := &Hello2{} + obj.Reply = new(Hello2Reply) + obj.Zip = 0x0c + obj.SeqID = seqID() + obj.PacketType = 0x01 + obj.Method = KMSG_CMD2 + obj.content = []byte{0xd5, 0xd0, 0xc9, 0xcc, 0xd6, 0xa4, 0xa8, 0xaf, 0x00, 0x00, 0x00, 0x8f, 0xc2, 0x25, 0x40, 0x13, 0x00, 0x00, 0xd5, 0x00, 0xc9, 0xcc, 0xbd, 0xf0, 0xd7, 0xea, 0x00, 0x00, 0x00, 0x02} + return obj +} + +func (obj *Hello2) Serialize() ([]byte, error) { + obj.PkgLen1 = 2 + uint16(len(obj.content)) + obj.PkgLen2 = 2 + uint16(len(obj.content)) + + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.LittleEndian, obj.ReqHeader) + buf.Write(obj.content) + return buf.Bytes(), err +} + +/* +0100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000011f85e34068747470733a2f2f626967352e6e65776f6e652e636f6d2e636e2f7a797968742f7a645f7a737a712e7a6970000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004150503a414c4c0d0a54494d453a303a30312d31353a30352c31353a30362d32333a35390d0a20202020c4facab9d3c3b5c4b0e6b1bebcb4bdabcda3d3c3a3acceaac1cbc4fab5c4d5fdb3a3cab9d3c32cc7ebbea1bfecc9fdd6c1d5d0c9ccd6a4c8af5043b0e6a1a30d0a20202020c8e7b9fbb2bbc4dcd7d4b6afc9fdbcb6a3acc7ebb5bdb9d9cdf868747470733a2f2f7777772e636d736368696e612e636f6d2fcfc2d4d8b0b2d7b0a3acd0bbd0bbc4fab5c4d6a7b3d6a3a100 年月日 年月日 +*/ +func (obj *Hello2) UnSerialize(header interface{}, data []byte) error { + serverInfo := utils.Utf8ToGbk(data[58:]) + //fmt.Println(fmt.Sprintf("服务器:%s;", serverInfo)) + //fmt.Println(hex.EncodeToString(data)) + obj.Reply.Info = serverInfo + return nil +} diff --git a/proto/proto.go b/proto/proto.go new file mode 100644 index 0000000..ae2970a --- /dev/null +++ b/proto/proto.go @@ -0,0 +1,65 @@ +package proto + +import "sync/atomic" + +const ( + MessageHeaderBytes = 0x10 + MessageMaxBytes = 1 << 15 +) + +const ( + KMSG_CMD1 = 0x000d // 建立链接 + KMSG_CMD2 = 0x0fdb // 建立链接 + KMSG_PING = 0x0015 // 测试连接 + KMSG_HEARTBEAT = 0xFFFF // 心跳(自定义) + KMSG_SECURITYCOUNT = 0x044e // 证券数量 + KMSG_BLOCKINFOMETA = 0x02c5 // 板块文件信息 + KMSG_BLOCKINFO = 0x06b9 // 板块文件 + KMSG_COMPANYCATEGORY = 0x02cf // 公司信息文件信息 + KMSG_COMPANYCONTENT = 0x02d0 // 公司信息描述 + KMSG_FINANCEINFO = 0x0010 // 财务信息 + KMSG_HISTORYMINUTETIMEDATE = 0x0fb4 // 历史分时信息 + KMSG_HISTORYTRANSACTIONDATA = 0x0fb5 // 历史分笔成交信息 + KMSG_INDEXBARS = 0x052d // 指数K线 + KMSG_MINUTETIMEDATA = 0x0537 // 分时数据 + KMSG_SECURITYLIST = 0x0450 // 证券列表 + KMSG_SECURITYQUOTES = 0x053e // 行情信息 + KMSG_TRANSACTIONDATA = 0x0fc5 // 分笔成交信息 + KMSG_XDXRINFO = 0x000f // 除权除息信息 +) + +type Msg interface { + Serialize() ([]byte, error) + UnSerialize(head interface{}, in []byte) error +} + +var _seqId uint32 + +/* +0c 02000000 00 1c00 1c00 2d05 0100363030303030080001000000140000000000000000000000 +0c 02189300 01 0300 0300 0d00 01 +0c 00000000 00 0200 0200 1500 +*/ +type ReqHeader struct { + Zip uint8 // ZipFlag + SeqID uint32 // 请求编号 + PacketType uint8 + PkgLen1 uint16 + PkgLen2 uint16 + Method uint16 // method 请求方法 +} + +type RespHeader struct { + I1 uint32 + I2 uint8 + SeqID uint32 // 请求编号 + I3 uint8 + Method uint16 // method + ZipSize uint16 // 长度 + UnZipSize uint16 // 未压缩长度 +} + +func seqID() uint32 { + atomic.AddUint32(&_seqId, 1) + return _seqId +} diff --git a/utils/strings.go b/utils/strings.go new file mode 100644 index 0000000..35b0fcf --- /dev/null +++ b/utils/strings.go @@ -0,0 +1,20 @@ +package utils + +import ( + "bytes" + "golang.org/x/text/encoding/simplifiedchinese" + "golang.org/x/text/transform" + "io/ioutil" + "strings" +) + +func Utf8ToGbk(text []byte) string { + + r := bytes.NewReader(text) + + decoder := transform.NewReader(r, simplifiedchinese.GBK.NewDecoder()) //GB18030 + + content, _ := ioutil.ReadAll(decoder) + + return strings.ReplaceAll(string(content), string([]byte{0x00}), "") +}