Kaynağa Gözat

fix:新增路透数据指标刷新;优化手工指标录入提醒逻辑

Roc 3 yıl önce
ebeveyn
işleme
003239861b

+ 1 - 1
models/data_entry.go

@@ -40,7 +40,7 @@ func GetEdbdataCount(tradeCode, nowDate string) (count int, err error) {
 
 // GetEdbInfoByFrequencyNotDay 获取频度非日度 且 提醒时间不为空 的指标数据
 func GetEdbInfoByFrequencyNotDay() (items []*EdbInfo, err error) {
-	sql := `SELECT * FROM edbinfo WHERE frequency!="日度" AND notice_time<>''  `
+	sql := `SELECT * FROM edbinfo WHERE frequency!="日度" AND notice_time<>'' and user_id>0 `
 	o := orm.NewOrm()
 	o.Using("edb")
 	_, err = o.Raw(sql).QueryRows(&items)

+ 178 - 0
models/data_manage/edb_data_lt.go

@@ -0,0 +1,178 @@
+package data_manage
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/rdlucklib/rdluck_tools/http"
+	"github.com/rdlucklib/rdluck_tools/orm"
+	"hongze/hongze_task/utils"
+	"reflect"
+	"strconv"
+	"strings"
+	"time"
+)
+
+type EdbDataLt struct {
+	EdbDataId     int `orm:"column(edb_data_id);pk"`
+	EdbInfoId     int
+	EdbCode       string
+	DataTime      string
+	Value         float64
+	Status        int
+	CreateTime    time.Time
+	ModifyTime    time.Time
+	DataTimestamp int64
+}
+
+// AddEdbDataLtBySql 执行添加数据的sql
+func AddEdbDataLtBySql(sqlStr string) (err error) {
+	o := orm.NewOrm()
+	o.Using("data")
+	_, err = o.Raw(sqlStr).Exec()
+	return
+}
+
+// ModifyEdbDataLt 修改路透社数据
+func ModifyEdbDataLt(edbInfoId int64, dataTime string, value float64) (err error) {
+	o := orm.NewOrm()
+	o.Using("data")
+	sql := ` UPDATE edb_data_lt SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
+	_, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
+	return
+}
+
+func GetEdbDataLtMaxOrMinDate(edbCode string) (min_date, max_date string, err error) {
+	o := orm.NewOrm()
+	o.Using("data")
+	sql := ` SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date FROM edb_data_lt WHERE edb_code=? `
+	err = o.Raw(sql, edbCode).QueryRow(&min_date, &max_date)
+	return
+}
+
+func GetEdbDataLtByCodeAndDate(edbCode string, startDate string) (count int, err error) {
+	o := orm.NewOrm()
+	o.Using("data")
+	sql := ` SELECT COUNT(1) AS count FROM edb_data_lt WHERE edb_code=? AND data_time=? `
+	err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
+	return
+}
+
+func GetEdbDataLtByCode(edbCode string, size int) (items []*EdbInfoSearchData, err error) {
+	o := orm.NewOrm()
+	o.Using("data")
+	sql := ` SELECT * FROM edb_data_lt WHERE edb_code=? ORDER BY data_time DESC LIMIT ? `
+	_, err = o.Raw(sql, edbCode, size).QueryRows(&items)
+	return
+}
+
+//刷新所有数据
+func RefreshAllEdbDataByLt(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
+	// 获取路透社数据
+	ltDataList, err := QueryEdbDataByLt(edbCode, startDate, endDate)
+	if err != nil {
+		return
+	}
+
+	o := orm.NewOrm()
+	o.Using("data")
+	o.Begin()
+	defer func() {
+		if err != nil {
+			o.Rollback()
+		} else {
+			o.Commit()
+		}
+	}()
+
+	//获取指标所有数据
+	dataList := make([]*EdbDataBase, 0)
+	dataTableName := GetEdbDataTableName(source)
+	sql := `SELECT * FROM %s WHERE edb_info_id=? `
+	sql = fmt.Sprintf(sql, dataTableName)
+	_, err = o.Raw(sql, edbInfoId).QueryRows(&dataList)
+	if err != nil {
+		return err
+	}
+	dataMap := make(map[string]string)
+	for _, v := range dataList {
+		dataMap[v.DataTime] = v.Value
+	}
+	edbInfoIdStr := strconv.Itoa(edbInfoId)
+
+	addSql := ` INSERT INTO edb_data_lt(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values `
+	var isAdd bool
+
+	for timestampInt, edbValue := range ltDataList {
+		dataTime := time.Unix(timestampInt/1000, 0)
+
+		//校验数据类型对不对
+		valType := reflect.TypeOf(edbValue)
+		if valType == nil {
+			continue
+		}
+		if valType.String() != "float64" {
+			continue
+		}
+		sValue := edbValue.(float64)
+
+		eDate := dataTime.Format(utils.FormatDate)
+		if err != nil {
+			return err
+		}
+
+		saveValue := utils.SubFloatToString(sValue, 30)
+		if existVal, ok := dataMap[eDate]; !ok {
+			timestamp := dataTime.UnixNano() / 1e6
+			timeStr := fmt.Sprintf("%d", timestamp)
+
+			addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, saveValue)
+			isAdd = true
+		} else {
+			if existVal != saveValue {
+				sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
+				sql = fmt.Sprintf(sql, dataTableName)
+				_, err = o.Raw(sql, sValue, edbInfoId, eDate).Exec()
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+	if isAdd {
+		addSql = strings.TrimRight(addSql, ",")
+		_, err = o.Raw(addSql).Exec()
+		if err != nil {
+			fmt.Println("RefreshAllEdbDataByLt add Err", err.Error())
+			return
+		}
+	}
+	return
+}
+
+type EdbDataFromLt struct {
+	Close map[int64]interface{} `json:"CLOSE"`
+}
+
+// QueryEdbDataByLt 获取路透社数据
+func QueryEdbDataByLt(edbCode, startDate, endDate string) (dataList map[int64]interface{}, err error) {
+	dataList = make(map[int64]interface{})
+
+	ltUrl := utils.Hz_Data_LT_Url + `edbInfo/ek?EdbCode=%s&StartDate=%s&EndDate=%s`
+	ltUrl = fmt.Sprintf(ltUrl, edbCode, startDate, endDate)
+	utils.FileLog.Info("ltUrl:%s", ltUrl)
+	body, err := http.Get(ltUrl)
+	utils.FileLog.Info("lt result:%s", string(body))
+
+	if err != nil {
+		return
+	}
+	//fmt.Println(string(body))
+	item := new(EdbDataFromLt)
+	err = json.Unmarshal(body, &item)
+
+	if err != nil {
+		return
+	}
+	dataList = item.Close
+	return
+}

+ 41 - 0
services/data/edb_info.go

@@ -961,6 +961,47 @@ func RefreshDataFromZz(wg *sync.WaitGroup) (err error) {
 	return err
 }
 
+// RefreshDataFromLt 刷新路透数据
+func RefreshDataFromLt(wg *sync.WaitGroup) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("RefreshDataFromLt Err:" + err.Error())
+			go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", "RefreshDataFromLt ErrMsg:"+err.Error(), utils.EmailSendToUsers)
+		}
+		wg.Done()
+	}()
+	var condition string
+	var pars []interface{}
+	condition += " AND source=? "
+	pars = append(pars, utils.DATA_SOURCE_LT)
+	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
+	if err != nil {
+		return errors.New("GetEdbInfoByCondition:" + err.Error())
+	}
+
+	endDate := time.Now().Format(utils.FormatDate)
+	for _, v := range items {
+		startDate := v.StartDate.Format(utils.FormatDate)
+		err = data_manage.RefreshAllEdbDataByLt(v.EdbInfoId, utils.DATA_SOURCE_LT, v.EdbCode, startDate, endDate)
+		if err != nil {
+			return errors.New("RefreshEdbDataByLt:" + err.Error())
+		}
+
+		//更新指标的最新数据
+		item, err := data_manage.GetEdbInfoMaxAndMinInfo(v.Source, v.EdbCode)
+		if err != nil {
+			return err
+		}
+		if item != nil {
+			err = data_manage.ModifyEdbInfoMaxAndMinInfo(v.EdbInfoId, item)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return err
+}
+
 func ResetEdbInfoIsUpdate(cont context.Context) (err error) {
 	go data_manage.ResetEdbInfoIsUpdate()
 	return nil

+ 36 - 24
services/task.go

@@ -170,7 +170,7 @@ func OneMinute(cont context.Context) (err error) {
 
 func RefreshData(cont context.Context) (err error) {
 	wg := sync.WaitGroup{}
-	wg.Add(7)
+	wg.Add(8)
 	//wind
 	go data.RefreshDataFromWind(&wg)
 	//同花顺
@@ -185,6 +185,8 @@ func RefreshData(cont context.Context) (err error) {
 	go data.RefreshDataFromYs(&wg)
 	//钢联
 	go data.RefreshDataFromGl(&wg)
+	//路透
+	go data.RefreshDataFromLt(&wg)
 
 	wg.Wait()
 	//计算指标
@@ -340,13 +342,15 @@ func AddEdbTask(cont context.Context) (err error) {
 	nowYearLastDay := utils.GetNowYearLastDay()
 
 	debugNoticeUserId := 0 //测试环境,需要发送消息的用户
-	if utils.RunMode == "debug" {
-		tmpWxUser, tmpErr := models.GetWxUserByMobile("17634786714")
-		if tmpErr == nil && tmpWxUser != nil {
-			//debugNoticeUserId = 44078 //测试环境的话,发送邮箱给颜鹏
-			debugNoticeUserId = int(tmpWxUser.UserId) //测试环境的话,发送邮箱给嘉豪
-		}
-	}
+	//测试环境也不发了
+	//if utils.RunMode == "debug" {
+	//	tmpWxUser, tmpErr := models.GetWxUserByMobile("17634786714")
+	//	if tmpErr == nil && tmpWxUser != nil {
+	//		//debugNoticeUserId = 44078 //测试环境的话,发送邮箱给颜鹏
+	//		debugNoticeUserId = int(tmpWxUser.UserId) //测试环境的话,发送邮箱给嘉豪
+	//	}
+	//}
+
 	//task.globalTaskManager.adminTaskList
 	for _, edb := range list {
 		if edb.UserId <= 0 {
@@ -357,10 +361,10 @@ func AddEdbTask(cont context.Context) (err error) {
 		noticeTime := "12:00:00" //提醒时间
 
 		var dataDtTime time.Time
-		edbData, tmpErr := models.GetLastEdbdataInfo(edb.TradeCode)
+		edbData, tmpErr := models.GetLastEdbdataInfo(tmpEdb.TradeCode)
 		if tmpErr != nil {
 			if tmpErr.Error() != utils.ErrNoRow() {
-				failList = append(failList, fmt.Sprint(edb.TradeCode, "失败,Err:", tmpErr.Error()))
+				failList = append(failList, fmt.Sprint(tmpEdb.TradeCode, "失败,Err:", tmpErr.Error()))
 				continue
 			}
 		}
@@ -371,12 +375,12 @@ func AddEdbTask(cont context.Context) (err error) {
 			dataDtTime = tmpDataDtTime
 		}
 
-		switch edb.Frequency {
+		switch tmpEdb.Frequency {
 		case "周度":
 			modifyDate := nowWeekLastDay //下次更新日期
-			if edb.NoticeTime != "" {
+			if tmpEdb.NoticeTime != "" {
 				addDay := 7
-				noticeArr := strings.Split(edb.NoticeTime, " ")
+				noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
 				if len(noticeArr) >= 2 {
 					noticeTime = noticeArr[1]
 				}
@@ -407,8 +411,8 @@ func AddEdbTask(cont context.Context) (err error) {
 		case "月度":
 			addDay := 0
 			modifyDate := nowMonthLastDay //下次更新日期
-			if edb.NoticeTime != "" {
-				strArr := strings.Split(edb.NoticeTime, "日")
+			if tmpEdb.NoticeTime != "" {
+				strArr := strings.Split(tmpEdb.NoticeTime, "日")
 				if len(strArr) >= 2 {
 					noticeTime = strArr[1]
 				}
@@ -426,8 +430,8 @@ func AddEdbTask(cont context.Context) (err error) {
 			}
 		case "季度":
 			//提醒时间
-			if edb.NoticeTime != "" {
-				noticeArr := strings.Split(edb.NoticeTime, " ")
+			if tmpEdb.NoticeTime != "" {
+				noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
 				if len(noticeArr) >= 2 {
 					noticeTime = noticeArr[1]
 				}
@@ -439,8 +443,8 @@ func AddEdbTask(cont context.Context) (err error) {
 			}
 		case "半年度":
 			//提醒时间
-			if edb.NoticeTime != "" {
-				noticeArr := strings.Split(edb.NoticeTime, " ")
+			if tmpEdb.NoticeTime != "" {
+				noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
 				if len(noticeArr) >= 2 {
 					noticeTime = noticeArr[1]
 				}
@@ -452,8 +456,8 @@ func AddEdbTask(cont context.Context) (err error) {
 			}
 		case "年度":
 			//提醒时间
-			if edb.NoticeTime != "" {
-				noticeArr := strings.Split(edb.NoticeTime, " ")
+			if tmpEdb.NoticeTime != "" {
+				noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
 				if len(noticeArr) >= 2 {
 					noticeTime = noticeArr[1]
 				}
@@ -466,8 +470,8 @@ func AddEdbTask(cont context.Context) (err error) {
 		}
 
 		if isNotice {
-			taskName := "edb_task_" + todayStr + ":" + fmt.Sprint(edb.TradeCode)
-			//fmt.Println(taskName, ";", edb.SecName)
+			taskName := "edb_task_" + todayStr + ":" + fmt.Sprint(tmpEdb.TradeCode)
+			//fmt.Println(taskName, ";", tmpEdb.SecName)
 
 			//定时任务
 			tmpTaskFunc := func(ctx context.Context) (funcErr error) {
@@ -497,8 +501,12 @@ func AddEdbTask(cont context.Context) (err error) {
 					funcDataDtTime = tmpDataDtTime
 				}
 
+				//提示频度文案
+				notifyFrequency := "每日"
+
 				switch tmpEdb.Frequency {
 				case "周度":
+					notifyFrequency = "每周"
 					modifyDate := nowWeekLastDay //下次更新日期
 					if tmpEdb.NoticeTime != "" {
 						addDay := 7
@@ -531,6 +539,7 @@ func AddEdbTask(cont context.Context) (err error) {
 						funcIsNotice = true
 					}
 				case "月度":
+					notifyFrequency = "每月"
 					addDay := 0
 					modifyDate := nowMonthLastDay //下次更新日期
 					if tmpEdb.NoticeTime != "" {
@@ -551,6 +560,7 @@ func AddEdbTask(cont context.Context) (err error) {
 						funcIsNotice = true
 					}
 				case "季度":
+					notifyFrequency = "每季度"
 					//提醒时间
 					if tmpEdb.NoticeTime != "" {
 						noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
@@ -564,6 +574,7 @@ func AddEdbTask(cont context.Context) (err error) {
 						funcIsNotice = true
 					}
 				case "半年度":
+					notifyFrequency = "每半年度"
 					//提醒时间
 					if tmpEdb.NoticeTime != "" {
 						noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
@@ -577,6 +588,7 @@ func AddEdbTask(cont context.Context) (err error) {
 						funcIsNotice = true
 					}
 				case "年度":
+					notifyFrequency = "每年"
 					//提醒时间
 					if tmpEdb.NoticeTime != "" {
 						noticeArr := strings.Split(tmpEdb.NoticeTime, " ")
@@ -643,7 +655,7 @@ func AddEdbTask(cont context.Context) (err error) {
 
 					first := "数据录入提醒"
 					keyword1 := tmpEdb.SecName
-					keyword2 := "每周 " + edb.NoticeTime
+					keyword2 := notifyFrequency + " " + tmpEdb.NoticeTime
 					remark := tmpEdb.SecName + "该更新了"
 
 					err = SendWxMsgWithFrequency(first, keyword1, keyword2, remark, openIdList)

+ 2 - 0
utils/constants.go

@@ -87,6 +87,7 @@ const (
 	DATA_SOURCE_CALCULATE_TIME_SHIFT            //时间移位->22
 	DATA_SOURCE_CALCULATE_ZJPJ                  //直接拼接->23
 	DATA_SOURCE_CALCULATE_LJZTBPJ               //累计值同比拼接->24
+	DATA_SOURCE_LT                              //路透->25
 )
 
 //http://datawind.hzinsights.com:8040/hz_server
@@ -94,6 +95,7 @@ const (
 const (
 	Hz_Data_Url    = "http://datawind.hzinsights.com:8040/" //同花顺,万得接口服务地址
 	Hz_Data_PB_Url = "http://datapb.hzinsights.com:8040/"   //彭博接口地址
+	Hz_Data_LT_Url = "http://dataek.hzinsights.com:8040/"   //路透社接口地址
 )
 
 //数据刷新频率