From 456a0af9a5a12fa4815b2f711958a7e33911c695 Mon Sep 17 00:00:00 2001 From: injoyai <1113655791@qq.com> Date: Thu, 16 Oct 2025 14:00:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Manage=E5=AF=B9mysql=E7=9A=84?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- codes.go | 129 +++++++++++------------------------- example/ManageMysql/main.go | 17 +++++ workday.go | 68 ++++--------------- 3 files changed, 66 insertions(+), 148 deletions(-) create mode 100644 example/ManageMysql/main.go diff --git a/codes.go b/codes.go index 1ecb8c7..2e0feaa 100644 --- a/codes.go +++ b/codes.go @@ -71,7 +71,7 @@ func NewCodes(c *Client, db *xorm.Engine) (*Codes, error) { update := new(UpdateModel) { //查询或者插入一条数据 - has, err := db.Where("Key=?", "codes").Get(update) + has, err := db.Where("`Key`=?", "codes").Get(update) if err != nil { return nil, err } else if !has { @@ -123,83 +123,6 @@ func NewCodes(c *Client, db *xorm.Engine) (*Codes, error) { return cc, cc.Update(true) } -//func NewCodes(c *Client, filenames ...string) (*Codes, error) { -// -// //如果没有指定文件名,则使用默认 -// defaultFilename := filepath.Join(DefaultDatabaseDir, "codes.db") -// filename := conv.Default(defaultFilename, filenames...) -// filename = conv.Select(filename == "", defaultFilename, filename) -// -// //如果文件夹不存在就创建 -// dir, _ := filepath.Split(filename) -// _ = os.MkdirAll(dir, 0777) -// -// //连接数据库 -// db, err := xorm.NewEngine("sqlite", filename) -// if err != nil { -// return nil, err -// } -// db.SetMapper(core.SameMapper{}) -// db.DB().SetMaxOpenConns(1) -// if err := db.Sync2(new(CodeModel)); err != nil { -// return nil, err -// } -// if err := db.Sync2(new(UpdateModel)); err != nil { -// return nil, err -// } -// -// update := new(UpdateModel) -// { //查询或者插入一条数据 -// has, err := db.Get(update) -// if err != nil { -// return nil, err -// } else if !has { -// if _, err := db.Insert(update); err != nil { -// return nil, err -// } -// } -// } -// -// cc := &Codes{ -// Client: c, -// db: db, -// } -// -// { //设置定时器,每天早上9点更新数据 -// task := cron.New(cron.WithSeconds()) -// task.AddFunc("10 0 9 * * *", func() { -// for i := 0; i < 3; i++ { -// if err := cc.Update(); err == nil { -// return -// } -// logs.Err(err) -// <-time.After(time.Minute * 5) -// } -// }) -// task.Start() -// } -// -// { //判断是否更新过,更新过则不更新 -// now := time.Now() -// node := time.Date(now.Year(), now.Month(), now.Day(), 9, 0, 0, 0, time.Local) -// updateTime := time.Unix(update.Time, 0) -// if now.Sub(node) > 0 { -// //当前时间在9点之后,且更新时间在9点之前,需要更新 -// if updateTime.Sub(node) < 0 { -// return cc, cc.Update() -// } -// } else { -// //当前时间在9点之前,且更新时间在上个节点之前 -// if updateTime.Sub(node.Add(time.Hour*24)) < 0 { -// return cc, cc.Update() -// } -// } -// } -// -// //从缓存中加载 -// return cc, cc.Update(true) -//} - type Codes struct { *Client //客户端 db *xorm.Engine //数据库实例 @@ -272,7 +195,7 @@ func (this *Codes) Update(byDB ...bool) error { this.list = codes this.exchanges = exchanges //更新时间 - _, err = this.db.Where("Key=?", "codes").Update(&UpdateModel{Time: time.Now().Unix()}) + _, err = this.db.Where("`Key`=?", "codes").Update(&UpdateModel{Time: time.Now().Unix()}) return err } @@ -335,26 +258,48 @@ func (this *Codes) GetCodes(byDatabase bool) ([]*CodeModel, error) { } } - //4. 插入或者更新数据库 - err := NewSessionFunc(this.db, func(session *xorm.Session) error { - for _, v := range insert { - if _, err := session.Insert(v); err != nil { - return err + switch this.db.Dialect().URI().DBType { + case "mysql": + // 1️⃣ 清空 + if _, err := this.db.Exec("TRUNCATE TABLE codes"); err != nil { + return nil, err + } + + data := append(insert, update...) + // 2️⃣ 直接批量插入 + batchSize := 3000 // 8000(2m16s) 5000(43s) 3000(11s) 1000(59s) + for i := 0; i < len(data); i += batchSize { + end := i + batchSize + if end > len(data) { + end = len(data) + } + + slice := conv.Array(data[i:end]) + if _, err := this.db.Insert(slice); err != nil { + return nil, err } } - for _, v := range update { - if _, err := session.Where("Exchange=? and Code=? ", v.Exchange, v.Code).Cols("Name,LastPrice").Update(v); err != nil { - return err + case "sqlite3": + //4. 插入或者更新数据库 + err := NewSessionFunc(this.db, func(session *xorm.Session) error { + for _, v := range insert { + if _, err := session.Insert(v); err != nil { + return err + } } + for _, v := range update { + if _, err := session.Where("Exchange=? and Code=? ", v.Exchange, v.Code).Cols("Name,LastPrice").Update(v); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err } - return nil - }) - if err != nil { - return nil, err } return list, nil - } type UpdateModel struct { diff --git a/example/ManageMysql/main.go b/example/ManageMysql/main.go new file mode 100644 index 0000000..b11ab7f --- /dev/null +++ b/example/ManageMysql/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "github.com/injoyai/logs" + "github.com/injoyai/tdx" +) + +func main() { + _, err := tdx.NewManageMysql(&tdx.ManageConfig{ + Number: 2, + CodesFilename: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local", + WorkdayFileName: "root:root@tcp(192.168.1.105:3306)/stock?charset=utf8mb4&parseTime=True&loc=Local", + Dial: nil, + }) + logs.PanicErr(err) + logs.Debug("done") +} diff --git a/workday.go b/workday.go index 0e43990..2d258ff 100644 --- a/workday.go +++ b/workday.go @@ -74,48 +74,6 @@ func NewWorkday(c *Client, db *xorm.Engine) (*Workday, error) { return w, w.Update() } -//func NewWorkday(c *Client, filenames ...string) (*Workday, error) { -// -// defaultFilename := filepath.Join(DefaultDatabaseDir, "workday.db") -// filename := conv.Default(defaultFilename, filenames...) -// -// //如果文件夹不存在就创建 -// dir, _ := filepath.Split(filename) -// _ = os.MkdirAll(dir, 0777) -// -// //连接数据库 -// db, err := xorm.NewEngine("sqlite", filename) -// if err != nil { -// return nil, err -// } -// db.SetMapper(core.SameMapper{}) -// db.DB().SetMaxOpenConns(1) -// if err := db.Sync2(new(WorkdayModel)); err != nil { -// return nil, err -// } -// -// w := &Workday{ -// Client: c, -// db: db, -// cache: maps.NewBit(), -// } -// -// //设置定时器,每天早上9点更新数据,8点多获取不到今天的数据 -// task := cron.New(cron.WithSeconds()) -// task.AddFunc("0 0 9 * * *", func() { -// for i := 0; i < 3; i++ { -// if err := w.Update(); err == nil { -// return -// } -// logs.Err(err) -// <-time.After(time.Minute * 5) -// } -// }) -// task.Start() -// -// return w, w.Update() -//} - type Workday struct { *Client db *xorm.Engine @@ -131,7 +89,7 @@ func (this *Workday) Update() error { //获取沪市指数的日K线,用作历史是否节假日的判断依据 //判断日K线是否拉取过 - + //获取全部工作日 all := []*WorkdayModel(nil) if err := this.db.Find(&all); err != nil { @@ -146,27 +104,26 @@ func (this *Workday) Update() error { } now := time.Now() - if lastWorkday == nil || lastWorkday.Unix < IntegerDay(now).Unix() { + if lastWorkday.Unix < IntegerDay(now).Unix() { resp, err := this.Client.GetIndexDayAll("sh000001") if err != nil { logs.Err(err) return err } - return NewSessionFunc(this.db, func(session *xorm.Session) error { - for _, v := range resp.List { - if unix := v.Time.Unix(); unix > lastWorkday.Unix { - _, err = session.Insert(&WorkdayModel{Unix: unix, Date: v.Time.Format("20060102"), Is: true}) - if err != nil { - return err - } - this.cache.Set(uint64(unix), true) - } + inserts := []any(nil) + for _, v := range resp.List { + if unix := v.Time.Unix(); unix > lastWorkday.Unix { + inserts = append(inserts, &WorkdayModel{Unix: unix, Date: v.Time.Format("20060102")}) + this.cache.Set(uint64(unix), true) } - return nil - }) + } + + _, err = this.db.Insert(inserts) + return err } + return nil } @@ -220,7 +177,6 @@ type WorkdayModel struct { ID int64 `json:"id"` //主键 Unix int64 `json:"unix"` //时间戳 Date string `json:"date"` //日期 - Is bool `json:"is"` //是否是工作日 } func (this *WorkdayModel) TableName() string {