package services import ( "encoding/json" "errors" "eta/eta_index_lib/logic" "eta/eta_index_lib/models" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "io" "net/http" "strconv" "strings" "time" ) func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) { for _, v := range req.List { if v.IndexName == "" || v.IndexCode == "" { continue } err = handleIndex(v) if err != nil { return } } _ = SetMysteelChemicalEdbInfoUpdateStat(false) _ = SetEdbSourceStat(false) return } func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) { addIndexCodeList := make([]string, 0) for _, v := range req.List { if v.IndexCode == "" { continue } addIndexCodeList = append(addIndexCodeList, v.IndexCode) } errMsg, err = HandleApiIndex(addIndexCodeList) if err != nil { return } _ = SetMysteelChemicalEdbInfoUpdateStat(false) _ = SetEdbSourceStat(false) return } func HandleApiIndex(indexCodes []string) (errMsg string, err error) { if len(indexCodes) == 0 { return } resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc") if err != nil { return } if !resp.Success { errMsg = "获取数据失败" err = errors.New(resp.Message) return } indexInfoMap, err := GetMySteelChemicalIndexNameMap(indexCodes) if err != nil { errMsg = "获取指标数据失败" return } indexObj := &models.BaseFromMysteelChemicalIndex{} existIndexs, err := indexObj.GetBatchIndexItem(indexCodes) if err != nil { errMsg = "获取指标数据失败" return } //获取已存在的所有数据 existDataMap := make(map[string]*models.BaseFromMysteelChemicalData) existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex) updateDataObj := new(models.BaseFromMysteelChemicalData) for _, v := range existIndexs { // 更新指标的名称,单位和频度等信息 if info, ok := indexInfoMap[v.IndexCode]; ok { v.IndexName = info.IndexName v.Unit = info.UnitName v.Frequency = info.FrequencyName v.ModifyTime = time.Now() err = v.Update([]string{"index_name", "unit", "frequency", "modify_time"}) if err != nil { errMsg = "更新指标失败" return } } if err != nil { errMsg = "添加指标失败" return } existIndexMap[v.IndexCode] = v exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode) if er != nil { errMsg = "获取指标数据失败" err = er return } fmt.Println("exitDataListLen:", len(exitDataList)) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) existDataMap[dateStr] = v } } mysteelChemicalDatas, err := tranformData(resp) if err != nil { errMsg = "转换数据失败" return } var indexErr error var lErr error defer func() { if indexErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } if lErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } }() var hasUpdate bool dataObj := new(models.BaseFromMysteelChemicalData) for _, items := range mysteelChemicalDatas { addItems := make([]*models.BaseFromMysteelChemicalData, 0) for _, v := range items { dateStr := v.DataTime.Format(utils.FormatDate) if findData, ok := existDataMap[dateStr]; !ok { index, ok := existIndexMap[v.IndexCode] if !ok { continue } v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId addItems = append(addItems, v) } else { if findData != nil && findData.Value != v.Value { dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId dataObj.Value = v.Value dataObj.ModifyTime = time.Now() err = dataObj.Update([]string{"value", "modify_time"}) if err != nil { errMsg = "更新数据失败" return } hasUpdate = true } } } err = dataObj.AddV2(addItems) if err != nil { return } //修改最大最小日期 if len(items) <= 0 { continue } mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode) if er == nil && mysteelIndexMaxItem != nil { e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem) if e != nil { fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error()) utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error()) } } edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``) if logErr != nil { lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0) return } if hasUpdate { dataUpdateResult = 1 dataUpdateFailedReason = "" } else { dataUpdateFailedReason = "未刷新到数据" } // 添加刷新成功日志 lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0) if lErr != nil { return } } return } return } func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) { for _, v := range dataResp.Data { tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0) for _, vv := range v.DataList { tmpData := new(models.BaseFromMysteelChemicalData) tmpData.IndexCode = vv.IndexCode dataDate, er := time.Parse(utils.FormatDate, vv.DataDate) if er != nil { err = er return } tmpData.DataTime = dataDate tmpData.Value = strconv.Itoa(vv.DataValue) tmpData.CreateTime = time.Now() tmpData.ModifyTime = time.Now() tmpDataItems = append(tmpDataItems, tmpData) } items = append(items, tmpDataItems) } return } func handleIndex(indexItem *models.HandleMysteelIndex) (err error) { defer func() { if err != nil { // 添加刷新失败日志 dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode) if e == nil { //查询指标存在,才添加刷新日志 _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) } } }() indexObj := new(models.BaseFromMysteelChemicalIndex) var indexId int64 addDataList := make([]models.BaseFromMysteelChemicalData, 0) exitDataMap := make(map[string]*models.BaseFromMysteelChemicalData) //判断指标是否存在 var isAdd int item, err := indexObj.GetIndexItem(indexItem.IndexCode) if err != nil { if err.Error() == utils.ErrNoRow() { isAdd = 1 } else { isAdd = -1 return } } nameChange := false if item != nil && item.BaseFromMysteelChemicalIndexId > 0 { isAdd = 2 if item.IndexName != indexItem.IndexName { nameChange = true } } else { isAdd = 1 } fmt.Println("isAdd:", isAdd) if !strings.Contains(indexItem.Frequency, "度") { indexItem.Frequency = indexItem.Frequency + "度" } if isAdd == 1 { indexObj.IndexCode = indexItem.IndexCode indexObj.IndexName = indexItem.IndexName indexObj.Unit = indexItem.Unit indexObj.Source = indexItem.Source indexObj.Describe = indexItem.Describe indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error()) return } indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error()) return } indexObj.Frequency = indexItem.Frequency //indexObj.CreateTime = time.Now().Local() //indexObj.ModifyTime = time.Now().Local() err = indexObj.Add() if err != nil { fmt.Println("add err:" + err.Error()) return } indexId = indexObj.BaseFromMysteelChemicalIndexId } else if isAdd == 2 { indexObj.BaseFromMysteelChemicalIndexId = item.BaseFromMysteelChemicalIndexId indexObj.IndexCode = indexItem.IndexCode indexObj.IndexName = indexItem.IndexName indexObj.Unit = indexItem.Unit indexObj.Source = indexItem.Source indexObj.Describe = indexItem.Describe indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error()) return } indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error()) return } indexObj.Frequency = indexItem.Frequency indexObj.ModifyTime = time.Now() indexId = item.BaseFromMysteelChemicalIndexId var isStop int if strings.Contains(indexItem.IndexName, "停") { isStop = 1 } indexObj.IsStop = isStop //修改数据 updateColsArr := make([]string, 0) updateColsArr = append(updateColsArr, "index_name") updateColsArr = append(updateColsArr, "unit") updateColsArr = append(updateColsArr, "source") updateColsArr = append(updateColsArr, "frequency") updateColsArr = append(updateColsArr, "start_date") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "describe") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "is_stop") updateColsArr = append(updateColsArr, "modify_time") e := indexObj.Update(updateColsArr) if e != nil { fmt.Println("Index Update Err:" + e.Error()) return } dataObj := new(models.BaseFromMysteelChemicalData) //获取已存在的所有数据 exitDataList, err := dataObj.GetIndexDataList(indexItem.IndexCode) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return err } fmt.Println("exitDataListLen:", len(exitDataList)) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) exitDataMap[dateStr] = v } } dataObj := new(models.BaseFromMysteelChemicalData) var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for date, value := range indexItem.ExcelDataMap { dateTime, err := utils.DealExcelDate(date) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return err } date = dateTime.Format(utils.FormatDate) if findData, ok := exitDataMap[date]; !ok { if !strings.Contains(value, "#N/A") { dataItem := new(models.BaseFromMysteelChemicalData) dataItem.BaseFromMysteelChemicalIndexId = indexId dataItem.IndexCode = indexItem.IndexCode dataItem.DataTime = dateTime dataItem.Value = value dataItem.UpdateDate = indexItem.UpdateDate dataItem.CreateTime = time.Now() dataItem.ModifyTime = time.Now() addDataList = append(addDataList, *dataItem) } } else { if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据 dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId dataObj.Value = value dataObj.ModifyTime = time.Now() updateDataColsArr := make([]string, 0) updateDataColsArr = append(updateDataColsArr, "value") updateDataColsArr = append(updateDataColsArr, "modify_time") dataObj.Update(updateDataColsArr) hasUpdate = true } } } if len(addDataList) > 0 { err = dataObj.Add(addDataList) if err != nil { fmt.Println("dataObj.Add() Err:" + err.Error()) return } hasUpdate = true } //修改最大最小日期 mysteelIndexMaxItem, err := dataObj.GetMysteelIndexInfoMaxAndMinInfo(indexItem.IndexCode) if err == nil && mysteelIndexMaxItem != nil { e := dataObj.ModifyMysteelIndexMaxAndMinInfo(indexItem.IndexCode, mysteelIndexMaxItem) if e != nil { fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error()) } } // 同步刷新图库钢联的指标 //go func() { var indexErr error var lErr error defer func() { if indexErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } if lErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } }() edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``) if logErr != nil { lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) return } if hasUpdate { dataUpdateResult = 1 dataUpdateFailedReason = "" } else { dataUpdateFailedReason = "未刷新到数据" } // 添加刷新成功日志 lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 1, 0) if lErr != nil { return } //如果变更了指标名称,则添加指标信息变更日志 if nameChange { edbInfo.SourceIndexName = indexItem.IndexName lErr = edbInfo.Update([]string{"SourceIndexName"}) if lErr != nil { return } lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 0, "", 0, 1) } } //}() return } type MySteelChemicalApiDataBody struct { IndexCodes []string `json:"indexCodes"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Order string `json:"order"` } type MySteelChemicalApiInfoBody struct { PageNum int `json:"pageNum"` PageSize int `json:"pageSize"` IncludeInfo bool `json:"includeInfo"` } func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) { // 如果没有配置,获取配置的方式是api,那么就走官方接口 if utils.MsClRefreshToken == "" { err = errors.New("钢联接口token为配置") return } m := new(MySteelChemicalApiDataBody) m.IndexCodes = indexCodes m.StartTime = startTime m.EndTime = endTime m.Order = order postData, er := json.Marshal(m) if er != nil { err = er return } postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA` body, err := MySteelChemicalPost(postUrl, "data", postData) if err != nil { return } err = json.Unmarshal(body, &item) if err != nil { return } return } func GetMySteelChemicalIndexNameMap(indexCodes []string) (indexNameMap map[string]*models.MySteelChemicalApiInfoItem, err error) { // 如果没有配置,获取配置的方式是api,那么就走官方接口 if utils.MsClRefreshToken == "" { err = errors.New("钢联接口token为配置") return } m := new(MySteelChemicalApiInfoBody) m.PageNum = 1 m.PageSize = 100 // 看官方api最多也就十几条指标,先固定设置100应该足够了 m.IncludeInfo = true postData, er := json.Marshal(m) if er != nil { err = er return } postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA` body, err := MySteelChemicalPost(postUrl, "info", postData) if err != nil { return } var item *models.MySteelChemicalApiInfoResp err = json.Unmarshal(body, &item) if err != nil { return } if !item.Success { err = errors.New(item.Message) return } indexNameMap = make(map[string]*models.MySteelChemicalApiInfoItem) for _, v := range item.Data.List { indexNameMap[v.IndexCode] = v } return } func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, err error) { req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData))) if er != nil { err = er return } req.Header.Set(`Content-Type`, `application/json`) req.Header.Set(`accessTokenSign`, utils.MsClRefreshToken) req.Header.Set(`infoOrData`, hType) client := &http.Client{} resp, er := client.Do(req) if er != nil { err = er return } defer resp.Body.Close() body, err = io.ReadAll(resp.Body) if err != nil { return } return }