Browse Source

test: 同花顺高频定时刷新

hsun 8 months ago
parent
commit
db9af6b8c3
4 changed files with 254 additions and 0 deletions
  1. 195 0
      models/data_manage/base_from_ths_hf_index.go
  2. 2 0
      models/db.go
  3. 47 0
      services/data/base_from_ths_hf.go
  4. 10 0
      services/task.go

+ 195 - 0
models/data_manage/base_from_ths_hf_index.go

@@ -0,0 +1,195 @@
+package data_manage
+
+import (
+	"eta/eta_task/utils"
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"strings"
+	"time"
+)
+
+// BaseFromThsHfIndex 同花顺高频数据
+type BaseFromThsHfIndex struct {
+	BaseFromThsHfIndexId    int       `orm:"column(base_from_ths_hf_index_id);pk"`
+	BaseFromThsHfClassifyId int       `description:"分类ID"`
+	IndexCode               string    `description:"指标编码"`
+	IndexName               string    `description:"指标名称"`
+	Unit                    string    `description:"单位"`
+	Source                  string    `description:"数据来源"`
+	Frequency               string    `description:"频度"`
+	StartDate               time.Time `description:"开始日期(至时分秒)"`
+	EndDate                 time.Time `description:"结束日期(至时分秒)"`
+	Describe                string    `description:"指标描述"`
+	Sort                    int       `description:"排序"`
+	IsStop                  int       `description:"是否停更:0-否;1-停更"`
+	TerminalCode            string    `description:"所属终端编码"`
+	StockCode               string    `description:"证券代码"`
+	Indicator               string    `description:"同花顺指标代码"`
+	ApiPars                 string    `description:"API请求参数"`
+	LatestValue             float64   `description:"最新值"`
+	SysUserId               int       `description:"创建人ID"`
+	SysUserRealName         string    `description:"创建人姓名"`
+	CreateTime              time.Time `description:"创建时间"`
+	ModifyTime              time.Time `description:"修改时间"`
+}
+
+func (m *BaseFromThsHfIndex) TableName() string {
+	return "base_from_ths_hf_index"
+}
+
+type BaseFromThsHfIndexCols struct {
+	PrimaryId               string
+	BaseFromThsHfClassifyId string
+	IndexCode               string
+	IndexName               string
+	Unit                    string
+	Source                  string
+	Frequency               string
+	StartDate               string
+	EndDate                 string
+	Describe                string
+	Sort                    string
+	IsStop                  string
+	TerminalCode            string
+	StockCode               string
+	Indicator               string
+	ApiPars                 string
+	LatestValue             string
+	SysUserId               string
+	SysUserRealName         string
+	CreateTime              string
+	ModifyTime              string
+}
+
+func (m *BaseFromThsHfIndex) Cols() BaseFromThsHfIndexCols {
+	return BaseFromThsHfIndexCols{
+		PrimaryId:               "base_from_ths_hf_index_id",
+		BaseFromThsHfClassifyId: "base_from_ths_hf_classify_id",
+		IndexCode:               "index_code",
+		IndexName:               "index_name",
+		Unit:                    "unit",
+		Source:                  "source",
+		Frequency:               "frequency",
+		StartDate:               "start_date",
+		EndDate:                 "end_date",
+		Describe:                "describe",
+		Sort:                    "sort",
+		IsStop:                  "is_stop",
+		TerminalCode:            "terminal_code",
+		StockCode:               "stock_code",
+		Indicator:               "indicator",
+		ApiPars:                 "api_pars",
+		LatestValue:             "latest_value",
+		SysUserId:               "sys_user_id",
+		SysUserRealName:         "sys_user_real_name",
+		CreateTime:              "create_time",
+		ModifyTime:              "modify_time",
+	}
+}
+
+func (m *BaseFromThsHfIndex) Create() (err error) {
+	o := orm.NewOrm()
+	id, err := o.Insert(m)
+	if err != nil {
+		return
+	}
+	m.BaseFromThsHfIndexId = int(id)
+	return
+}
+
+func (m *BaseFromThsHfIndex) CreateMulti(items []*BaseFromThsHfIndex) (err error) {
+	if len(items) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	_, err = o.InsertMulti(len(items), items)
+	return
+}
+
+func (m *BaseFromThsHfIndex) Update(cols []string) (err error) {
+	o := orm.NewOrm()
+	_, err = o.Update(m, cols...)
+	return
+}
+
+func (m *BaseFromThsHfIndex) Remove() (err error) {
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`DELETE FROM %s WHERE %s = ? LIMIT 1`, m.TableName(), m.Cols().PrimaryId)
+	_, err = o.Raw(sql, m.BaseFromThsHfIndexId).Exec()
+	return
+}
+
+func (m *BaseFromThsHfIndex) MultiRemove(ids []int) (err error) {
+	if len(ids) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`DELETE FROM %s WHERE %s IN (%s)`, m.TableName(), m.Cols().PrimaryId, utils.GetOrmInReplace(len(ids)))
+	_, err = o.Raw(sql, ids).Exec()
+	return
+}
+
+func (m *BaseFromThsHfIndex) RemoveByCondition(condition string, pars []interface{}) (err error) {
+	if condition == "" {
+		return
+	}
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`DELETE FROM %s WHERE %s`, m.TableName(), condition)
+	_, err = o.Raw(sql, pars).Exec()
+	return
+}
+
+func (m *BaseFromThsHfIndex) GetItemById(id int) (item *BaseFromThsHfIndex, err error) {
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`SELECT * FROM %s WHERE %s = ? LIMIT 1`, m.TableName(), m.Cols().PrimaryId)
+	err = o.Raw(sql, id).QueryRow(&item)
+	return
+}
+
+func (m *BaseFromThsHfIndex) GetItemByCondition(condition string, pars []interface{}, orderRule string) (item *BaseFromThsHfIndex, err error) {
+	o := orm.NewOrm()
+	order := ``
+	if orderRule != "" {
+		order = ` ORDER BY ` + orderRule
+	}
+	sql := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s %s LIMIT 1`, m.TableName(), condition, order)
+	err = o.Raw(sql, pars).QueryRow(&item)
+	return
+}
+
+func (m *BaseFromThsHfIndex) GetCountByCondition(condition string, pars []interface{}) (count int, err error) {
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = o.Raw(sql, pars).QueryRow(&count)
+	return
+}
+
+func (m *BaseFromThsHfIndex) GetItemsByCondition(condition string, pars []interface{}, fieldArr []string, orderRule string) (items []*BaseFromThsHfIndex, err error) {
+	o := orm.NewOrm()
+	fields := strings.Join(fieldArr, ",")
+	if len(fieldArr) == 0 {
+		fields = `*`
+	}
+	order := fmt.Sprintf(`ORDER BY %s DESC`, m.Cols().CreateTime)
+	if orderRule != "" {
+		order = ` ORDER BY ` + orderRule
+	}
+	sql := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s %s`, fields, m.TableName(), condition, order)
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}
+
+func (m *BaseFromThsHfIndex) GetPageItemsByCondition(condition string, pars []interface{}, fieldArr []string, orderRule string, startSize, pageSize int) (items []*BaseFromThsHfIndex, err error) {
+	o := orm.NewOrm()
+	fields := strings.Join(fieldArr, ",")
+	if len(fieldArr) == 0 {
+		fields = `*`
+	}
+	order := fmt.Sprintf(`ORDER BY %s DESC`, m.Cols().CreateTime)
+	if orderRule != "" {
+		order = ` ORDER BY ` + orderRule
+	}
+	sql := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s %s LIMIT ?,?`, fields, m.TableName(), condition, order)
+	_, err = o.Raw(sql, pars, startSize, pageSize).QueryRows(&items)
+	return
+}

+ 2 - 0
models/db.go

@@ -127,6 +127,8 @@ func initEdbDataTable() {
 		new(data_manage.BaseFromIcpiClassify),
 		new(data_manage.BaseFromIcpiData),
 		new(data_manage.BusinessSysInteractionLog), // 商家系统交互记录表
+
+		new(data_manage.BaseFromThsHfIndex),
 	)
 }
 

+ 47 - 0
services/data/base_from_ths_hf.go

@@ -0,0 +1,47 @@
+package data
+
+import (
+	"context"
+	"eta/eta_task/models/data_manage"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+)
+
+// RefreshBaseFromThsHfIndex 同花顺高频数据
+func RefreshBaseFromThsHfIndex(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("RefreshBaseFromThsHfIndex-刷新同花顺高频数据失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	indexOb := new(data_manage.BaseFromThsHfIndex)
+	indexes, e := indexOb.GetItemsByCondition(``, make([]interface{}, 0), []string{indexOb.Cols().PrimaryId, indexOb.Cols().IndexCode}, "")
+	if e != nil {
+		err = fmt.Errorf("获取源指标列表失败, %v", e)
+		return
+	}
+	if len(indexes) == 0 {
+		return
+	}
+
+	refreshUrl := "ths/hf/base/refresh"
+	for _, v := range indexes {
+		param := make(map[string]interface{})
+		param["BaseIndexCode"] = v.IndexCode
+		param["RefreshType"] = 1 // 默认刷6小时前的
+		res, e := postRefreshEdbData(param, refreshUrl)
+		if e != nil {
+			utils.FileLog.Info(fmt.Sprintf("RefreshBaseFromThsHfIndex-postRefreshEdbData, code: %s, err: %v", v.IndexCode, e))
+			continue
+		}
+		if res != nil && res.Ret != 200 {
+			utils.FileLog.Info(fmt.Sprintf("RefreshBaseFromThsHfIndex-postRefreshEdbData, code: %s, Ret: %d, ErrMsg: %s", v.IndexCode, res.Ret, res.ErrMsg))
+			continue
+		}
+	}
+	return
+}

+ 10 - 0
services/task.go

@@ -27,6 +27,12 @@ func Task() {
 
 	XyTask()
 
+	// TODO:测试-刷新同花顺高频, 上线前删除
+	if utils.RunMode == "debug" {
+		refreshThsHfBase := task.NewTask("refreshThsHfBase", "0 30 12 * * *", data.RefreshBaseFromThsHfIndex)
+		task.AddTask("refreshThsHfBase", refreshThsHfBase)
+	}
+
 	task.StartTask()
 	fmt.Println("task end")
 }
@@ -122,6 +128,10 @@ func releaseTask() {
 		refreshPCSGBloomberg := task.NewTask("refreshPCSGBloombergDaily", "0 30 9 * * *", data.RefreshPCSGBloomberg)
 		task.AddTask("中石油新加坡-每日Bloomberg指标刷新", refreshPCSGBloomberg)
 	}
+
+	// 刷新同花顺高频
+	refreshThsHfBase := task.NewTask("refreshThsHfBase", "0 0 0,6,9,12,15,18,21 * * *", data.RefreshBaseFromThsHfIndex)
+	task.AddTask("refreshThsHfBase", refreshThsHfBase)
 }
 
 func RefreshData(cont context.Context) (err error) {