|
@@ -1,11 +1,16 @@
|
|
|
package services
|
|
|
|
|
|
import (
|
|
|
+ "encoding/json"
|
|
|
+ "errors"
|
|
|
"eta/eta_index_lib/logic"
|
|
|
"eta/eta_index_lib/models"
|
|
|
"eta/eta_index_lib/services/alarm_msg"
|
|
|
"eta/eta_index_lib/utils"
|
|
|
"fmt"
|
|
|
+ "io"
|
|
|
+ "net/http"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
@@ -27,6 +32,204 @@ func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) {
|
|
|
+ addIndexCodeList := make([]string, 0)
|
|
|
+ for _, v := range req.List {
|
|
|
+ if v.IndexCode == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ addIndexCodeList = append(addIndexCodeList, v.IndexCode)
|
|
|
+ }
|
|
|
+ errMsg, err = HandleApiIndex(addIndexCodeList)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ _ = SetMysteelChemicalEdbInfoUpdateStat(false)
|
|
|
+ _ = SetEdbSourceStat(false)
|
|
|
+
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func HandleApiIndex(indexCodes []string) (errMsg string, err error) {
|
|
|
+ if len(indexCodes) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc")
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !resp.Success {
|
|
|
+ errMsg = "获取数据失败"
|
|
|
+ err = errors.New(resp.Message)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexInfoMap, err := GetMySteelChemicalIndexNameMap(indexCodes)
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "获取指标数据失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ indexObj := &models.BaseFromMysteelChemicalIndex{}
|
|
|
+ existIndexs, err := indexObj.GetBatchIndexItem(indexCodes)
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "获取指标数据失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ //获取已存在的所有数据
|
|
|
+ existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
|
|
|
+ existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex)
|
|
|
+ updateDataObj := new(models.BaseFromMysteelChemicalData)
|
|
|
+ for _, v := range existIndexs {
|
|
|
+ // 更新指标的名称,单位和频度等信息
|
|
|
+ if info, ok := indexInfoMap[v.IndexCode]; ok {
|
|
|
+ v.IndexName = info.IndexName
|
|
|
+ v.Unit = info.UnitName
|
|
|
+ v.Frequency = info.FrequencyName
|
|
|
+ v.ModifyTime = time.Now()
|
|
|
+ err = v.Update([]string{"index_name", "unit", "frequency", "modify_time"})
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "更新指标失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "添加指标失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+ existIndexMap[v.IndexCode] = v
|
|
|
+ exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode)
|
|
|
+ if er != nil {
|
|
|
+ errMsg = "获取指标数据失败"
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ fmt.Println("exitDataListLen:", len(exitDataList))
|
|
|
+ for _, v := range exitDataList {
|
|
|
+ dateStr := v.DataTime.Format(utils.FormatDate)
|
|
|
+ existDataMap[dateStr] = v
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mysteelChemicalDatas, err := tranformData(resp)
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "转换数据失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var indexErr error
|
|
|
+ var lErr error
|
|
|
+ defer func() {
|
|
|
+ if indexErr != nil {
|
|
|
+ tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
|
|
|
+ alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
+ }
|
|
|
+
|
|
|
+ if lErr != nil {
|
|
|
+ tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
|
|
|
+ alarm_msg.SendAlarmMsg(tips, 3)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ var hasUpdate bool
|
|
|
+ dataObj := new(models.BaseFromMysteelChemicalData)
|
|
|
+ for _, items := range mysteelChemicalDatas {
|
|
|
+ addItems := make([]*models.BaseFromMysteelChemicalData, 0)
|
|
|
+ for _, v := range items {
|
|
|
+ dateStr := v.DataTime.Format(utils.FormatDate)
|
|
|
+ if findData, ok := existDataMap[dateStr]; !ok {
|
|
|
+ index, ok := existIndexMap[v.IndexCode]
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId
|
|
|
+ addItems = append(addItems, v)
|
|
|
+ } else {
|
|
|
+ if findData != nil && findData.Value != v.Value {
|
|
|
+ dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
|
|
|
+ dataObj.Value = v.Value
|
|
|
+ dataObj.ModifyTime = time.Now()
|
|
|
+
|
|
|
+ err = dataObj.Update([]string{"value", "modify_time"})
|
|
|
+ if err != nil {
|
|
|
+ errMsg = "更新数据失败"
|
|
|
+ return
|
|
|
+ }
|
|
|
+ hasUpdate = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ err = dataObj.AddV2(addItems)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //修改最大最小日期
|
|
|
+ if len(items) <= 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode)
|
|
|
+ if er == nil && mysteelIndexMaxItem != nil {
|
|
|
+ e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem)
|
|
|
+ if e != nil {
|
|
|
+ fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
|
|
|
+ utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error())
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
|
|
|
+ if e != nil && e.Error() != utils.ErrNoRow() {
|
|
|
+ indexErr = e
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if edbInfo != nil {
|
|
|
+ dataUpdateResult := 2
|
|
|
+ dataUpdateFailedReason := "服务异常"
|
|
|
+ _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
|
|
|
+ if logErr != nil {
|
|
|
+ lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if hasUpdate {
|
|
|
+ dataUpdateResult = 1
|
|
|
+ dataUpdateFailedReason = ""
|
|
|
+ } else {
|
|
|
+ dataUpdateFailedReason = "未刷新到数据"
|
|
|
+ }
|
|
|
+
|
|
|
+ // 添加刷新成功日志
|
|
|
+ lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0)
|
|
|
+ if lErr != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) {
|
|
|
+ for _, v := range dataResp.Data {
|
|
|
+ tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0)
|
|
|
+ for _, vv := range v.DataList {
|
|
|
+ tmpData := new(models.BaseFromMysteelChemicalData)
|
|
|
+ tmpData.IndexCode = vv.IndexCode
|
|
|
+ dataDate, er := time.Parse(utils.FormatDate, vv.DataDate)
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ tmpData.DataTime = dataDate
|
|
|
+ tmpData.Value = strconv.Itoa(vv.DataValue)
|
|
|
+ tmpData.CreateTime = time.Now()
|
|
|
+ tmpData.ModifyTime = time.Now()
|
|
|
+ tmpDataItems = append(tmpDataItems, tmpData)
|
|
|
+ }
|
|
|
+ items = append(items, tmpDataItems)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
|
|
|
defer func() {
|
|
|
if err != nil {
|
|
@@ -286,3 +489,105 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
|
|
|
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
+type MySteelChemicalApiDataBody struct {
|
|
|
+ IndexCodes []string `json:"indexCodes"`
|
|
|
+ StartTime string `json:"startTime"`
|
|
|
+ EndTime string `json:"endTime"`
|
|
|
+ Order string `json:"order"`
|
|
|
+}
|
|
|
+
|
|
|
+type MySteelChemicalApiInfoBody struct {
|
|
|
+ PageNum int `json:"pageNum"`
|
|
|
+ PageSize int `json:"pageSize"`
|
|
|
+ IncludeInfo bool `json:"includeInfo"`
|
|
|
+}
|
|
|
+
|
|
|
+func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) {
|
|
|
+ // 如果没有配置,获取配置的方式是api,那么就走官方接口
|
|
|
+ if utils.MsClRefreshToken == "" {
|
|
|
+ err = errors.New("钢联接口token为配置")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ m := new(MySteelChemicalApiDataBody)
|
|
|
+ m.IndexCodes = indexCodes
|
|
|
+ m.StartTime = startTime
|
|
|
+ m.EndTime = endTime
|
|
|
+ m.Order = order
|
|
|
+ postData, er := json.Marshal(m)
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
|
|
|
+ body, err := MySteelChemicalPost(postUrl, "data", postData)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err = json.Unmarshal(body, &item)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func GetMySteelChemicalIndexNameMap(indexCodes []string) (indexNameMap map[string]*models.MySteelChemicalApiInfoItem, err error) {
|
|
|
+ // 如果没有配置,获取配置的方式是api,那么就走官方接口
|
|
|
+ if utils.MsClRefreshToken == "" {
|
|
|
+ err = errors.New("钢联接口token为配置")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ m := new(MySteelChemicalApiInfoBody)
|
|
|
+ m.PageNum = 1
|
|
|
+ m.PageSize = 100 // 看官方api最多也就十几条指标,先固定设置100应该足够了
|
|
|
+ m.IncludeInfo = true
|
|
|
+ postData, er := json.Marshal(m)
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
|
|
|
+ body, err := MySteelChemicalPost(postUrl, "info", postData)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var item *models.MySteelChemicalApiInfoResp
|
|
|
+ err = json.Unmarshal(body, &item)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !item.Success {
|
|
|
+ err = errors.New(item.Message)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexNameMap = make(map[string]*models.MySteelChemicalApiInfoItem)
|
|
|
+ for _, v := range item.Data.List {
|
|
|
+ indexNameMap[v.IndexCode] = v
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, err error) {
|
|
|
+ req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData)))
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ req.Header.Set(`Content-Type`, `application/json`)
|
|
|
+ req.Header.Set(`accessTokenSign`, utils.MsClRefreshToken)
|
|
|
+ req.Header.Set(`infoOrData`, hType)
|
|
|
+
|
|
|
+ client := &http.Client{}
|
|
|
+ resp, er := client.Do(req)
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer resp.Body.Close()
|
|
|
+ body, err = io.ReadAll(resp.Body)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|