package services import ( "encoding/json" "errors" "eta/eta_index_lib/logic" "eta/eta_index_lib/models" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "io" "net/http" "strconv" "strings" "time" ) func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) { for _, v := range req.List { if v.IndexName == "" || v.IndexCode == "" { continue } err = handleIndex(v) if err != nil { return } } _ = SetMysteelChemicalEdbInfoUpdateStat(false) _ = SetEdbSourceStat(false) return } func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) { addIndexCodeList := make([]string, 0) for _, v := range req.List { if v.IndexCode == "" { continue } addIndexCodeList = append(addIndexCodeList, v.IndexCode) } errMsg, err = HandleApiIndex(addIndexCodeList) if err != nil { return } _ = SetMysteelChemicalEdbInfoUpdateStat(false) _ = SetEdbSourceStat(false) return } func ApiCheck() (ok bool, err error) { item, err := getPageIndexInfoMap(1, 1, false) if err != nil { if err.Error() == "406" { return false, nil } if item != nil && item.Code == "100006" { return false, nil } return } if item != nil && item.Code == "100006" { return false, nil } return true, nil } func HandleApiIndex(indexCodes []string) (errMsg string, err error) { if len(indexCodes) == 0 { return } resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc") if err != nil { return } if !resp.Success { errMsg = "获取数据失败" err = errors.New(resp.Message) return } indexInfoMap, err := GetMySteelChemicalIndexNameMap() if err != nil { errMsg = "获取指标数据失败" return } indexObj := &models.BaseFromMysteelChemicalIndex{} existIndexs, err := indexObj.GetBatchIndexItem(indexCodes) if err != nil { errMsg = "获取指标数据失败" return } //获取已存在的所有数据 existDataMap := make(map[string]*models.BaseFromMysteelChemicalData) existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex) updateDataObj := new(models.BaseFromMysteelChemicalData) for _, v := range existIndexs { // 更新指标的名称,单位和频度等信息 if info, ok := indexInfoMap[v.IndexCode]; ok { v.IndexName = info.IndexName v.Unit = info.UnitName v.Frequency = info.FrequencyName v.ModifyTime = time.Now() err = v.Update([]string{"index_name", "unit", "frequency", "modify_time"}) if err != nil { errMsg = "更新指标失败" return } } if err != nil { errMsg = "添加指标失败" return } existIndexMap[v.IndexCode] = v exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode) if er != nil { errMsg = "获取指标数据失败" err = er return } fmt.Println("exitDataListLen:", len(exitDataList)) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) existDataMap[dateStr] = v } } mysteelChemicalDatas, err := tranformData(resp) if err != nil { errMsg = "转换数据失败" return } var indexErr error var lErr error defer func() { if indexErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } if lErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } }() var hasUpdate bool dataObj := new(models.BaseFromMysteelChemicalData) for _, items := range mysteelChemicalDatas { addItems := make([]*models.BaseFromMysteelChemicalData, 0) for _, v := range items { dateStr := v.DataTime.Format(utils.FormatDate) if findData, ok := existDataMap[dateStr]; !ok { index, ok := existIndexMap[v.IndexCode] if !ok { continue } v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId addItems = append(addItems, v) } else { if findData != nil && findData.Value != v.Value { dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId dataObj.Value = v.Value dataObj.ModifyTime = time.Now() err = dataObj.Update([]string{"value", "modify_time"}) if err != nil { errMsg = "更新数据失败" return } hasUpdate = true } } } err = dataObj.AddV2(addItems) if err != nil { return } //修改最大最小日期 if len(items) <= 0 { continue } mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode) if er == nil && mysteelIndexMaxItem != nil { e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem) if e != nil { fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error()) utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error()) } } edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``) if logErr != nil { lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0) return } if hasUpdate { dataUpdateResult = 1 dataUpdateFailedReason = "" } else { dataUpdateFailedReason = "未刷新到数据" } // 添加刷新成功日志 lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0) if lErr != nil { return } } } return } func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) { for _, v := range dataResp.Data { tmpNewDataMap := make(map[string]int64) tmpDateDataMap := make(map[string]*models.BaseFromMysteelChemicalData) tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0) for _, vv := range v.DataList { tmpData := new(models.BaseFromMysteelChemicalData) tmpData.IndexCode = vv.IndexCode // 如果存在多条数据,则取发布时间最新的数据 pub, ok := tmpNewDataMap[vv.DataDate] if !ok { tmpNewDataMap[vv.DataDate] = vv.PublishTime tmpData.Value = strconv.FormatFloat(vv.DataValue, 'f', -1, 64) } else { if pub < vv.PublishTime { tmpNewDataMap[vv.DataDate] = vv.PublishTime tmpData = tmpDateDataMap[vv.DataDate] tmpData.Value = strconv.FormatFloat(vv.DataValue, 'f', -1, 64) } continue } dataDate, er := time.Parse(utils.FormatDate, vv.DataDate) if er != nil { err = er return } tmpData.DataTime = dataDate tmpData.CreateTime = time.Now() tmpData.ModifyTime = time.Now() tmpDataItems = append(tmpDataItems, tmpData) tmpDateDataMap[vv.DataDate] = tmpData } items = append(items, tmpDataItems) } return } func handleIndex(indexItem *models.HandleMysteelIndex) (err error) { defer func() { if err != nil { // 添加刷新失败日志 dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode) if e == nil { //查询指标存在,才添加刷新日志 _ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) } } }() indexObj := new(models.BaseFromMysteelChemicalIndex) var indexId int64 addDataList := make([]models.BaseFromMysteelChemicalData, 0) exitDataMap := make(map[string]*models.BaseFromMysteelChemicalData) //判断指标是否存在 var isAdd int item, err := indexObj.GetIndexItem(indexItem.IndexCode) if err != nil { if err.Error() == utils.ErrNoRow() { isAdd = 1 } else { isAdd = -1 return } } nameChange := false if item != nil && item.BaseFromMysteelChemicalIndexId > 0 { isAdd = 2 if item.IndexName != indexItem.IndexName { nameChange = true } } else { isAdd = 1 } fmt.Println("isAdd:", isAdd) if !strings.Contains(indexItem.Frequency, "度") { indexItem.Frequency = indexItem.Frequency + "度" } if isAdd == 1 { indexObj.IndexCode = indexItem.IndexCode indexObj.IndexName = indexItem.IndexName indexObj.Unit = indexItem.Unit indexObj.Source = indexItem.Source indexObj.Describe = indexItem.Describe indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error()) return } indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error()) return } indexObj.Frequency = indexItem.Frequency //indexObj.CreateTime = time.Now().Local() //indexObj.ModifyTime = time.Now().Local() err = indexObj.Add() if err != nil { fmt.Println("add err:" + err.Error()) return } indexId = indexObj.BaseFromMysteelChemicalIndexId } else if isAdd == 2 { indexObj.BaseFromMysteelChemicalIndexId = item.BaseFromMysteelChemicalIndexId indexObj.IndexCode = indexItem.IndexCode indexObj.IndexName = indexItem.IndexName indexObj.Unit = indexItem.Unit indexObj.Source = indexItem.Source indexObj.Describe = indexItem.Describe indexObj.StartDate, err = utils.DealExcelDate(indexItem.StartDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.StartDate + err.Error()) return } indexObj.EndDate, err = utils.DealExcelDate(indexItem.EndDate) if err != nil { fmt.Println("utils.DealExcelDate err:" + indexItem.EndDate + err.Error()) return } indexObj.Frequency = indexItem.Frequency indexObj.ModifyTime = time.Now() indexId = item.BaseFromMysteelChemicalIndexId indexObj.IsSupplierStop = item.IsSupplierStop var isStop int if strings.Contains(indexItem.IndexName, "停") { isStop = 1 indexObj.IsSupplierStop = 1 } indexObj.IsStop = isStop //修改数据 updateColsArr := make([]string, 0) updateColsArr = append(updateColsArr, "index_name") updateColsArr = append(updateColsArr, "unit") updateColsArr = append(updateColsArr, "source") updateColsArr = append(updateColsArr, "frequency") updateColsArr = append(updateColsArr, "start_date") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "describe") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "is_stop") updateColsArr = append(updateColsArr, "is_supplier_stop") updateColsArr = append(updateColsArr, "modify_time") e := indexObj.Update(updateColsArr) if e != nil { fmt.Println("Index Update Err:" + e.Error()) return } if item.IndexName != indexItem.IndexName { var changeRecord models.BaseFromMysteelChemicalRecord changeRecord.BaseFromMysteelChemicalIndexId = item.BaseFromMysteelChemicalIndexId changeRecord.OldIndexName = item.IndexName changeRecord.NewIndexName = indexItem.IndexName ctime := time.Now() changeRecord.CreateTime = ctime changeRecord.Timestamp = ctime.Unix() e = changeRecord.AddBaseFromMysteelChemicalRecord() if e != nil { fmt.Println("mysteel chemical changeRecord Add Err:" + e.Error()) return } } dataObj := new(models.BaseFromMysteelChemicalData) //获取已存在的所有数据 exitDataList, err := dataObj.GetIndexDataList(indexItem.IndexCode) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return err } fmt.Println("exitDataListLen:", len(exitDataList)) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) exitDataMap[dateStr] = v } } dataObj := new(models.BaseFromMysteelChemicalData) var hasUpdate bool // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for date, value := range indexItem.ExcelDataMap { dateTime, err := utils.DealExcelDate(date) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return err } date = dateTime.Format(utils.FormatDate) if findData, ok := exitDataMap[date]; !ok { if !strings.Contains(value, "#N/A") { dataItem := new(models.BaseFromMysteelChemicalData) dataItem.BaseFromMysteelChemicalIndexId = indexId dataItem.IndexCode = indexItem.IndexCode dataItem.DataTime = dateTime dataItem.Value = value dataItem.UpdateDate = indexItem.UpdateDate dataItem.CreateTime = time.Now() dataItem.ModifyTime = time.Now() addDataList = append(addDataList, *dataItem) } } else { if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据 dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId dataObj.Value = value dataObj.ModifyTime = time.Now() updateDataColsArr := make([]string, 0) updateDataColsArr = append(updateDataColsArr, "value") updateDataColsArr = append(updateDataColsArr, "modify_time") dataObj.Update(updateDataColsArr) hasUpdate = true } } } if len(addDataList) > 0 { err = dataObj.Add(addDataList) if err != nil { fmt.Println("dataObj.Add() Err:" + err.Error()) return } hasUpdate = true } //修改最大最小日期 mysteelIndexMaxItem, err := dataObj.GetMysteelIndexInfoMaxAndMinInfo(indexItem.IndexCode) if err == nil && mysteelIndexMaxItem != nil { e := dataObj.ModifyMysteelIndexMaxAndMinInfo(indexItem.IndexCode, mysteelIndexMaxItem) if e != nil { fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error()) } } // 同步刷新图库钢联的指标 //go func() { var indexErr error var lErr error defer func() { if indexErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } if lErr != nil { tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error()) alarm_msg.SendAlarmMsg(tips, 3) } }() edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode) if e != nil && e.Error() != utils.ErrNoRow() { indexErr = e return } if edbInfo != nil { dataUpdateResult := 2 dataUpdateFailedReason := "服务异常" _, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``) if logErr != nil { lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0) return } if hasUpdate { dataUpdateResult = 1 dataUpdateFailedReason = "" } else { dataUpdateFailedReason = "未刷新到数据" } // 添加刷新成功日志 lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 1, 0) if lErr != nil { return } //如果变更了指标名称,则添加指标信息变更日志 if nameChange { edbInfo.SourceIndexName = indexItem.IndexName lErr = edbInfo.Update([]string{"SourceIndexName"}) if lErr != nil { return } lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 0, "", 0, 1) } } //}() return } type MySteelChemicalApiDataBody struct { IndexCodes []string `json:"indexCodes"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Order string `json:"order"` } type MySteelChemicalApiInfoBody struct { PageNum int `json:"pageNum"` PageSize int `json:"pageSize"` IncludeInfo bool `json:"includeInfo"` } // GetEdbDataFromMySteelChemical 批量获得钢联化工的指标数据 func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) { if utils.MysteelChemicalApiToken == "" { err = errors.New("钢联接口token未配置") return } m := new(MySteelChemicalApiDataBody) m.IndexCodes = indexCodes m.StartTime = startTime m.EndTime = endTime m.Order = order postData, er := json.Marshal(m) if er != nil { err = er return } postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA` body, err := MySteelChemicalPost(postUrl, "data", postData) if err != nil { return } err = json.Unmarshal(body, &item) if err != nil { return } return } // GetMySteelChemicalIndexNameMap 获取钢联化工的所有指标的信息 func GetMySteelChemicalIndexNameMap() (indexNameMap map[string]*models.MySteelChemicalApiInfoItem, err error) { if utils.MysteelChemicalApiToken == "" { err = errors.New("钢联接口token未配置") return } item, err := getPageIndexInfoMap(1, 200, true) if err != nil { return } indexNameMap = make(map[string]*models.MySteelChemicalApiInfoItem) for _, v := range item.Data.List { indexNameMap[v.IndexCode] = v } // 如果总条数大于200,则继续获取 if item.Data.Total > 200 || item.Data.Pages > 1 { for i := 2; i <= item.Data.Pages; i++ { item, err = getPageIndexInfoMap(i, 200, true) if err != nil { return } for _, v := range item.Data.List { indexNameMap[v.IndexCode] = v } } return } return } func getPageIndexInfoMap(pageNum, pageSize int, includeInfo bool) (item *models.MySteelChemicalApiInfoResp, err error) { m := new(MySteelChemicalApiInfoBody) m.PageNum = pageNum m.PageSize = pageSize m.IncludeInfo = includeInfo postData, er := json.Marshal(m) if er != nil { err = er return } postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA` body, er := MySteelChemicalPost(postUrl, "info", postData) if er != nil { err = er return } err = json.Unmarshal(body, &item) if err != nil { return } if !item.Success { err = errors.New(item.Message) utils.FileLog.Info("code:" + item.Code + " message:" + item.Message) return } return } func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, err error) { req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData))) if er != nil { err = er return } req.Header.Set(`Content-Type`, `application/json`) req.Header.Set(`accessTokenSign`, utils.MysteelChemicalApiToken) req.Header.Set(`infoOrData`, hType) client := &http.Client{} resp, er := client.Do(req) if er != nil { err = er return } defer resp.Body.Close() body, err = io.ReadAll(resp.Body) if err != nil { return } return } func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err error) { indexObj := &models.BaseFromMysteelChemicalIndex{} tmpIndex, err := indexObj.GetIndexItem(edbCode) if err != nil { return } terminal, err := GetTerminal(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, tmpIndex.TerminalCode) if err != nil { err = fmt.Errorf("获取钢联化工接口配置出错 Err: %s", err) return } if tmpIndex.TerminalCode == "" { // 设置指标与终端关系的缓存 terminalCodeCacheKey := utils.CACHE_EDB_TERMINAL_CODE_URL + edbCode _ = utils.Rc.Put(terminalCodeCacheKey, terminal.TerminalCode, utils.GetTodayLastSecond()) } // 如果配置了api的token, 那么就走api接口 if utils.MysteelChemicalApiToken != "" { resp, er := GetEdbDataFromMySteelChemical([]string{edbCode}, startDate, endDate, "desc") if er != nil { err = er return } if !resp.Success { err = errors.New(resp.Message) return } dataObj := new(models.BaseFromMysteelChemicalData) exitDataList, er := dataObj.GetIndexDataList(edbCode) if er != nil { err = er return } existDataMap := make(map[string]*models.BaseFromMysteelChemicalData) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) existDataMap[dateStr] = v } mysteelChemicalDatas, er := tranformData(resp) if er != nil { err = er return } addItems := make([]*models.BaseFromMysteelChemicalData, 0) indexObj := &models.BaseFromMysteelChemicalIndex{} existIndex, er := indexObj.GetIndexItem(edbCode) if er != nil { err = er return } if len(mysteelChemicalDatas) == 0 { err = errors.New("没有获取到数据") return } // 因为只有一个指标,所以取第一个就可以了 items := mysteelChemicalDatas[0] for _, v := range items { dateStr := v.DataTime.Format(utils.FormatDate) if findData, ok := existDataMap[dateStr]; !ok { v.BaseFromMysteelChemicalIndexId = existIndex.BaseFromMysteelChemicalIndexId addItems = append(addItems, v) } else { if findData != nil && findData.Value != v.Value { dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId dataObj.Value = v.Value dataObj.ModifyTime = time.Now() err = dataObj.Update([]string{"value", "modify_time"}) if err != nil { return } } } } err = dataObj.AddV2(addItems) if err != nil { return } return } return }