Compare commits

...

17 Commits

Author SHA1 Message Date
injoyai
5fa572f298 把集合竞价合并到931里面 2025-10-28 08:41:04 +08:00
injoyai
61b2e737b3 把集合竞价从931剥离出来到930 2025-10-27 09:09:19 +08:00
injoyai
5654065954 把集合竞价从931剥离出来到930 2025-10-27 08:49:22 +08:00
injoyai
d7dd7fe0bf 优化extend.GetBjCodes,直接输出代码 2025-10-22 14:25:30 +08:00
injoyai
9fb1a3b651 增加SetTimeout 设置超时时间 2025-10-21 09:57:03 +08:00
injoyai
f8a24c0cf1 优化GetHistoryTradeBefore,增加失败重试,共尝试3次,都失败则返回错误 2025-10-21 09:39:16 +08:00
injoyai
ad0abbe6ba 优化GetHistoryTradeFull,需要传入Workday,减少非工作日的查询操作 2025-10-20 16:00:09 +08:00
injoyai
0b35006323 优化GetHistoryTradeFull,需要传入Workday,减少非工作日的查询操作 2025-10-20 15:37:04 +08:00
injoyai
3b823e2e54 优化GetHistoryTradeFull,需要传入Workday,减少非工作日的查询操作 2025-10-20 15:35:20 +08:00
injoyai
5c8091ac26 修复深圳指数历史分时成交价格小10倍的问题 2025-10-20 09:49:01 +08:00
钱纯净
d7b6963bd6 修复workday没有要更新时会报错的问题 "no element on slice when insert" 2025-10-19 22:46:06 +08:00
injoyai
456a0af9a5 增加Manage对mysql的支持 2025-10-16 14:00:31 +08:00
injoyai
84404bcb2c 增加Manage对mysql的支持 2025-10-16 11:00:58 +08:00
injoyai
716e35122f 增加Manage对mysql的支持 2025-10-16 10:54:44 +08:00
injoyai
c4866a2f2e 增加Manage对mysql的支持 2025-10-16 10:47:24 +08:00
injoyai
fa98199dae 增加Codes和Workday对mysql的支持 2025-10-16 10:33:04 +08:00
injoyai
29882ea5c0 修复IsSZStock的判断 2025-10-14 13:07:59 +08:00
11 changed files with 207 additions and 96 deletions

View File

@@ -181,6 +181,11 @@ func (this *Client) handlerDealMessage(c *client.Client, msg ios.Acker) {
} }
// SetTimeout 设置超时时间
func (this *Client) SetTimeout(t time.Duration) {
this.Wait.SetTimeout(t)
}
// SendFrame 发送数据,并等待响应 // SendFrame 发送数据,并等待响应
func (this *Client) SendFrame(f *protocol.Frame, cache ...any) (any, error) { func (this *Client) SendFrame(f *protocol.Frame, cache ...any) (any, error) {
f.MsgID = atomic.AddUint32(&this.msgID, 1) f.MsgID = atomic.AddUint32(&this.msgID, 1)
@@ -440,7 +445,12 @@ func (this *Client) GetHistoryMinuteTrade(date, code string, start, count uint16
} }
// GetHistoryTradeFull 获取上市至今的分时成交 // GetHistoryTradeFull 获取上市至今的分时成交
func (this *Client) GetHistoryTradeFull(code string) (protocol.Trades, error) { func (this *Client) GetHistoryTradeFull(code string, w *Workday) (protocol.Trades, error) {
return this.GetHistoryTradeBefore(code, w, time.Now())
}
// GetHistoryTradeBefore 获取上市至今的分时成交
func (this *Client) GetHistoryTradeBefore(code string, w *Workday, before time.Time) (protocol.Trades, error) {
ls := protocol.Trades(nil) ls := protocol.Trades(nil)
resp, err := this.GetKlineMonthAll(code) resp, err := this.GetKlineMonthAll(code)
if err != nil { if err != nil {
@@ -451,14 +461,20 @@ func (this *Client) GetHistoryTradeFull(code string) (protocol.Trades, error) {
} }
start := time.Date(resp.List[0].Time.Year(), resp.List[0].Time.Month(), 1, 0, 0, 0, 0, resp.List[0].Time.Location()) start := time.Date(resp.List[0].Time.Year(), resp.List[0].Time.Month(), 1, 0, 0, 0, 0, resp.List[0].Time.Location())
var res *protocol.TradeResp var res *protocol.TradeResp
for ; start.Before(time.Now()); start = start.Add(time.Hour * 24) { w.Range(start, before, func(t time.Time) bool {
res, err = this.GetHistoryTradeDay(start.Format("20060102"), code) for i := 0; i < 3; i++ {
res, err = this.GetHistoryTradeDay(t.Format("20060102"), code)
if err == nil {
break
}
}
if err != nil { if err != nil {
return nil, err return false
} }
ls = append(ls, res.List...) ls = append(ls, res.List...)
} return true
return ls, nil })
return ls, err
} }
// GetHistoryTradeDay 获取历史某天分时全部交易,通过多次请求来拼接,只能获取昨天及之前的数据 // GetHistoryTradeDay 获取历史某天分时全部交易,通过多次请求来拼接,只能获取昨天及之前的数据

100
codes.go
View File

@@ -23,10 +23,22 @@ func DialCodes(filename string, op ...client.Option) (*Codes, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewCodes(c, filename) return NewCodesSqlite(c, filename)
} }
func NewCodes(c *Client, filenames ...string) (*Codes, error) { func NewCodesMysql(c *Client, dsn string) (*Codes, error) {
//连接数据库
db, err := xorm.NewEngine("mysql", dsn)
if err != nil {
return nil, err
}
db.SetMapper(core.SameMapper{})
return NewCodes(c, db)
}
func NewCodesSqlite(c *Client, filenames ...string) (*Codes, error) {
//如果没有指定文件名,则使用默认 //如果没有指定文件名,则使用默认
defaultFilename := filepath.Join(DefaultDatabaseDir, "codes.db") defaultFilename := filepath.Join(DefaultDatabaseDir, "codes.db")
@@ -44,6 +56,12 @@ func NewCodes(c *Client, filenames ...string) (*Codes, error) {
} }
db.SetMapper(core.SameMapper{}) db.SetMapper(core.SameMapper{})
db.DB().SetMaxOpenConns(1) db.DB().SetMaxOpenConns(1)
return NewCodes(c, db)
}
func NewCodes(c *Client, db *xorm.Engine) (*Codes, error) {
if err := db.Sync2(new(CodeModel)); err != nil { if err := db.Sync2(new(CodeModel)); err != nil {
return nil, err return nil, err
} }
@@ -53,10 +71,11 @@ func NewCodes(c *Client, filenames ...string) (*Codes, error) {
update := new(UpdateModel) update := new(UpdateModel)
{ //查询或者插入一条数据 { //查询或者插入一条数据
has, err := db.Get(update) has, err := db.Where("`Key`=?", "codes").Get(update)
if err != nil { if err != nil {
return nil, err return nil, err
} else if !has { } else if !has {
update.Key = "codes"
if _, err := db.Insert(update); err != nil { if _, err := db.Insert(update); err != nil {
return nil, err return nil, err
} }
@@ -72,7 +91,8 @@ func NewCodes(c *Client, filenames ...string) (*Codes, error) {
task := cron.New(cron.WithSeconds()) task := cron.New(cron.WithSeconds())
task.AddFunc("10 0 9 * * *", func() { task.AddFunc("10 0 9 * * *", func() {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if err := cc.Update(); err == nil { err := cc.Update()
if err == nil {
return return
} }
logs.Err(err) logs.Err(err)
@@ -155,53 +175,8 @@ func (this *Codes) Get(code string) *CodeModel {
return this.Map[code] return this.Map[code]
} }
//// GetExchange 获取股票交易所,这里的参数不需要带前缀
//func (this *Codes) GetExchange(code string) protocol.Exchange {
// if len(code) == 6 {
// switch {
// case code[:1] == "6":
// return protocol.ExchangeSH
// case code[:1] == "0":
// return protocol.ExchangeSZ
// case code[:2] == "30":
// return protocol.ExchangeSZ
// }
// }
// var exchange string
// exchanges := this.exchanges[code]
// if len(exchanges) >= 1 {
// exchange = exchanges[0]
// }
// if len(code) == 8 {
// exchange = code[0:2]
// }
// switch exchange {
// case protocol.ExchangeSH.String():
// return protocol.ExchangeSH
// case protocol.ExchangeSZ.String():
// return protocol.ExchangeSZ
// default:
// return protocol.ExchangeSH
// }
//}
func (this *Codes) AddExchange(code string) string { func (this *Codes) AddExchange(code string) string {
return protocol.AddPrefix(code) return protocol.AddPrefix(code)
//if exchanges := this.exchanges[code]; len(exchanges) == 1 {
// return exchanges[0] + code
//}
//if len(code) == 6 {
// switch {
// case code[:1] == "6":
// return protocol.ExchangeSH.String() + code
// case code[:1] == "0":
// return protocol.ExchangeSZ.String() + code
// case code[:2] == "30":
// return protocol.ExchangeSZ.String() + code
// }
// return this.GetExchange(code).String() + code
//}
//return code
} }
// Update 更新数据,从服务器或者数据库 // Update 更新数据,从服务器或者数据库
@@ -220,7 +195,7 @@ func (this *Codes) Update(byDB ...bool) error {
this.list = codes this.list = codes
this.exchanges = exchanges this.exchanges = exchanges
//更新时间 //更新时间
_, err = this.db.Update(&UpdateModel{Time: time.Now().Unix()}) _, err = this.db.Where("`Key`=?", "codes").Update(&UpdateModel{Time: time.Now().Unix()})
return err return err
} }
@@ -283,6 +258,28 @@ func (this *Codes) GetCodes(byDatabase bool) ([]*CodeModel, error) {
} }
} }
switch this.db.Dialect().URI().DBType {
case "mysql":
// 1⃣ 清空
if _, err := this.db.Exec("TRUNCATE TABLE codes"); err != nil {
return nil, err
}
data := append(insert, update...)
// 2⃣ 直接批量插入
batchSize := 3000 // 8000(2m16s) 5000(43s) 3000(11s) 1000(59s)
for i := 0; i < len(data); i += batchSize {
end := i + batchSize
if end > len(data) {
end = len(data)
}
slice := conv.Array(data[i:end])
if _, err := this.db.Insert(slice); err != nil {
return nil, err
}
}
case "sqlite3":
//4. 插入或者更新数据库 //4. 插入或者更新数据库
err := NewSessionFunc(this.db, func(session *xorm.Session) error { err := NewSessionFunc(this.db, func(session *xorm.Session) error {
for _, v := range insert { for _, v := range insert {
@@ -300,12 +297,13 @@ func (this *Codes) GetCodes(byDatabase bool) ([]*CodeModel, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
return list, nil return list, nil
} }
type UpdateModel struct { type UpdateModel struct {
Key string
Time int64 //更新时间 Time int64 //更新时间
} }

View File

@@ -10,7 +10,7 @@ func main() {
c, err := tdx.Dial("124.71.187.122:7709", tdx.WithDebug()) c, err := tdx.Dial("124.71.187.122:7709", tdx.WithDebug())
logs.PanicErr(err) logs.PanicErr(err)
tdx.DefaultCodes, err = tdx.NewCodes(c, "./codes.db") tdx.DefaultCodes, err = tdx.NewCodesSqlite(c, "./codes.db")
logs.PanicErr(err) logs.PanicErr(err)
_ = c _ = c

View File

@@ -0,0 +1,17 @@
package main
import (
"github.com/injoyai/logs"
"github.com/injoyai/tdx"
)
func main() {
_, err := tdx.NewManageMysql(&tdx.ManageConfig{
Number: 2,
CodesFilename: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local",
WorkdayFileName: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local",
Dial: nil,
})
logs.PanicErr(err)
logs.Debug("done")
}

View File

@@ -9,10 +9,10 @@ import (
func main() { func main() {
common.Test(func(c *tdx.Client) { common.Test(func(c *tdx.Client) {
_, err := tdx.NewWorkday(c) //"./workday.db" _, err := tdx.NewWorkdaySqlite(c) //"./workday.db"
logs.PanicErr(err) logs.PanicErr(err)
_, err = tdx.NewCodes(c) //"./codes.db" _, err = tdx.NewCodesSqlite(c) //"./codes.db"
logs.PanicErr(err) logs.PanicErr(err)
c.Close() c.Close()

View File

@@ -4,6 +4,14 @@ import (
"github.com/injoyai/tdx" "github.com/injoyai/tdx"
) )
func GetBjCodes() ([]*tdx.BjCode, error) { func GetBjCodes() ([]string, error) {
return tdx.GetBjCodes() cs, err := tdx.GetBjCodes()
if err != nil {
return nil, err
}
ls := []string(nil)
for _, v := range cs {
ls = append(ls, "bj"+v.Code)
}
return ls, nil
} }

View File

@@ -1,6 +1,7 @@
package tdx package tdx
import ( import (
"errors"
"github.com/injoyai/ios/client" "github.com/injoyai/ios/client"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"time" "time"
@@ -10,6 +11,57 @@ const (
DefaultDatabaseDir = "./data/database" DefaultDatabaseDir = "./data/database"
) )
func NewManageMysql(cfg *ManageConfig, op ...client.Option) (*Manage, error) {
//初始化配置
if cfg == nil {
cfg = &ManageConfig{}
}
if cfg.CodesFilename == "" {
return nil, errors.New("未配置Codes的数据库")
}
if cfg.WorkdayFileName == "" {
return nil, errors.New("未配置Workday的数据库")
}
if cfg.Dial == nil {
cfg.Dial = DialDefault
}
//通用客户端
commonClient, err := cfg.Dial(op...)
if err != nil {
return nil, err
}
commonClient.Wait.SetTimeout(time.Second * 5)
//代码管理
codes, err := NewCodesMysql(commonClient, cfg.CodesFilename)
if err != nil {
return nil, err
}
//工作日管理
workday, err := NewWorkdayMysql(commonClient, cfg.WorkdayFileName)
if err != nil {
return nil, err
}
//连接池
p, err := NewPool(func() (*Client, error) {
return cfg.Dial(op...)
}, cfg.Number)
if err != nil {
return nil, err
}
return &Manage{
Pool: p,
Config: cfg,
Codes: codes,
Workday: workday,
Cron: cron.New(cron.WithSeconds()),
}, nil
}
func NewManage(cfg *ManageConfig, op ...client.Option) (*Manage, error) { func NewManage(cfg *ManageConfig, op ...client.Option) (*Manage, error) {
//初始化配置 //初始化配置
if cfg == nil { if cfg == nil {
@@ -33,13 +85,13 @@ func NewManage(cfg *ManageConfig, op ...client.Option) (*Manage, error) {
commonClient.Wait.SetTimeout(time.Second * 5) commonClient.Wait.SetTimeout(time.Second * 5)
//代码管理 //代码管理
codes, err := NewCodes(commonClient, cfg.CodesFilename) codes, err := NewCodesSqlite(commonClient, cfg.CodesFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//工作日管理 //工作日管理
workday, err := NewWorkday(commonClient, cfg.WorkdayFileName) workday, err := NewWorkdaySqlite(commonClient, cfg.WorkdayFileName)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -293,6 +293,7 @@ func (this Klines) Merge(n int) Klines {
if n <= 1 { if n <= 1 {
return this return this
} }
ks := Klines(nil) ks := Klines(nil)
ls := Klines(nil) ls := Klines(nil)
for i := 0; ; i++ { for i := 0; ; i++ {

View File

@@ -210,7 +210,7 @@ func (this Trades) klinesForDay(date time.Time) Klines {
//分组,按 //分组,按
for _, v := range this { for _, v := range this {
ms := minutes(v.Time) ms := minutes(v.Time)
t := conv.Select(ms <= _930, _930, ms) t := conv.Select(ms < _930, _930, ms)
t++ t++
t = conv.Select(t > _1130 && t <= _1300, _1130, t) t = conv.Select(t > _1130 && t <= _1300, _1130, t)
t = conv.Select(t > _1500, _1500, t) t = conv.Select(t > _1500, _1500, t)

View File

@@ -135,10 +135,10 @@ func basePrice(code string) Price {
return 1 return 1
} }
switch code[:2] { switch code[:2] {
case "60", "30", "68", "00", "92", "43": case "60", "30", "68", "00", "92", "43", "39":
return 1 return 1
default: default:
return 10 return 1
} }
} }
@@ -262,7 +262,7 @@ func IsStock(code string) bool {
} }
func IsSZStock(code string) bool { func IsSZStock(code string) bool {
return len(code) == 8 && strings.ToLower(code[0:2]) == ExchangeSZ.String() && code[2:3] == "0" return len(code) == 8 && strings.ToLower(code[0:2]) == ExchangeSZ.String() && (code[2:3] == "0" || code[2:4] == "30")
} }
func IsSHStock(code string) bool { func IsSHStock(code string) bool {

View File

@@ -3,6 +3,7 @@ package tdx
import ( import (
"errors" "errors"
_ "github.com/glebarez/go-sqlite" _ "github.com/glebarez/go-sqlite"
_ "github.com/go-sql-driver/mysql"
"github.com/injoyai/base/maps" "github.com/injoyai/base/maps"
"github.com/injoyai/conv" "github.com/injoyai/conv"
"github.com/injoyai/logs" "github.com/injoyai/logs"
@@ -15,7 +16,19 @@ import (
"xorm.io/xorm" "xorm.io/xorm"
) )
func NewWorkday(c *Client, filenames ...string) (*Workday, error) { func NewWorkdayMysql(c *Client, dsn string) (*Workday, error) {
//连接数据库
db, err := xorm.NewEngine("mysql", dsn)
if err != nil {
return nil, err
}
db.SetMapper(core.SameMapper{})
return NewWorkday(c, db)
}
func NewWorkdaySqlite(c *Client, filenames ...string) (*Workday, error) {
defaultFilename := filepath.Join(DefaultDatabaseDir, "workday.db") defaultFilename := filepath.Join(DefaultDatabaseDir, "workday.db")
filename := conv.Default(defaultFilename, filenames...) filename := conv.Default(defaultFilename, filenames...)
@@ -31,6 +44,11 @@ func NewWorkday(c *Client, filenames ...string) (*Workday, error) {
} }
db.SetMapper(core.SameMapper{}) db.SetMapper(core.SameMapper{})
db.DB().SetMaxOpenConns(1) db.DB().SetMaxOpenConns(1)
return NewWorkday(c, db)
}
func NewWorkday(c *Client, db *xorm.Engine) (*Workday, error) {
if err := db.Sync2(new(WorkdayModel)); err != nil { if err := db.Sync2(new(WorkdayModel)); err != nil {
return nil, err return nil, err
} }
@@ -40,12 +58,12 @@ func NewWorkday(c *Client, filenames ...string) (*Workday, error) {
db: db, db: db,
cache: maps.NewBit(), cache: maps.NewBit(),
} }
//设置定时器,每天早上9点更新数据,8点多获取不到今天的数据 //设置定时器,每天早上9点更新数据,8点多获取不到今天的数据
task := cron.New(cron.WithSeconds()) task := cron.New(cron.WithSeconds())
task.AddFunc("0 0 9 * * *", func() { task.AddFunc("0 0 9 * * *", func() {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if err := w.Update(); err == nil { err := w.Update()
if err == nil {
return return
} }
logs.Err(err) logs.Err(err)
@@ -53,7 +71,6 @@ func NewWorkday(c *Client, filenames ...string) (*Workday, error) {
} }
}) })
task.Start() task.Start()
return w, w.Update() return w, w.Update()
} }
@@ -87,27 +104,30 @@ func (this *Workday) Update() error {
} }
now := time.Now() now := time.Now()
if lastWorkday == nil || lastWorkday.Unix < IntegerDay(now).Unix() { if lastWorkday.Unix < IntegerDay(now).Unix() {
resp, err := this.Client.GetIndexDayAll("sh000001") resp, err := this.Client.GetIndexDayAll("sh000001")
if err != nil { if err != nil {
logs.Err(err) logs.Err(err)
return err return err
} }
return NewSessionFunc(this.db, func(session *xorm.Session) error { inserts := []any(nil)
for _, v := range resp.List { for _, v := range resp.List {
if unix := v.Time.Unix(); unix > lastWorkday.Unix { if unix := v.Time.Unix(); unix > lastWorkday.Unix {
_, err = session.Insert(&WorkdayModel{Unix: unix, Date: v.Time.Format("20060102"), Is: true}) inserts = append(inserts, &WorkdayModel{Unix: unix, Date: v.Time.Format("20060102")})
if err != nil {
return err
}
this.cache.Set(uint64(unix), true) this.cache.Set(uint64(unix), true)
} }
} }
if len(inserts) == 0 {
return nil return nil
}) }
_, err = this.db.Insert(inserts)
return err
} }
return nil return nil
} }
@@ -161,7 +181,6 @@ type WorkdayModel struct {
ID int64 `json:"id"` //主键 ID int64 `json:"id"` //主键
Unix int64 `json:"unix"` //时间戳 Unix int64 `json:"unix"` //时间戳
Date string `json:"date"` //日期 Date string `json:"date"` //日期
Is bool `json:"is"` //是否是工作日
} }
func (this *WorkdayModel) TableName() string { func (this *WorkdayModel) TableName() string {