소스 검색

同花顺高频刷新

hsun 8 달 전
부모
커밋
5c0702c082
6개의 변경된 파일372개의 추가작업 그리고 4개의 파일을 삭제
  1. 100 4
      controllers/base_from_ths_hf.go
  2. 6 0
      models/base_from_ths_hf.go
  3. 49 0
      models/base_from_ths_hf_data.go
  4. 5 0
      models/edb_ths_hf.go
  5. 211 0
      services/base_from_ths_hf.go
  6. 1 0
      utils/constants.go

+ 100 - 4
controllers/base_from_ths_hf.go

@@ -265,7 +265,7 @@ func (this *ThsHfController) BaseAdd() {
 	}
 	if len(indexes) == 0 {
 		br.Msg = "未搜索到指标"
-		br.ErrMsg = fmt.Sprintf("未搜索到指标")
+		br.ErrMsg = "未搜索到指标"
 		return
 	}
 	indexWithData := indexes[0]
@@ -333,10 +333,10 @@ func (this *ThsHfController) BaseAdd() {
 	br.Msg = "操作成功"
 }
 
-// TODO:BaseRefresh
+// BaseRefresh
 // @Title 同花顺高频数据-数据源刷新
 // @Description 同花顺高频数据-数据源刷新
-// @Success 200 {object} models.RefreshEdbInfoReq
+// @Success 200 {object} models.ThsHfBaseRefreshReq
 // @router /hf/base/refresh [post]
 func (this *ThsHfController) BaseRefresh() {
 	br := new(models.BaseResponse).Init()
@@ -347,6 +347,102 @@ func (this *ThsHfController) BaseRefresh() {
 		this.Data["json"] = br
 		this.ServeJSON()
 	}()
+	var params models.ThsHfBaseRefreshReq
+	if e := json.Unmarshal(this.Ctx.Input.RequestBody, &params); e != nil {
+		br.Msg = "参数解析异常"
+		br.ErrMsg = fmt.Sprintf("参数解析失败, %v", e)
+		return
+	}
+	params.BaseIndexCode = strings.TrimSpace(params.BaseIndexCode)
+	if params.BaseIndexCode == "" {
+		br.Msg = "参数异常"
+		br.ErrMsg = fmt.Sprintf("参数异常, BaseIndexCode: %s", params.BaseIndexCode)
+		return
+	}
+	if params.RefreshType <= 0 {
+		params.RefreshType = 1
+	}
+
+	indexItem := new(models.BaseFromThsHfIndex)
+	{
+		ob := new(models.BaseFromThsHfIndex)
+		cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
+		pars := make([]interface{}, 0)
+		pars = append(pars, params.BaseIndexCode)
+		item, e := ob.GetItemByCondition(cond, pars, "")
+		if e != nil {
+			if e.Error() == utils.ErrNoRow() {
+				br.Msg = "指标不存在"
+				return
+			}
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("获取源指标失败, %v", e)
+			return
+		}
+		indexItem = item
+	}
+
+	source := utils.DATA_SOURCE_THS
+	subSource := utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
+	cacheKey := fmt.Sprintf("%s_%d_%d_%s_%s", utils.CACHE_BASE_EDB_REFRESH, source, subSource, indexItem.StockCode, indexItem.Indicator)
+	defer func() {
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	if utils.Rc.IsExist(cacheKey) {
+		br.Ret = 501
+		br.Success = true
+		br.Msg = "系统处理中,请稍后重试"
+		return
+	}
+	utils.Rc.SetNX(cacheKey, 1, 3*time.Minute)
+
+	// API参数
+	var apiPars models.ThsHfSearchEdbReq
+	if e := json.Unmarshal([]byte(indexItem.ApiPars), &apiPars); e != nil {
+		br.Msg = "操作失败"
+		br.ErrMsg = fmt.Sprintf("源指标API参数异常, %v", e)
+		return
+	}
+	// 刷新6小时: 指标开始时间前推6小时; 全部: API参数中的开始时间
+	if params.RefreshType == 1 {
+		apiPars.StartTime = indexItem.StartDate.Add(-6 * time.Hour).Format(utils.FormatDateTime)
+	}
+	// 若API参数中的结束时间不为空, 且不在EndDate之后, 那么不再刷新该指标
+	if apiPars.EndTime != "" {
+		apiEnd, e := time.ParseInLocation(utils.FormatDateTime, apiPars.EndTime, time.Local)
+		if e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("API参数结束时间有误, %v", e)
+			return
+		}
+		if !apiEnd.After(indexItem.EndDate) {
+			br.Ret = 200
+			br.Success = true
+			br.Msg = "该指标无需刷新"
+			return
+		}
+	}
+
+	// 获取指标数据
+	indexes, e := services.GetEdbDataFromThsHf(apiPars, indexItem.TerminalCode)
+	if e != nil {
+		br.Msg = "操作失败"
+		br.ErrMsg = fmt.Sprintf("获取同花顺高频指标失败, %v", e)
+		return
+	}
+	if len(indexes) == 0 {
+		br.Msg = "未搜索到指标"
+		br.ErrMsg = fmt.Sprintf("未搜索到指标, StockCode: %s, Indicator: %s", indexItem.StockCode, indexItem.Indicator)
+		return
+	}
+	indexWithData := indexes[0]
+
+	// 写入指标数据
+	if e = services.WriteRefreshBaseThsHfIndex(indexItem, indexWithData, apiPars.StartTime); e != nil {
+		br.Msg = "操作失败"
+		br.ErrMsg = fmt.Sprintf("写入源指标数据失败, %v", e)
+		return
+	}
 
 	br.Ret = 200
 	br.Success = true
@@ -618,7 +714,7 @@ func (this *ThsHfController) EdbRefresh() {
 		br.Msg = "系统处理中,请稍后重试"
 		return
 	}
-	utils.Rc.SetNX(cacheKey, 1, 3*time.Minute)
+	utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
 	defer func() {
 		_ = utils.Rc.Delete(cacheKey)
 	}()

+ 6 - 0
models/base_from_ths_hf.go

@@ -341,6 +341,12 @@ func (m *BaseFromThsHfIndex) CreateIndexAndData(indexItem *BaseFromThsHfIndex, i
 	return
 }
 
+// ThsHfBaseRefreshReq 数据源刷新请求
+type ThsHfBaseRefreshReq struct {
+	BaseIndexCode string `description:"源指标编码"`
+	RefreshType   int    `description:"刷新类型: 1-最近6小时; 2-全部刷新"`
+}
+
 // ThsHfEdbAddReq 新增至指标库
 type ThsHfEdbAddReq struct {
 	ConvertRule ThsHfIndexConvert2EdbRule

+ 49 - 0
models/base_from_ths_hf_data.go

@@ -178,3 +178,52 @@ func (m *BaseFromThsHfData) Format2Item() (item *BaseFromThsHfDataItem) {
 	item.UniqueCode = m.UniqueCode
 	return
 }
+
+func (m *BaseFromThsHfData) MultiInsertOrUpdate(inserts, updates []*BaseFromThsHfData) (err error) {
+	o := orm.NewOrm()
+	if len(inserts) > 0 {
+		_, e := o.InsertMulti(600, inserts)
+		if e != nil {
+			err = fmt.Errorf("insert multi err: %s", e.Error())
+			return
+		}
+	}
+	if len(updates) > 0 {
+		sql := fmt.Sprintf("UPDATE %s SET %s = ?, modify_time = NOW() WHERE %s = ?", m.TableName(), m.Cols().Value, m.Cols().UniqueCode)
+		p, e := o.Raw(sql).Prepare()
+		if e != nil {
+			err = fmt.Errorf("prepare err: %s", e.Error())
+			return
+		}
+		defer func() {
+			_ = p.Close()
+		}()
+		for _, v := range updates {
+			_, e = p.Exec(v.Value, v.UniqueCode)
+			if e != nil {
+				err = fmt.Errorf("update err: %s", e.Error())
+				return
+			}
+		}
+	}
+	return
+}
+
+func (m *BaseFromThsHfData) GetIndexMinMax(indexCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	o := orm.NewOrm()
+	sql := fmt.Sprintf(`SELECT MIN(%s) AS min_date, MAX(%s) AS max_date, MIN(%s) AS min_value,MAX(%s) AS max_value FROM %s WHERE %s = ?`, m.Cols().DataTime, m.Cols().DataTime, m.Cols().Value, m.Cols().Value, m.TableName(), m.Cols().IndexCode)
+	err = o.Raw(sql, indexCode).QueryRow(&item)
+	if err != nil {
+		return
+	}
+
+	// 获取最新值
+	var lastVal float64
+	sql = fmt.Sprintf(`SELECT %s AS latest_value FROM %s WHERE %s = ? ORDER BY %s DESC LIMIT 1`, m.Cols().Value, m.TableName(), m.Cols().IndexCode, m.Cols().DataTime)
+	err = o.Raw(sql, indexCode).QueryRow(&lastVal)
+	if err != nil {
+		return
+	}
+	item.LatestValue = lastVal
+	return
+}

+ 5 - 0
models/edb_ths_hf.go

@@ -151,6 +151,11 @@ func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex
 }
 
 func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
+	if edbInfo == nil || edbBaseMapping == nil {
+		err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
+		return
+	}
+
 	// 真实数据的最大日期, 插入规则配置的日期
 	var realDataMaxDate, edbDataInsertConfigDate time.Time
 	var edbDataInsertConfig *EdbDataInsertConfig

+ 211 - 0
services/base_from_ths_hf.go

@@ -2,10 +2,13 @@ package services
 
 import (
 	"encoding/json"
+	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/rdlucklib/rdluck_tools/http"
+	"github.com/shopspring/decimal"
 	"net/url"
 	"strings"
 	"time"
@@ -22,6 +25,9 @@ func GetEdbDataFromThsHf(thsParams models.ThsHfSearchEdbReq, terminalCode string
 		err = fmt.Errorf("获取同花顺终端配置失败, %v", e)
 		return
 	}
+	if thsParams.EndTime == "" {
+		thsParams.EndTime = time.Now().Local().Format(utils.FormatDateTime)
+	}
 
 	// 走API
 	if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
@@ -302,3 +308,208 @@ func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverU
 	}
 	return
 }
+
+// WriteRefreshBaseThsHfIndex 源指标刷新
+func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("WriteRefreshBaseThsHfIndex-更新失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	if indexItem == nil {
+		err = fmt.Errorf("指标不存在")
+		return
+	}
+	if len(codeWithData.IndexData) == 0 {
+		return
+	}
+
+	// 获取源指标数据
+	dataOb := new(models.BaseFromThsHfData)
+	originData := make([]*models.BaseFromThsHfData, 0)
+	{
+		cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().IndexCode)
+		pars := make([]interface{}, 0)
+		pars = append(pars, indexItem.IndexCode)
+		if startTime != "" {
+			cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
+			pars = append(pars, startTime)
+		}
+		list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
+		if e != nil {
+			err = fmt.Errorf("获取源指标数据失败, %v", e)
+			return
+		}
+		originData = list
+	}
+
+	// 更新指标数据
+	dateExist := make(map[string]*models.BaseFromThsHfData)
+	newValExist := make(map[string]bool)
+	if len(originData) > 0 {
+		// unicode去重
+		for _, d := range originData {
+			uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
+			dateExist[uni] = d
+		}
+	}
+
+	// 筛选新增/更新数据
+	updateData := make([]*models.BaseFromThsHfData, 0)
+	insertData := make([]*models.BaseFromThsHfData, 0)
+	for _, d := range codeWithData.IndexData {
+		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
+		origin := dateExist[uni]
+
+		// unicode检验是否存在
+		strNewVal := decimal.NewFromFloat(d.Value).Round(4).String()
+		di, _ := decimal.NewFromString(strNewVal)
+		newVal, _ := di.Float64()
+		if origin != nil {
+			strExistVal := decimal.NewFromFloat(origin.Value).Round(4).String()
+			if strNewVal == strExistVal {
+				continue
+			}
+			origin.Value = newVal
+			origin.ModifyTime = time.Now().Local()
+			updateData = append(updateData, origin)
+		}
+
+		// 新增的数据去重
+		if newValExist[uni] {
+			continue
+		}
+		newValExist[uni] = true
+		newData := new(models.BaseFromThsHfData)
+		newData.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
+		newData.IndexCode = indexItem.IndexCode
+		newData.DataTime = d.DataTime
+		newData.Value = newVal
+		newData.CreateTime = time.Now()
+		newData.ModifyTime = time.Now()
+		newData.DataTimestamp = d.DataTime.UnixNano() / 1e6
+		insertData = append(insertData, newData)
+	}
+	if e := dataOb.MultiInsertOrUpdate(insertData, updateData); e != nil {
+		err = fmt.Errorf("新增/更新源指标数据失败, %v", e)
+		return
+	}
+
+	// 更新指标开始结束时间
+	minMax, e := dataOb.GetIndexMinMax(indexItem.IndexCode)
+	if e == nil && minMax != nil {
+		minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
+			return
+		}
+		maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
+			return
+		}
+		indexItem.StartDate = minDate
+		indexItem.EndDate = maxDate
+		indexItem.ModifyTime = time.Now().Local()
+		updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
+		if e = indexItem.Update(updateCols); e != nil {
+			err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
+			return
+		}
+	}
+
+	// 同步刷新指标库
+	go func() {
+		_ = RefreshThsHfIndexFromBase(indexItem.IndexCode, startTime)
+	}()
+	return
+}
+
+// RefreshThsHfIndexFromBase 根据源指标刷新指标库
+func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标库失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	// 获取指标关联信息
+	mappings := make([]*models.BaseFromEdbMapping, 0)
+	{
+		ob := new(models.BaseFromEdbMapping)
+		cond := fmt.Sprintf(" AND %s = ?", ob.Cols().BaseIndexCode)
+		pars := make([]interface{}, 0)
+		pars = append(pars, baseCode)
+		list, e := ob.GetItemsByCondition(cond, pars, []string{}, "")
+		if e != nil {
+			err = fmt.Errorf("获取源指标关联失败, %v", e)
+			return
+		}
+		mappings = list
+	}
+	if len(mappings) == 0 {
+		return
+	}
+	codeMapping := make(map[string]*models.BaseFromEdbMapping)
+	edbInfoIds := make([]int, 0)
+	for _, v := range mappings {
+		if codeMapping[v.EdbCode] == nil {
+			codeMapping[v.EdbCode] = v
+		}
+		edbInfoIds = append(edbInfoIds, v.EdbInfoId)
+	}
+
+	// 指标信息
+	edbInfoList, e := models.GetEdbInfoByIdList(edbInfoIds)
+	if e != nil {
+		err = fmt.Errorf("获取指标信息列表失败, %v", e)
+		return
+	}
+	codeEdb := make(map[string]*models.EdbInfo)
+	for _, v := range edbInfoList {
+		if codeEdb[v.EdbCode] == nil {
+			codeEdb[v.EdbCode] = v
+		}
+	}
+
+	thsOb := new(models.EdbThsHf)
+	source := thsOb.GetSource()
+	subSource := thsOb.GetSubSource()
+	for _, v := range mappings {
+		cacheKey := fmt.Sprintf("%s_%d_%d_%s", utils.CACHE_EDB_DATA_REFRESH, source, subSource, v.EdbCode)
+		if utils.Rc.IsExist(cacheKey) {
+			continue
+		}
+		utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
+
+		edb := codeEdb[v.EdbCode]
+		if edb == nil {
+			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-指标信息有误, EdbCode: %s", v.EdbCode))
+			continue
+		}
+
+		// 刷新指标
+		if e := thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
+			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标失败, %v", e))
+			_ = utils.Rc.Delete(cacheKey)
+			continue
+		}
+
+		// 更新指标最值
+		e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edb)
+		if e != nil {
+			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-更新指标最值失败, %v", e))
+			_ = utils.Rc.Delete(cacheKey)
+			continue
+		}
+		_ = utils.Rc.Delete(cacheKey)
+
+		// 更新ES
+		go logic.UpdateEs(edb.EdbInfoId)
+	}
+	return
+}

+ 1 - 0
utils/constants.go

@@ -228,6 +228,7 @@ const (
 	CACHE_EDB_THS_SERVER_TOKEN       = "edb:ths_server_token:"            //同花顺调用凭证
 	CACHE_SELF_EDB_HANDLE            = "CACHE_SELF_EDB_HANDLE:"           // 自定义指标缓存
 	CACHE_BASE_EDB_ADD               = "CACHE_BASE_EDB_ADD_"              // 添加至数据源缓存
+	CACHE_BASE_EDB_REFRESH           = "CACHE_BASE_EDB_REFRESH_"          // 刷新数据源缓存
 )
 
 // 图表类型