Files
injoyai-tdx/extend/pull-kline.go
2025-03-18 14:55:01 +08:00

255 lines
6.2 KiB
Go

package extend
import (
"context"
_ "github.com/glebarez/go-sqlite"
"github.com/injoyai/base/chans"
"github.com/injoyai/logs"
"github.com/injoyai/tdx"
"github.com/injoyai/tdx/protocol"
"path/filepath"
"sort"
"xorm.io/core"
"xorm.io/xorm"
)
var (
KlineTableMap = map[string]*KlineTable{
"minute": NewKlineTable("MinuteKline", func(c *tdx.Client) KlineHandler { return c.GetKlineMinuteUntil }),
"5minute": NewKlineTable("Minute5Kline", func(c *tdx.Client) KlineHandler { return c.GetKline5MinuteUntil }),
"15minute": NewKlineTable("Minute15Kline", func(c *tdx.Client) KlineHandler { return c.GetKline15MinuteUntil }),
"30minute": NewKlineTable("Minute30Kline", func(c *tdx.Client) KlineHandler { return c.GetKline30MinuteUntil }),
"hour": NewKlineTable("HourKline", func(c *tdx.Client) KlineHandler { return c.GetKlineHourUntil }),
"day": NewKlineTable("DayKline", func(c *tdx.Client) KlineHandler { return c.GetKlineDayUntil }),
"week": NewKlineTable("WeekKline", func(c *tdx.Client) KlineHandler { return c.GetKlineWeekUntil }),
"month": NewKlineTable("MonthKline", func(c *tdx.Client) KlineHandler { return c.GetKlineMonthUntil }),
"quarter": NewKlineTable("QuarterKline", func(c *tdx.Client) KlineHandler { return c.GetKlineQuarterUntil }),
"year": NewKlineTable("YearKline", func(c *tdx.Client) KlineHandler { return c.GetKlineYearUntil }),
}
)
func NewPullKline(codes, tables []string, dir string, limit int) *PullKline {
_tables := []*KlineTable(nil)
for _, v := range tables {
_tables = append(_tables, KlineTableMap[v])
}
return &PullKline{
tables: _tables,
dir: dir,
Codes: codes,
limit: limit,
}
}
type PullKline struct {
tables []*KlineTable
dir string //数据目录
Codes []string //指定的代码
limit int //并发数量
}
func (this *PullKline) Name() string {
return "拉取k线数据"
}
func (this *PullKline) Run(ctx context.Context, m *tdx.Manage) error {
limit := chans.NewWaitLimit(uint(this.limit))
//1. 获取所有股票代码
codes := this.Codes
if len(codes) == 0 {
codes = m.Codes.GetStocks()
}
for _, v := range codes {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
limit.Add()
go func(code string) {
defer limit.Done()
//连接数据库
db, err := xorm.NewEngine("sqlite", filepath.Join(this.dir, code+".db"))
if err != nil {
logs.Err(err)
return
}
db.SetMapper(core.SameMapper{})
db.DB().SetMaxOpenConns(1)
for _, table := range this.tables {
if table == nil {
continue
}
select {
case <-ctx.Done():
return
default:
}
db.Sync2(table)
//2. 获取最后一条数据
last := new(Kline)
if _, err = db.Table(table).Desc("Date").Get(last); err != nil {
logs.Err(err)
return
}
//3. 从服务器获取数据
insert := Klines{}
err = m.Do(func(c *tdx.Client) error {
insert, err = this.pull(code, last.Date, table.Handler(c))
return err
})
if err != nil {
logs.Err(err)
return
}
//4. 插入数据库
err = tdx.NewSessionFunc(db, func(session *xorm.Session) error {
for i, v := range insert {
if i == 0 {
if _, err := session.Table(table).Where("Date >= ?", v.Date).Delete(); err != nil {
return err
}
}
if _, err := session.Table(table).Insert(v); err != nil {
return err
}
}
return nil
})
logs.PrintErr(err)
}
}(v)
}
limit.Wait()
return nil
}
func (this *PullKline) pull(code string, lastDate int64, f func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error)) (Klines, error) {
if lastDate == 0 {
lastDate = protocol.ExchangeEstablish.Unix()
}
resp, err := f(code, func(k *protocol.Kline) bool {
return k.Time.Unix() <= lastDate
})
if err != nil {
return nil, err
}
ks := Klines{}
for _, v := range resp.List {
ks = append(ks, &Kline{
Code: code,
Date: v.Time.Unix(),
Open: v.Open,
High: v.High,
Low: v.Low,
Close: v.Close,
Volume: v.Volume,
Amount: v.Amount,
})
}
return ks, nil
}
type Kline struct {
Code string `json:"code" xorm:"-"` //代码
Date int64 `json:"date"` //时间节点 2006-01-02 15:00
Open protocol.Price `json:"open"` //开盘价
High protocol.Price `json:"high"` //最高价
Low protocol.Price `json:"low"` //最低价
Close protocol.Price `json:"close"` //收盘价
Volume int64 `json:"volume"` //成交量
Amount protocol.Price `json:"amount"` //成交额
InDate int64 `json:"inDate" xorm:"created"` //创建时间
}
type Klines []*Kline
func (this Klines) Less(i, j int) bool { return this[i].Code > this[j].Code }
func (this Klines) Swap(i, j int) { this[i], this[j] = this[j], this[i] }
func (this Klines) Len() int { return len(this) }
func (this Klines) Sort() { sort.Sort(this) }
// Kline 计算多个K线,成一个K线
func (this Klines) Kline() *Kline {
if this == nil {
return new(Kline)
}
k := new(Kline)
for i, v := range this {
switch i {
case 0:
k.Open = v.Open
k.High = v.High
k.Low = v.Low
k.Close = v.Close
case len(this) - 1:
k.Close = v.Close
k.Date = v.Date
}
if v.High > k.High {
k.High = v.High
}
if v.Low < k.Low {
k.Low = v.Low
}
k.Volume += v.Volume
k.Amount += v.Amount
}
return k
}
// Merge 合并K线
func (this Klines) Merge(n int) Klines {
if this == nil {
return nil
}
ks := []*Kline(nil)
for i := 0; i < len(this); i += n {
if i+n > len(this) {
ks = append(ks, this[i:].Kline())
} else {
ks = append(ks, this[i:i+n].Kline())
}
}
return ks
}
type KlineHandler func(code string, f func(k *protocol.Kline) bool) (*protocol.KlineResp, error)
func NewKlineTable(tableName string, handler func(c *tdx.Client) KlineHandler) *KlineTable {
return &KlineTable{
tableName: tableName,
Handler: handler,
}
}
type KlineTable struct {
Kline `xorm:"extends"`
tableName string
Handler func(c *tdx.Client) KlineHandler `xorm:"-"`
}
func (this *KlineTable) TableName() string {
return this.tableName
}