增加Manage对mysql的支持

This commit is contained in:
injoyai
2025-10-16 14:00:31 +08:00
parent 84404bcb2c
commit 456a0af9a5
3 changed files with 66 additions and 148 deletions

129
codes.go
View File

@@ -71,7 +71,7 @@ func NewCodes(c *Client, db *xorm.Engine) (*Codes, error) {
update := new(UpdateModel) update := new(UpdateModel)
{ //查询或者插入一条数据 { //查询或者插入一条数据
has, err := db.Where("Key=?", "codes").Get(update) has, err := db.Where("`Key`=?", "codes").Get(update)
if err != nil { if err != nil {
return nil, err return nil, err
} else if !has { } else if !has {
@@ -123,83 +123,6 @@ func NewCodes(c *Client, db *xorm.Engine) (*Codes, error) {
return cc, cc.Update(true) return cc, cc.Update(true)
} }
//func NewCodes(c *Client, filenames ...string) (*Codes, error) {
//
// //如果没有指定文件名,则使用默认
// defaultFilename := filepath.Join(DefaultDatabaseDir, "codes.db")
// filename := conv.Default(defaultFilename, filenames...)
// filename = conv.Select(filename == "", defaultFilename, filename)
//
// //如果文件夹不存在就创建
// dir, _ := filepath.Split(filename)
// _ = os.MkdirAll(dir, 0777)
//
// //连接数据库
// db, err := xorm.NewEngine("sqlite", filename)
// if err != nil {
// return nil, err
// }
// db.SetMapper(core.SameMapper{})
// db.DB().SetMaxOpenConns(1)
// if err := db.Sync2(new(CodeModel)); err != nil {
// return nil, err
// }
// if err := db.Sync2(new(UpdateModel)); err != nil {
// return nil, err
// }
//
// update := new(UpdateModel)
// { //查询或者插入一条数据
// has, err := db.Get(update)
// if err != nil {
// return nil, err
// } else if !has {
// if _, err := db.Insert(update); err != nil {
// return nil, err
// }
// }
// }
//
// cc := &Codes{
// Client: c,
// db: db,
// }
//
// { //设置定时器,每天早上9点更新数据
// task := cron.New(cron.WithSeconds())
// task.AddFunc("10 0 9 * * *", func() {
// for i := 0; i < 3; i++ {
// if err := cc.Update(); err == nil {
// return
// }
// logs.Err(err)
// <-time.After(time.Minute * 5)
// }
// })
// task.Start()
// }
//
// { //判断是否更新过,更新过则不更新
// now := time.Now()
// node := time.Date(now.Year(), now.Month(), now.Day(), 9, 0, 0, 0, time.Local)
// updateTime := time.Unix(update.Time, 0)
// if now.Sub(node) > 0 {
// //当前时间在9点之后,且更新时间在9点之前,需要更新
// if updateTime.Sub(node) < 0 {
// return cc, cc.Update()
// }
// } else {
// //当前时间在9点之前,且更新时间在上个节点之前
// if updateTime.Sub(node.Add(time.Hour*24)) < 0 {
// return cc, cc.Update()
// }
// }
// }
//
// //从缓存中加载
// return cc, cc.Update(true)
//}
type Codes struct { type Codes struct {
*Client //客户端 *Client //客户端
db *xorm.Engine //数据库实例 db *xorm.Engine //数据库实例
@@ -272,7 +195,7 @@ func (this *Codes) Update(byDB ...bool) error {
this.list = codes this.list = codes
this.exchanges = exchanges this.exchanges = exchanges
//更新时间 //更新时间
_, err = this.db.Where("Key=?", "codes").Update(&UpdateModel{Time: time.Now().Unix()}) _, err = this.db.Where("`Key`=?", "codes").Update(&UpdateModel{Time: time.Now().Unix()})
return err return err
} }
@@ -335,26 +258,48 @@ func (this *Codes) GetCodes(byDatabase bool) ([]*CodeModel, error) {
} }
} }
//4. 插入或者更新数据库 switch this.db.Dialect().URI().DBType {
err := NewSessionFunc(this.db, func(session *xorm.Session) error { case "mysql":
for _, v := range insert { // 1⃣ 清空
if _, err := session.Insert(v); err != nil { if _, err := this.db.Exec("TRUNCATE TABLE codes"); err != nil {
return err return nil, err
}
data := append(insert, update...)
// 2⃣ 直接批量插入
batchSize := 3000 // 8000(2m16s) 5000(43s) 3000(11s) 1000(59s)
for i := 0; i < len(data); i += batchSize {
end := i + batchSize
if end > len(data) {
end = len(data)
}
slice := conv.Array(data[i:end])
if _, err := this.db.Insert(slice); err != nil {
return nil, err
} }
} }
for _, v := range update { case "sqlite3":
if _, err := session.Where("Exchange=? and Code=? ", v.Exchange, v.Code).Cols("Name,LastPrice").Update(v); err != nil { //4. 插入或者更新数据库
return err err := NewSessionFunc(this.db, func(session *xorm.Session) error {
for _, v := range insert {
if _, err := session.Insert(v); err != nil {
return err
}
} }
for _, v := range update {
if _, err := session.Where("Exchange=? and Code=? ", v.Exchange, v.Code).Cols("Name,LastPrice").Update(v); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
} }
return nil
})
if err != nil {
return nil, err
} }
return list, nil return list, nil
} }
type UpdateModel struct { type UpdateModel struct {

View File

@@ -0,0 +1,17 @@
package main
import (
"github.com/injoyai/logs"
"github.com/injoyai/tdx"
)
func main() {
_, err := tdx.NewManageMysql(&tdx.ManageConfig{
Number: 2,
CodesFilename: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local",
WorkdayFileName: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local",
Dial: nil,
})
logs.PanicErr(err)
logs.Debug("done")
}

View File

@@ -74,48 +74,6 @@ func NewWorkday(c *Client, db *xorm.Engine) (*Workday, error) {
return w, w.Update() return w, w.Update()
} }
//func NewWorkday(c *Client, filenames ...string) (*Workday, error) {
//
// defaultFilename := filepath.Join(DefaultDatabaseDir, "workday.db")
// filename := conv.Default(defaultFilename, filenames...)
//
// //如果文件夹不存在就创建
// dir, _ := filepath.Split(filename)
// _ = os.MkdirAll(dir, 0777)
//
// //连接数据库
// db, err := xorm.NewEngine("sqlite", filename)
// if err != nil {
// return nil, err
// }
// db.SetMapper(core.SameMapper{})
// db.DB().SetMaxOpenConns(1)
// if err := db.Sync2(new(WorkdayModel)); err != nil {
// return nil, err
// }
//
// w := &Workday{
// Client: c,
// db: db,
// cache: maps.NewBit(),
// }
//
// //设置定时器,每天早上9点更新数据,8点多获取不到今天的数据
// task := cron.New(cron.WithSeconds())
// task.AddFunc("0 0 9 * * *", func() {
// for i := 0; i < 3; i++ {
// if err := w.Update(); err == nil {
// return
// }
// logs.Err(err)
// <-time.After(time.Minute * 5)
// }
// })
// task.Start()
//
// return w, w.Update()
//}
type Workday struct { type Workday struct {
*Client *Client
db *xorm.Engine db *xorm.Engine
@@ -131,7 +89,7 @@ func (this *Workday) Update() error {
//获取沪市指数的日K线,用作历史是否节假日的判断依据 //获取沪市指数的日K线,用作历史是否节假日的判断依据
//判断日K线是否拉取过 //判断日K线是否拉取过
//获取全部工作日 //获取全部工作日
all := []*WorkdayModel(nil) all := []*WorkdayModel(nil)
if err := this.db.Find(&all); err != nil { if err := this.db.Find(&all); err != nil {
@@ -146,27 +104,26 @@ func (this *Workday) Update() error {
} }
now := time.Now() now := time.Now()
if lastWorkday == nil || lastWorkday.Unix < IntegerDay(now).Unix() { if lastWorkday.Unix < IntegerDay(now).Unix() {
resp, err := this.Client.GetIndexDayAll("sh000001") resp, err := this.Client.GetIndexDayAll("sh000001")
if err != nil { if err != nil {
logs.Err(err) logs.Err(err)
return err return err
} }
return NewSessionFunc(this.db, func(session *xorm.Session) error { inserts := []any(nil)
for _, v := range resp.List { for _, v := range resp.List {
if unix := v.Time.Unix(); unix > lastWorkday.Unix { if unix := v.Time.Unix(); unix > lastWorkday.Unix {
_, err = session.Insert(&WorkdayModel{Unix: unix, Date: v.Time.Format("20060102"), Is: true}) inserts = append(inserts, &WorkdayModel{Unix: unix, Date: v.Time.Format("20060102")})
if err != nil { this.cache.Set(uint64(unix), true)
return err
}
this.cache.Set(uint64(unix), true)
}
} }
return nil }
})
_, err = this.db.Insert(inserts)
return err
} }
return nil return nil
} }
@@ -220,7 +177,6 @@ type WorkdayModel struct {
ID int64 `json:"id"` //主键 ID int64 `json:"id"` //主键
Unix int64 `json:"unix"` //时间戳 Unix int64 `json:"unix"` //时间戳
Date string `json:"date"` //日期 Date string `json:"date"` //日期
Is bool `json:"is"` //是否是工作日
} }
func (this *WorkdayModel) TableName() string { func (this *WorkdayModel) TableName() string {