优化PullKline

This commit is contained in:
钱纯净
2025-03-26 19:21:31 +08:00
parent ce6718831c
commit 0f75b402bc
2 changed files with 25 additions and 18 deletions

View File

@@ -6,6 +6,7 @@ import (
"github.com/injoyai/tdx" "github.com/injoyai/tdx"
"github.com/injoyai/tdx/extend" "github.com/injoyai/tdx/extend"
"path/filepath" "path/filepath"
"time"
) )
func main() { func main() {
@@ -13,12 +14,13 @@ func main() {
m, err := tdx.NewManage(nil) m, err := tdx.NewManage(nil)
logs.PanicErr(err) logs.PanicErr(err)
err = extend.NewPullKline( err = extend.NewPullKline(extend.PullKlineConfig{
[]string{"sz000001"}, Codes: []string{"sz000001"},
[]string{extend.Year}, Tables: []string{extend.Year},
filepath.Join(tdx.DefaultDatabaseDir, "kline"), Dir: filepath.Join(tdx.DefaultDatabaseDir, "kline"),
1, Limit: 1,
).Run(context.Background(), m) StartAt: time.Time{},
}).Run(context.Background(), m)
logs.PanicErr(err) logs.PanicErr(err)
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/injoyai/tdx/protocol" "github.com/injoyai/tdx/protocol"
"path/filepath" "path/filepath"
"sort" "sort"
"time"
"xorm.io/core" "xorm.io/core"
"xorm.io/xorm" "xorm.io/xorm"
) )
@@ -42,24 +43,28 @@ var (
} }
) )
func NewPullKline(codes, tables []string, dir string, limit int) *PullKline { type PullKlineConfig struct {
Codes []string //操作代码
Tables []string //数据类型
Dir string //数据位置
Limit int //协程数量
StartAt time.Time //数据开始时间
}
func NewPullKline(cfg PullKlineConfig) *PullKline {
_tables := []*KlineTable(nil) _tables := []*KlineTable(nil)
for _, v := range tables { for _, v := range cfg.Tables {
_tables = append(_tables, KlineTableMap[v]) _tables = append(_tables, KlineTableMap[v])
} }
return &PullKline{ return &PullKline{
tables: _tables, tables: _tables,
dir: dir, Config: cfg,
Codes: codes,
limit: limit,
} }
} }
type PullKline struct { type PullKline struct {
tables []*KlineTable tables []*KlineTable
dir string //数据目录 Config PullKlineConfig
Codes []string //指定的代码
limit int //并发数量
} }
func (this *PullKline) Name() string { func (this *PullKline) Name() string {
@@ -67,10 +72,10 @@ func (this *PullKline) Name() string {
} }
func (this *PullKline) Run(ctx context.Context, m *tdx.Manage) error { func (this *PullKline) Run(ctx context.Context, m *tdx.Manage) error {
limit := chans.NewWaitLimit(uint(this.limit)) limit := chans.NewWaitLimit(uint(this.Config.Limit))
//1. 获取所有股票代码 //1. 获取所有股票代码
codes := this.Codes codes := this.Config.Codes
if len(codes) == 0 { if len(codes) == 0 {
codes = m.Codes.GetStocks() codes = m.Codes.GetStocks()
} }
@@ -87,7 +92,7 @@ func (this *PullKline) Run(ctx context.Context, m *tdx.Manage) error {
defer limit.Done() defer limit.Done()
//连接数据库 //连接数据库
db, err := xorm.NewEngine("sqlite", filepath.Join(this.dir, code+".db")) db, err := xorm.NewEngine("sqlite", filepath.Join(this.Config.Dir, code+".db"))
if err != nil { if err != nil {
logs.Err(err) logs.Err(err)
return return
@@ -157,7 +162,7 @@ func (this *PullKline) pull(code string, lastDate int64, f func(code string, f f
} }
resp, err := f(code, func(k *protocol.Kline) bool { resp, err := f(code, func(k *protocol.Kline) bool {
return k.Time.Unix() <= lastDate return k.Time.Unix() <= lastDate || k.Time.Unix() <= this.Config.StartAt.Unix()
}) })
if err != nil { if err != nil {
return nil, err return nil, err