diff --git a/extend/pull-kline.go b/extend/pull-kline.go new file mode 100644 index 0000000..fb9c8e2 --- /dev/null +++ b/extend/pull-kline.go @@ -0,0 +1,254 @@ +package extend + +import ( + "context" + _ "github.com/glebarez/go-sqlite" + "github.com/injoyai/base/chans" + "github.com/injoyai/logs" + "github.com/injoyai/tdx" + "github.com/injoyai/tdx/protocol" + "path/filepath" + "sort" + "xorm.io/core" + "xorm.io/xorm" +) + +var ( + KlineTableMap = map[string]*KlineTable{ + "minute": NewKlineTable("MinuteKline", func(c *tdx.Client) KlineHandler { return c.GetKlineMinuteUntil }), + "5minute": NewKlineTable("Minute5Kline", func(c *tdx.Client) KlineHandler { return c.GetKline5MinuteUntil }), + "15minute": NewKlineTable("Minute15Kline", func(c *tdx.Client) KlineHandler { return c.GetKline15MinuteUntil }), + "30minute": NewKlineTable("Minute30Kline", func(c *tdx.Client) KlineHandler { return c.GetKline30MinuteUntil }), + "hour": NewKlineTable("HourKline", func(c *tdx.Client) KlineHandler { return c.GetKlineHourUntil }), + "day": NewKlineTable("DayKline", func(c *tdx.Client) KlineHandler { return c.GetKlineDayUntil }), + "week": NewKlineTable("WeekKline", func(c *tdx.Client) KlineHandler { return c.GetKlineWeekUntil }), + "month": NewKlineTable("MonthKline", func(c *tdx.Client) KlineHandler { return c.GetKlineMonthUntil }), + "quarter": NewKlineTable("QuarterKline", func(c *tdx.Client) KlineHandler { return c.GetKlineQuarterUntil }), + "year": NewKlineTable("YearKline", func(c *tdx.Client) KlineHandler { return c.GetKlineYearUntil }), + } +) + +func NewPullKline(codes, tables []string, dir string, limit int) *PullKline { + _tables := []*KlineTable(nil) + for _, v := range tables { + _tables = append(_tables, KlineTableMap[v]) + } + return &PullKline{ + tables: _tables, + dir: dir, + Codes: codes, + limit: limit, + } +} + +type PullKline struct { + tables []*KlineTable + dir string //数据目录 + Codes []string //指定的代码 + limit int //并发数量 +} + +func (this *PullKline) Name() string { + return "拉取k线数据" +} + +func (this *PullKline) Run(ctx context.Context, m *tdx.Manage) error { + limit := chans.NewWaitLimit(uint(this.limit)) + + //1. 获取所有股票代码 + codes := this.Codes + if len(codes) == 0 { + codes = m.Codes.GetStocks() + } + + for _, v := range codes { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + limit.Add() + go func(code string) { + defer limit.Done() + + //连接数据库 + db, err := xorm.NewEngine("sqlite", filepath.Join(this.dir, code+".db")) + if err != nil { + logs.Err(err) + return + } + db.SetMapper(core.SameMapper{}) + db.DB().SetMaxOpenConns(1) + + for _, table := range this.tables { + if table == nil { + continue + } + + select { + case <-ctx.Done(): + return + default: + } + + db.Sync2(table) + + //2. 获取最后一条数据 + last := new(Kline) + if _, err = db.Table(table).Desc("Date").Get(last); err != nil { + logs.Err(err) + return + } + + //3. 从服务器获取数据 + insert := Klines{} + err = m.Do(func(c *tdx.Client) error { + insert, err = this.pull(code, last.Date, table.Handler(c)) + return err + }) + if err != nil { + logs.Err(err) + return + } + + //4. 插入数据库 + err = tdx.NewSessionFunc(db, func(session *xorm.Session) error { + for i, v := range insert { + if i == 0 { + if _, err := session.Table(table).Where("Date >= ?", v.Date).Delete(); err != nil { + return err + } + } + if _, err := session.Table(table).Insert(v); err != nil { + return err + } + } + return nil + }) + logs.PrintErr(err) + + } + + }(v) + } + limit.Wait() + return nil +} + +func (this *PullKline) pull(code string, lastDate int64, f func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error)) (Klines, error) { + + if lastDate == 0 { + lastDate = protocol.ExchangeEstablish.Unix() + } + + resp, err := f(code, func(k *protocol.Kline) bool { + return k.Time.Unix() <= lastDate + }) + if err != nil { + return nil, err + } + + ks := Klines{} + for _, v := range resp.List { + ks = append(ks, &Kline{ + Code: code, + Date: v.Time.Unix(), + Open: v.Open, + High: v.High, + Low: v.Low, + Close: v.Close, + Volume: v.Volume, + Amount: v.Amount, + }) + } + + return ks, nil +} + +type Kline struct { + Code string `json:"code" xorm:"-"` //代码 + Date int64 `json:"date"` //时间节点 2006-01-02 15:00 + Open protocol.Price `json:"open"` //开盘价 + High protocol.Price `json:"high"` //最高价 + Low protocol.Price `json:"low"` //最低价 + Close protocol.Price `json:"close"` //收盘价 + Volume int64 `json:"volume"` //成交量 + Amount protocol.Price `json:"amount"` //成交额 + InDate int64 `json:"inDate" xorm:"created"` //创建时间 +} + +type Klines []*Kline + +func (this Klines) Less(i, j int) bool { return this[i].Code > this[j].Code } + +func (this Klines) Swap(i, j int) { this[i], this[j] = this[j], this[i] } + +func (this Klines) Len() int { return len(this) } + +func (this Klines) Sort() { sort.Sort(this) } + +// Kline 计算多个K线,成一个K线 +func (this Klines) Kline() *Kline { + if this == nil { + return new(Kline) + } + k := new(Kline) + for i, v := range this { + switch i { + case 0: + k.Open = v.Open + k.High = v.High + k.Low = v.Low + k.Close = v.Close + case len(this) - 1: + k.Close = v.Close + k.Date = v.Date + } + if v.High > k.High { + k.High = v.High + } + if v.Low < k.Low { + k.Low = v.Low + } + k.Volume += v.Volume + k.Amount += v.Amount + } + + return k +} + +// Merge 合并K线 +func (this Klines) Merge(n int) Klines { + if this == nil { + return nil + } + ks := []*Kline(nil) + for i := 0; i < len(this); i += n { + if i+n > len(this) { + ks = append(ks, this[i:].Kline()) + } else { + ks = append(ks, this[i:i+n].Kline()) + } + } + return ks +} + +type KlineHandler func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error) + +func NewKlineTable(tableName string, handler func(c *tdx.Client) KlineHandler) *KlineTable { + return &KlineTable{ + tableName: tableName, + Handler: handler, + } +} + +type KlineTable struct { + Kline `xorm:"extends"` + tableName string + Handler func(c *tdx.Client) KlineHandler `xorm:"-"` +} + +func (this *KlineTable) TableName() string { + return this.tableName +}