zwxi 1 year ago
parent
commit
3cb755cd79
4 changed files with 400 additions and 100 deletions
  1. 78 14
      controllers/base_from_smm.go
  2. 154 1
      models/base_from_smm.go
  3. 1 1
      services/base_from_smm.go
  4. 167 84
      services/base_from_smm_http.go

+ 78 - 14
controllers/base_from_smm.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_index_lib/services"
 	"eta/eta_index_lib/utils"
 	"fmt"
+	"math"
 	"strconv"
 	"time"
 )
@@ -120,12 +121,23 @@ func (this *SmmController) Refresh() {
 	if req.EdbInfoId <= 0 {
 		req.EdbInfoId = edbInfo.EdbInfoId
 	}
-	err = models.RefreshEdbDataFromSmm(req.EdbInfoId, req.EdbCode, req.StartDate)
-	if err != nil && err.Error() != utils.ErrNoRow() {
-		br.Msg = "刷新指标信息失败!"
-		br.ErrMsg = "刷新指标信息失败 RefreshEdbDataFromSmm,Err:" + err.Error()
-		return
+	if utils.BusinessCode == "E2023110300" {
+		// 中基宁波走API更新
+		err = services.GetSmmIndexLatestFromBridge(req.EdbInfoId, req.EdbCode, req.StartDate)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			br.Msg = "刷新指标信息失败!"
+			br.ErrMsg = "刷新指标信息失败 RefreshEdbDataFromSmm,Err:" + err.Error()
+			return
+		}
+	} else {
+		err = models.RefreshEdbDataFromSmm(req.EdbInfoId, req.EdbCode, req.StartDate)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			br.Msg = "刷新指标信息失败!"
+			br.ErrMsg = "刷新指标信息失败 RefreshEdbDataFromSmm,Err:" + err.Error()
+			return
+		}
 	}
+
 	// 更新指标最大最小值
 	err, errMsg := models.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
 	if err != nil {
@@ -275,24 +287,76 @@ func (this *SmmController) List() {
 		this.ServeJSON()
 	}()
 
+	list, err := models.GetBaseFromSmmIndex()
+	if err!= nil {
+		br.Msg = "获取指标失败"
+		br.ErrMsg = "获取指标失败, Err: " + err.Error()
+		return
+	}
+
+	indexCodeMap := make(map[string]*models.BaseFromSmmIndex)
+	for _, item := range list {
+		indexCodeMap[item.IndexCode] = item
+	}
+
 	// 从桥接服务获取指标和数据
 	var params models.BridgeZhongJiIndexListParams
 	params.Page = 1
 	params.PageSize = 500
-
-	indexList, e := services.GetSmmNewIndexListFromBridge(params)
-	if e != nil {
+	total, err := services.GetSmmNewIndexListTotalFromBridge(params)
+	if err!= nil {
 		br.Msg = "获取指标失败"
-		br.ErrMsg = "获取指标数据失败, Err: " + e.Error()
+		br.ErrMsg = "获取指标数据失败, Err: " + err.Error()
 		return
 	}
-
-	_,err := models.AddBaseFromSmmIndex(indexList)
-	if err!= nil {
-		br.Msg = "新增指标失败"
-		br.ErrMsg = "新增指标失败,Err:" + err.Error()
+	if total <= 0 {
+		br.Msg = "没有指标数据"
+		br.ErrMsg = "没有指标数据"
 		return
 	}
+	var addList []*models.BaseFromSmmIndex
+	for i := 1; i <= int(math.Ceil(float64(total)/float64(params.PageSize))); i++ {
+		params.Page = i
+		indexList, err := services.GetSmmNewIndexListFromBridge(params)
+		if err!= nil {
+			br.Msg = "获取指标失败"
+			br.ErrMsg = "获取指标数据失败, Err: " + err.Error()
+		}
+		for _, item := range indexList {
+			if _, ok := indexCodeMap[item.IndexCode]; !ok {
+				addList = append(addList, item)
+				indexCodeMap[item.IndexCode] = item
+			} else {
+				if indexCodeMap[item.IndexCode].DataState != item.DataState || indexCodeMap[item.IndexCode].EndDate != item.EndDate {
+					err = models.ModifyBaseFromSmmIndex(item)
+					if err != nil {
+						br.Msg = "更新指标失败"
+						br.ErrMsg = "更新指标失败,Err:" + err.Error()
+						return
+					}
+				}
+			}
+		}
+
+		if len(addList) > 900 {
+			_,err = models.AddBaseFromSmmIndex(addList)
+			if err!= nil {
+				br.Msg = "新增指标失败"
+				br.ErrMsg = "新增指标失败,Err:" + err.Error()
+				return
+			}
+		}
+	}
+
+	if len(addList) > 0 {
+		_,err = models.AddBaseFromSmmIndex(addList)
+		if err!= nil {
+			br.Msg = "新增指标失败"
+			br.ErrMsg = "新增指标失败,Err:" + err.Error()
+			return
+		}
+	}
+
 
 	br.Ret = 200
 	br.Success = true

+ 154 - 1
models/base_from_smm.go

@@ -405,4 +405,157 @@ func AddBaseFromSmmIndex(list []*BaseFromSmmIndex) (lastId int64, err error) {
 	o := orm.NewOrm()
 	_, err = o.InsertMulti(len(list), list)
 	return
-}
+}
+
+// BridgeZhongJiIndexLatestDataParams 桥接服务-获取中基更新指标数据入参
+type BridgeZhongJiIndexLatestDataParams struct {
+	EdbCode  string `json:"edb_code" form:"edb_code" description:"指标编码"`
+	LastTime int64 `json:"last_time" form:"last_time" description:"上次更新时间戳(秒)"`
+}
+
+func GetBaseFromSmmIndex() (list []*BaseFromSmmIndex, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT * FROM base_from_smm_index`
+	_, err = o.Raw(sql).QueryRows(&list)
+	return
+}
+
+func ModifyBaseFromSmmIndex(item *BaseFromSmmIndex) (err error) {
+	o := orm.NewOrm()
+	sql := ` UPDATE base_from_smm_index SET end_date = ?, modify_time=NOW(), data_state= ? `
+	_, err = o.Raw(sql, item.EndDate, item.DataState).Exec()
+	return
+}
+
+type SmmLatestDataResponse struct {
+	Code int           `json:"code"`
+	Msg  string        `json:"msg"`
+	Data SmmLatestResp `json:"data"`
+}
+
+type SmmLatestResp struct {
+	Code int           `json:"code"`
+	Msg  string        `json:"msg"`
+	Data SmmLatestData `json:"data"`
+}
+
+type SmmLatestData struct {
+	DataLen  int     `json:"data_len"`
+	DataList []Datum `json:"data_list"`
+}
+
+type Datum struct {
+	QuotaID string `json:"quota_id"`
+	Date    string `json:"date"`
+	Value   string `json:"value"`
+	Mark    string `json:"mark"`
+	Create  string `json:"create"`
+	Update  string `json:"update"`
+}
+
+// RefreshEdbDataFromSmmToEdb 刷新有色指标数据
+func RefreshEdbDataFromSmmToEdb(edbInfoId int, edbCode, startDate string, smmDataList []*BaseFromSmmData) (err error) {
+	source := utils.DATA_SOURCE_YS
+	subSource := utils.DATA_SUB_SOURCE_EDB
+
+	o := orm.NewOrm()
+	if err != nil {
+		return
+	}
+	edbInfoIdStr := strconv.Itoa(edbInfoId)
+	//计算数据
+	var condition string
+	var pars []interface{}
+
+	if edbCode != "" {
+		condition += " AND index_code=? "
+		pars = append(pars, edbCode)
+	}
+
+	if startDate != "" {
+		condition += " AND data_time>=? "
+		pars = append(pars, startDate)
+	}
+
+	// 真实数据的最大日期  , 插入规则配置的日期
+	var realDataMaxDate, edbDataInsertConfigDate time.Time
+	var edbDataInsertConfig *EdbDataInsertConfig
+	var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
+	{
+		edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			return
+		}
+		if edbDataInsertConfig != nil {
+			edbDataInsertConfigDate = edbDataInsertConfig.Date
+		}
+	}
+
+	var existCondition string
+	var existPars []interface{}
+
+	existCondition += " AND edb_info_id=? "
+	existPars = append(existPars, edbInfoId)
+	if startDate != "" {
+		existCondition += " AND data_time>=? "
+		existPars = append(existPars, startDate)
+	}
+
+	existList, err := GetEdbDataByCondition(source, subSource, existCondition, existPars)
+	if err != nil {
+		return err
+	}
+	existMap := make(map[string]*EdbInfoSearchData)
+	for _, v := range existList {
+		existMap[v.DataTime] = v
+	}
+	addSql := ` INSERT INTO edb_data_ys(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
+	var isAdd bool
+	for _, v := range smmDataList {
+		item := v
+		eDate := item.DataTime
+		dataTime, err := time.ParseInLocation(utils.FormatDate, eDate, time.Local)
+		if err != nil {
+			return err
+		}
+		if findItem, ok := existMap[v.DataTime]; !ok {
+			sValue := item.Value
+
+			timestamp := dataTime.UnixNano() / 1e6
+			timeStr := fmt.Sprintf("%d", timestamp)
+
+			addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, sValue)
+			isAdd = true
+		} else {
+			if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != item.Value {
+				err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, item.Value)
+				if err != nil {
+					return err
+				}
+			}
+		}
+
+		// 下面代码主要目的是处理掉手动插入的数据判断
+		{
+			if realDataMaxDate.IsZero() || dataTime.After(realDataMaxDate) {
+				realDataMaxDate = dataTime
+			}
+			if edbDataInsertConfigDate.IsZero() || dataTime.Equal(edbDataInsertConfigDate) {
+				isFindConfigDateRealData = true
+			}
+		}
+	}
+
+	// 处理手工数据补充的配置
+	HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
+
+	if isAdd {
+		addSql = strings.TrimRight(addSql, ",")
+		_, err = o.Raw(addSql).Exec()
+		if err != nil {
+			fmt.Println("RefreshEdbDataFromSmm add Err", err.Error())
+			return
+		}
+	}
+	return
+}

+ 1 - 1
services/base_from_smm.go

@@ -220,7 +220,7 @@ func SmmIndexHandle(baseFilePath, renameFilePath, indexName, indexCode, unit, fr
 }
 
 func GetEdbDataFromSmm(edbCode string) (smmBaseDataAll []models.BaseFromSmmDataList, err error) {
-	if utils.BusinessCode == "E2023110300" || utils.BusinessCode == "E2023080900" {
+	if utils.BusinessCode == "E2023110300" {
 		return GetSmmNewIndexFromBridge(edbCode)
 	}
 

+ 167 - 84
services/base_from_smm_http.go

@@ -13,8 +13,9 @@ import (
 )
 
 var (
-	BridgeApiZhongJiIndexDataUrl = "/api/index_data/zhongji/smm/data" // 获取指标数据API
-	BridgeApiZhongJiIndexListUrl  = "/api/index_data/zhongji/smm/list"  // 获取指标列表API
+	BridgeApiZhongJiIndexDataUrl   = "/api/index_data/zhongji/smm/data"   // 获取指标数据API
+	BridgeApiZhongJiIndexListUrl   = "/api/index_data/zhongji/smm/list"   // 获取指标列表API
+	BridgeApiZhongJiIndexLatestUrl = "/api/index_data/zhongji/smm/latest" // 获取指标最新数据API
 )
 
 // Data is a struct that represents the data field in the JSON string
@@ -59,86 +60,6 @@ type SmmTokenData struct {
 	Token string `json:"token"`
 }
 
-// getSmmAccessToken token内部请求接口
-//func getSmmAccessToken() (tokenData SmmTokenData, err error) {
-//	defer func() {
-//		if err != nil {
-//			go alarm_msg.SendAlarmMsg("更新SMM的token失败;ERR:"+err.Error(), 3)
-//		}
-//	}()
-//
-//	url := "https://datapro-api.smm.cn/dapi/user/auth"
-//
-//	// 准备Form-data参数
-//	values := map[string]io.Reader{
-//		"user_name": strings.NewReader("18815284804"),
-//		"password":  strings.NewReader(utils.MD5("huangcf@cbnb.com.cn")),
-//		"source":    strings.NewReader("datapro"),
-//	}
-//
-//	// 创建一个新的buffer,用于存储请求体
-//	var body bytes.Buffer
-//	writer := multipart.NewWriter(&body)
-//
-//	// 添加Form-data参数
-//	for key, value := range values {
-//		part, err := writer.CreateFormFile(key, key)
-//		if err != nil {
-//			fmt.Println("Error creating form file:", err)
-//			return
-//		}
-//		_, err = io.Copy(part, value)
-//		if err != nil {
-//			fmt.Println("Error copying file content:", err)
-//			return
-//		}
-//	}
-//
-//	// 关闭multipart writer
-//	writer.Close()
-//
-//	// 发送POST请求
-//	req, err := netHttp.NewRequest("POST", url, &body)
-//	if err != nil {
-//		fmt.Println("Error creating request:", err)
-//		return
-//	}
-//
-//	req.Header.Set("Content-Type", writer.FormDataContentType())
-//
-//	client := &netHttp.Client{}
-//	resp, err := client.Do(req)
-//	if err != nil {
-//		fmt.Println("Error sending request:", err)
-//		return
-//	}
-//	defer resp.Body.Close()
-//
-//	// 处理响应
-//	fmt.Println("Response Status:", resp.Status)
-//
-//	bodyContent, err := ioutil.ReadAll(resp.Body)
-//	if err != nil {
-//		fmt.Println("Error reading response body:", err)
-//		return
-//	}
-//	utils.FileLog.Info("SMM刷新token:" + string(bodyContent))
-//
-//	var tokenResp GetSmmTokenResp
-//	err = json.Unmarshal(bodyContent, &tokenResp)
-//	if err != nil {
-//		err = errors.New("Unmarshal Err:" + err.Error())
-//		return
-//	}
-//	if tokenResp.Code != 0 {
-//		err = errors.New("getAccessToken err:" + tokenResp.Msg)
-//		return
-//	}
-//	tokenData = tokenResp.Data
-//
-//	return
-//}
-
 // GetSmmNewIndexFromBridge 从桥接服务获取指标
 func GetSmmNewIndexFromBridge(edbCode string) (indexDataList []models.BaseFromSmmDataList, err error) {
 	defer func() {
@@ -316,7 +237,7 @@ func GetSmmNewIndexListFromBridge(params models.BridgeZhongJiIndexListParams) (i
 		for i, typeItem := range typeList {
 			if i == 0 {
 				item.Type1 = typeItem
-			} else if i == 1{
+			} else if i == 1 {
 				item.Type2 = typeItem
 			} else if i == 2 {
 				item.Type3 = typeItem
@@ -326,4 +247,166 @@ func GetSmmNewIndexListFromBridge(params models.BridgeZhongJiIndexListParams) (i
 	}
 
 	return
-}
+}
+
+// GetSmmNewIndexListTotalFromBridge 从桥接服务获取指标列表总数
+func GetSmmNewIndexListTotalFromBridge(params models.BridgeZhongJiIndexListParams) (total int, err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("桥接服务-获取SMM指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiZhongJiIndexListUrl)
+
+	data, e := json.Marshal(params)
+	if e != nil {
+		err = fmt.Errorf("data json marshal err: %s", e.Error())
+		return
+	}
+	body := ioutil.NopCloser(strings.NewReader(string(data)))
+	client := &netHttp.Client{}
+	req, e := netHttp.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.SmmListResp)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+
+	total = result.Data.Data.Total
+	return
+}
+
+// GetSmmIndexLatestFromBridge 从桥接服务获取指标更新
+func GetSmmIndexLatestFromBridge(edbInfoId int, edbCode, startDate string) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("桥接服务-获取SMM指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiZhongJiIndexLatestUrl)
+
+	startDateTime, err := time.Parse(utils.FormatDate, startDate)
+	if err != nil {
+		err = fmt.Errorf("startDateTime parse err: %s", err.Error())
+		return
+	}
+	timeStamp := startDateTime.UnixNano() / 1e6
+
+	param := models.BridgeZhongJiIndexLatestDataParams{
+		EdbCode:  edbCode,
+		LastTime: timeStamp,
+	}
+	data, e := json.Marshal(param)
+	if e != nil {
+		err = fmt.Errorf("data json marshal err: %s", e.Error())
+		return
+	}
+	body := ioutil.NopCloser(strings.NewReader(string(data)))
+	client := &netHttp.Client{}
+	req, e := netHttp.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.SmmLatestDataResponse)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+	smmDataList := make([]*models.BaseFromSmmData, 0)
+
+	for _, v := range result.Data.Data.DataList {
+		item := new(models.BaseFromSmmData)
+		item.BaseFromSmmIndexId = edbInfoId
+		item.IndexCode = v.QuotaID
+		item.DataTime = v.Date
+		item.Value = v.Value
+		item.CreateTime = time.Now()
+		item.ModifyTime = time.Now()
+
+		timeDate, _ := time.Parse(utils.FormatDateTime, v.Date)
+		timestamp := timeDate.UnixNano() / 1e6
+		item.DataTimestamp = timestamp
+
+		smmDataList = append(smmDataList, item)
+	}
+
+	err = models.RefreshEdbDataFromSmmToEdb(edbInfoId, edbCode, startDate, smmDataList)
+	return
+}