diff --git a/extend/pull-kline-mysql.go b/extend/pull-kline-mysql.go new file mode 100644 index 0000000..81a0220 --- /dev/null +++ b/extend/pull-kline-mysql.go @@ -0,0 +1,148 @@ +package extend + +import ( + "context" + _ "github.com/go-sql-driver/mysql" + "github.com/injoyai/base/chans" + "github.com/injoyai/logs" + "github.com/injoyai/tdx" + "github.com/injoyai/tdx/protocol" + "xorm.io/core" + "xorm.io/xorm" +) + +func NewPullKlineMysql(cfg PullKlineConfig) (*PullKlineMysql, error) { + db, err := xorm.NewEngine("mysql", cfg.Dir) + if err != nil { + return nil, err + } + db.SetMapper(core.SameMapper{}) + _tables := []*KlineTable(nil) + for _, v := range cfg.Tables { + table := KlineTableMap[v] + if err = db.Sync2(table); err != nil { + return nil, err + } + _tables = append(_tables, table) + } + return &PullKlineMysql{ + tables: _tables, + Config: cfg, + DB: db, + }, nil +} + +type PullKlineMysql struct { + tables []*KlineTable + Config PullKlineConfig + DB *xorm.Engine +} + +func (this *PullKlineMysql) Name() string { + return "拉取k线数据" +} + +func (this *PullKlineMysql) Run(ctx context.Context, m *tdx.Manage) error { + limit := chans.NewWaitLimit(uint(this.Config.Limit)) + + //1. 获取所有股票代码 + codes := this.Config.Codes + if len(codes) == 0 { + codes = m.Codes.GetStocks() + } + + for _, v := range codes { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + limit.Add() + go func(code string) { + defer limit.Done() + + for _, table := range this.tables { + if table == nil { + continue + } + + select { + case <-ctx.Done(): + return + default: + } + + var err error + + //2. 获取最后一条数据 + last := new(Kline) + if _, err = this.DB.Table(table).Where("Code=?", code).Desc("Date").Get(last); err != nil { + logs.Err(err) + return + } + + //3. 从服务器获取数据 + insert := Klines{} + err = m.Do(func(c *tdx.Client) error { + insert, err = this.pull(code, last.Date, table.Handler(c)) + return err + }) + if err != nil { + logs.Err(err) + return + } + + //4. 插入数据库 + err = tdx.NewSessionFunc(this.DB, func(session *xorm.Session) error { + for i, v := range insert { + if i == 0 { + if _, err := session.Table(table).Where("Code=? and Date >= ?", code, v.Date).Delete(); err != nil { + return err + } + } + if _, err := session.Table(table).Insert(v); err != nil { + return err + } + } + return nil + }) + logs.PrintErr(err) + + } + + }(v) + } + limit.Wait() + return nil +} + +func (this *PullKlineMysql) pull(code string, lastDate int64, f func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error)) (Klines, error) { + + if lastDate == 0 { + lastDate = protocol.ExchangeEstablish.Unix() + } + + resp, err := f(code, func(k *protocol.Kline) bool { + return k.Time.Unix() <= lastDate || k.Time.Unix() <= this.Config.StartAt.Unix() + }) + if err != nil { + return nil, err + } + + ks := Klines{} + for _, v := range resp.List { + ks = append(ks, &Kline{ + Code: code, + Date: v.Time.Unix(), + Open: v.Open, + High: v.High, + Low: v.Low, + Close: v.Close, + Volume: v.Volume, + Amount: v.Amount, + }) + } + + return ks, nil +} diff --git a/extend/pull-kline.go b/extend/pull-kline.go index cf33197..33e739f 100644 --- a/extend/pull-kline.go +++ b/extend/pull-kline.go @@ -186,7 +186,7 @@ func (this *PullKline) pull(code string, lastDate int64, f func(code string, f f } type Kline struct { - Code string `json:"code" xorm:"-"` //代码 + Code string `json:"code"` //代码 Date int64 `json:"date"` //时间节点 2006-01-02 15:00 Open protocol.Price `json:"open"` //开盘价 High protocol.Price `json:"high"` //最高价