mirror of
https://github.com/injoyai/tdx.git
synced 2025-11-26 21:25:35 +08:00
149 lines
3.0 KiB
Go
149 lines
3.0 KiB
Go
package extend
|
|
|
|
import (
|
|
"context"
|
|
_ "github.com/go-sql-driver/mysql"
|
|
"github.com/injoyai/base/chans"
|
|
"github.com/injoyai/logs"
|
|
"github.com/injoyai/tdx"
|
|
"github.com/injoyai/tdx/protocol"
|
|
"xorm.io/core"
|
|
"xorm.io/xorm"
|
|
)
|
|
|
|
func NewPullKlineMysql(cfg PullKlineConfig) (*PullKlineMysql, error) {
|
|
db, err := xorm.NewEngine("mysql", cfg.Dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
db.SetMapper(core.SameMapper{})
|
|
_tables := []*KlineTable(nil)
|
|
for _, v := range cfg.Tables {
|
|
table := KlineTableMap[v]
|
|
if err = db.Sync2(table); err != nil {
|
|
return nil, err
|
|
}
|
|
_tables = append(_tables, table)
|
|
}
|
|
return &PullKlineMysql{
|
|
tables: _tables,
|
|
Config: cfg,
|
|
DB: db,
|
|
}, nil
|
|
}
|
|
|
|
type PullKlineMysql struct {
|
|
tables []*KlineTable
|
|
Config PullKlineConfig
|
|
DB *xorm.Engine
|
|
}
|
|
|
|
func (this *PullKlineMysql) Name() string {
|
|
return "拉取k线数据"
|
|
}
|
|
|
|
func (this *PullKlineMysql) Run(ctx context.Context, m *tdx.Manage) error {
|
|
limit := chans.NewWaitLimit(uint(this.Config.Limit))
|
|
|
|
//1. 获取所有股票代码
|
|
codes := this.Config.Codes
|
|
if len(codes) == 0 {
|
|
codes = m.Codes.GetStocks()
|
|
}
|
|
|
|
for _, v := range codes {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
limit.Add()
|
|
go func(code string) {
|
|
defer limit.Done()
|
|
|
|
for _, table := range this.tables {
|
|
if table == nil {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
var err error
|
|
|
|
//2. 获取最后一条数据
|
|
last := new(Kline)
|
|
if _, err = this.DB.Table(table).Where("Code=?", code).Desc("Date").Get(last); err != nil {
|
|
logs.Err(err)
|
|
return
|
|
}
|
|
|
|
//3. 从服务器获取数据
|
|
insert := Klines{}
|
|
err = m.Do(func(c *tdx.Client) error {
|
|
insert, err = this.pull(code, last.Date, table.Handler(c))
|
|
return err
|
|
})
|
|
if err != nil {
|
|
logs.Err(err)
|
|
return
|
|
}
|
|
|
|
//4. 插入数据库
|
|
err = tdx.NewSessionFunc(this.DB, func(session *xorm.Session) error {
|
|
for i, v := range insert {
|
|
if i == 0 {
|
|
if _, err := session.Table(table).Where("Code=? and Date >= ?", code, v.Date).Delete(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if _, err := session.Table(table).Insert(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
logs.PrintErr(err)
|
|
|
|
}
|
|
|
|
}(v)
|
|
}
|
|
limit.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (this *PullKlineMysql) pull(code string, lastDate int64, f func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error)) (Klines, error) {
|
|
|
|
if lastDate == 0 {
|
|
lastDate = protocol.ExchangeEstablish.Unix()
|
|
}
|
|
|
|
resp, err := f(code, func(k *protocol.Kline) bool {
|
|
return k.Time.Unix() <= lastDate || k.Time.Unix() <= this.Config.StartAt.Unix()
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ks := Klines{}
|
|
for _, v := range resp.List {
|
|
ks = append(ks, &Kline{
|
|
Code: code,
|
|
Date: v.Time.Unix(),
|
|
Open: v.Open,
|
|
High: v.High,
|
|
Low: v.Low,
|
|
Close: v.Close,
|
|
Volume: v.Volume,
|
|
Amount: v.Amount,
|
|
})
|
|
}
|
|
|
|
return ks, nil
|
|
}
|