Browse Source

Merge branch 'bzq1/mysteel_chemical2_custom_mcf' of eta_server/eta_index_lib into master

鲍自强 8 months ago
parent
commit
278020e750

+ 76 - 23
controllers/base_from_mysteel_chemical.go

@@ -87,19 +87,12 @@ func (this *MySteelChemicalController) Refresh() {
 		br.ErrMsg = "请输入指标编码,指标编码为空"
 		return
 	}
-	if req.EdbInfoId <= 0 {
+	if req.EdbInfoId < 0 {
 		br.Msg = "请输入指标ID!"
 		br.ErrMsg = "请输入指标ID"
 		return
 	}
 
-	// 获取指标详情
-	edbInfo, err := models.GetEdbInfoByEdbCode(source, req.EdbCode)
-	if err != nil {
-		br.Msg = "指标不存在!"
-		br.ErrMsg = "指标不存在"
-		return
-	}
 	cacheKey = utils.CACHE_EDB_DATA_REFRESH + strconv.Itoa(source) + "_" + req.EdbCode
 	if utils.Rc.IsExist(cacheKey) {
 		br.Ret = 501
@@ -109,30 +102,55 @@ func (this *MySteelChemicalController) Refresh() {
 	}
 	dataUpdateTime := time.Now().Format(utils.FormatDateTime)
 	utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
+	// 将数据刷新到BaseFromMysteelChemicalData
+	err = services.RefreshDataFromMysteelChemical(req.EdbCode, utils.GetEdbRefreshStartDate(req.StartDate), utils.BASE_END_DATE)
+	if err != nil {
+		br.Msg = "获取指标信息失败!"
+		br.ErrMsg = "获取指标信息失败 RefreshDataFromMysteelChemical,Err:" + err.Error()
+		return
+	}
 	err = models.RefreshEdbDataFromMysteelChemical(req.EdbInfoId, req.EdbCode, req.StartDate)
 	if err != nil && err.Error() != utils.ErrNoRow() {
 		br.Msg = "刷新指标信息失败!"
 		br.ErrMsg = "刷新指标信息失败 RefreshEdbDataFromMysteel,Err:" + err.Error()
 		return
 	}
+	if req.EdbInfoId != 0 {
+		// 获取指标详情
+		edbInfo, err := models.GetEdbInfoByEdbCode(source, req.EdbCode)
+		if err != nil {
+			br.Msg = "指标不存在!"
+			br.ErrMsg = "指标不存在"
+			return
+		}
 
-	// 更新指标最大最小值
-	erDataUpdateDate, err, errMsg := models.UnifiedModifyEdbInfoMaxAndMinInfoDataUpdate(edbInfo, dataUpdateTime)
-	if err != nil {
-		br.Msg = errMsg
-		br.ErrMsg = err.Error()
-		return
-	}
-	// 添加指标刷新成功日志
-	if erDataUpdateDate != "" {
-		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 1, "", 0, 0)
-	} else {
-		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 2, "未刷新到数据", 0, 0)
-	}
+		// 更新指标最大最小值
+		erDataUpdateDate, err, errMsg := models.UnifiedModifyEdbInfoMaxAndMinInfoDataUpdate(edbInfo, dataUpdateTime)
+		if err != nil {
+			br.Msg = errMsg
+			br.ErrMsg = err.Error()
+			return
+		}
+		// 添加指标刷新成功日志
+		isSourceRefresh := 0
+		if utils.MysteelChemicalApiToken != "" {
+			isSourceRefresh = 1
+		}
+		if erDataUpdateDate != "" {
+			_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 1, "", isSourceRefresh, 0)
+		} else {
+			_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 2, "未刷新到数据", isSourceRefresh, 0)
+		}
 
-	// 更新ES
-	go logic.UpdateEs(edbInfo.EdbInfoId)
+		// 更新ES
+		go logic.UpdateEs(edbInfo.EdbInfoId)
+	}
 
+	if utils.MysteelChemicalApiToken != "" {
+		// 钢联走api接口,那就更新数据源明细表
+		_ = services.SetMysteelChemicalEdbInfoUpdateStat(false)
+		_ = services.SetEdbSourceStat(false)
+	}
 	br.Ret = 200
 	br.Success = true
 	br.Msg = "获取成功"
@@ -196,6 +214,41 @@ func (this *MySteelChemicalController) HandleMysteelIndex() {
 	br.Msg = "处理成功"
 }
 
+// @Title 处理钢联指标的接口
+// @Description 处理钢联指标的接口
+// @Success 200 {object} models.HandleMysteelIndexResp
+// @router /handle/api/mysteel/index [post]
+func (this *MySteelChemicalController) HandleApiMysteelIndex() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+	body := this.Ctx.Input.RequestBody
+	var req models.HandleMysteelIndexResp
+	err := json.Unmarshal(body, &req)
+	if err != nil {
+		br.Msg = "参数解析异常!"
+		br.ErrMsg = "参数解析失败,Err:" + err.Error()
+		return
+	}
+
+	errMsg, err := services.HandleApiMysteelIndex(&req)
+	if err != nil {
+		fmt.Println("HandleMysteelIndex Err:" + err.Error())
+		if errMsg == "" {
+			br.Msg = "数据获取失败"
+		} else {
+			br.Msg = errMsg
+		}
+		br.ErrMsg = "处理失败,Err:" + err.Error()
+		return
+	}
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "处理成功"
+}
+
 // GetMaxFileIndex
 // @Title 获取最大的文件编号下标
 // @Description 获取最大的文件编号下标

+ 65 - 0
models/base_from_mysteel_chemical.go

@@ -243,6 +243,21 @@ func (m *BaseFromMysteelChemicalIndex) GetIndexItem(indexCode string) (item *Bas
 	return
 }
 
+func (m *BaseFromMysteelChemicalIndex) GetBatchIndexItem(indexCodes []string) (items []*BaseFromMysteelChemicalIndex, err error) {
+	if len(indexCodes) <= 0 {
+		return
+	}
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_mysteel_chemical_index WHERE index_code IN (%s) `
+	holder := make([]string, 0, len(indexCodes))
+	for range indexCodes {
+		holder = append(holder, "?")
+	}
+	sql = fmt.Sprintf(sql, strings.Join(holder, ","))
+	_, err = o.Raw(sql, indexCodes).QueryRows(&items)
+	return
+}
+
 func (m *BaseFromMysteelChemicalIndex) GetIndexCreate(terminalCode string) (items []*BaseFromMysteelChemicalIndex, err error) {
 	o := orm.NewOrm()
 	sql := `SELECT * FROM base_from_mysteel_chemical_index WHERE index_name = '' AND terminal_code = ? `
@@ -445,6 +460,16 @@ func (r *BaseFromMysteelChemicalData) Add(list []BaseFromMysteelChemicalData) (e
 	return
 }
 
+// AddV2 新增
+func (r *BaseFromMysteelChemicalData) AddV2(list []*BaseFromMysteelChemicalData) (err error) {
+	if len(list) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	_, err = o.InsertMulti(500, list)
+	return
+}
+
 type AddMysteelIndexResp struct {
 	EdbCode                           string `description:"指标编码"`
 	TerminalCode                      string `description:"指标终端编码"`
@@ -536,3 +561,43 @@ func (m *BaseFromMysteelChemicalIndex) GetIndexByCondition(condition string, par
 	_, err = o.Raw(sql, pars).QueryRows(&items)
 	return
 }
+
+type MySteelChemicalApiResp struct {
+	Code      string                    `json:"code" description:"200成功,其他失败"`
+	Success   bool                      `json:"success" description:"true 成功,false 失败"`
+	Timestamp int64                     `json:"timestamp" description:"时间戳"`
+	Message   string                    `json:"message" description:"显示执行信息"`
+	Data      []*MySteelChemicalApiData `json:"data" description:"数据"`
+}
+type MySteelChemicalApiInfoResp struct {
+	Code      string                  `json:"code" description:"200成功,其他失败"`
+	Success   bool                    `json:"success" description:"true 成功,false 失败"`
+	Timestamp int64                   `json:"timestamp" description:"时间戳"`
+	Message   string                  `json:"message" description:"显示执行信息"`
+	Data      *MySteelChemicalApiInfo `json:"data" description:"数据"`
+}
+
+type MySteelChemicalApiInfo struct {
+	Total int                           `json:"total" description:"总条数"`
+	Pages int                           `json:"pages" description:"总页数"`
+	List  []*MySteelChemicalApiInfoItem `json:"list" description:"数据列表"`
+}
+
+type MySteelChemicalApiInfoItem struct {
+	IndexCode     string `json:"INDEX_CODE"`
+	IndexName     string `json:"INDEX_NAME"`
+	FrequencyName string `json:"FREQUENCY_NAME"`
+	UnitName      string `json:"UNIT_NAME"`
+}
+
+type MySteelChemicalApiData struct {
+	IndexCode string                        `json:"INDEX_CODE"`
+	DataList  []*MySteelChemicalApiDataList `json:"dataList"`
+}
+
+type MySteelChemicalApiDataList struct {
+	PublishTime int64   `json:"PUBLISH_TIME"`
+	IndexCode   string  `json:"INDEX_CODE"`
+	DataDate    string  `json:"DATA_DATE"`
+	DataValue   float64 `json:"DATA_VALUE"`
+}

+ 10 - 0
models/edb_info.go

@@ -1384,6 +1384,16 @@ func EdbInfoAdd(req *AddEdbInfoParams, serverUrl string, sysUserId int, sysUserR
 			err = errors.New("指标信息不全")
 			return
 		}
+
+		// 兼容数据
+		{
+			if req.Frequency == `` {
+				req.Frequency = tmpItem.Frequency
+			}
+			if req.Unit == `` {
+				req.Unit = tmpItem.Unit
+			}
+		}
 	}
 	//获取该层级下最大的排序数
 	maxSort, err := GetEdbAndClassifyMaxSort(req.ClassifyId, 0)

+ 9 - 0
routers/commentsRouter.go

@@ -970,6 +970,15 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:MySteelChemicalController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:MySteelChemicalController"],
+        beego.ControllerComments{
+            Method: "HandleApiMysteelIndex",
+            Router: `/handle/api/mysteel/index`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_index_lib/controllers:MySteelChemicalController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:MySteelChemicalController"],
         beego.ControllerComments{
             Method: "HandleMysteelIndex",

+ 428 - 0
services/base_from_mysteel_chemical.go

@@ -1,11 +1,16 @@
 package services
 
 import (
+	"encoding/json"
+	"errors"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
 	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
+	"io"
+	"net/http"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -27,6 +32,218 @@ func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) {
 	return
 }
 
+func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) {
+	addIndexCodeList := make([]string, 0)
+	for _, v := range req.List {
+		if v.IndexCode == "" {
+			continue
+		}
+		addIndexCodeList = append(addIndexCodeList, v.IndexCode)
+	}
+	errMsg, err = HandleApiIndex(addIndexCodeList)
+	if err != nil {
+		return
+	}
+
+	_ = SetMysteelChemicalEdbInfoUpdateStat(false)
+	_ = SetEdbSourceStat(false)
+
+	return
+}
+
+func HandleApiIndex(indexCodes []string) (errMsg string, err error) {
+	if len(indexCodes) == 0 {
+		return
+	}
+	resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc")
+	if err != nil {
+		return
+	}
+	if !resp.Success {
+		errMsg = "获取数据失败"
+		err = errors.New(resp.Message)
+		return
+	}
+	indexInfoMap, err := GetMySteelChemicalIndexNameMap()
+	if err != nil {
+		errMsg = "获取指标数据失败"
+		return
+	}
+
+	indexObj := &models.BaseFromMysteelChemicalIndex{}
+	existIndexs, err := indexObj.GetBatchIndexItem(indexCodes)
+	if err != nil {
+		errMsg = "获取指标数据失败"
+		return
+	}
+
+	//获取已存在的所有数据
+	existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
+	existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex)
+	updateDataObj := new(models.BaseFromMysteelChemicalData)
+	for _, v := range existIndexs {
+		// 更新指标的名称,单位和频度等信息
+		if info, ok := indexInfoMap[v.IndexCode]; ok {
+			v.IndexName = info.IndexName
+			v.Unit = info.UnitName
+			v.Frequency = info.FrequencyName
+			v.ModifyTime = time.Now()
+			err = v.Update([]string{"index_name", "unit", "frequency", "modify_time"})
+			if err != nil {
+				errMsg = "更新指标失败"
+				return
+			}
+		}
+		if err != nil {
+			errMsg = "添加指标失败"
+			return
+		}
+		existIndexMap[v.IndexCode] = v
+		exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode)
+		if er != nil {
+			errMsg = "获取指标数据失败"
+			err = er
+			return
+		}
+		fmt.Println("exitDataListLen:", len(exitDataList))
+		for _, v := range exitDataList {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			existDataMap[dateStr] = v
+		}
+	}
+	mysteelChemicalDatas, err := tranformData(resp)
+	if err != nil {
+		errMsg = "转换数据失败"
+		return
+	}
+
+	var indexErr error
+	var lErr error
+	defer func() {
+		if indexErr != nil {
+			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+
+		if lErr != nil {
+			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	var hasUpdate bool
+	dataObj := new(models.BaseFromMysteelChemicalData)
+	for _, items := range mysteelChemicalDatas {
+		addItems := make([]*models.BaseFromMysteelChemicalData, 0)
+		for _, v := range items {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			if findData, ok := existDataMap[dateStr]; !ok {
+				index, ok := existIndexMap[v.IndexCode]
+				if !ok {
+					continue
+				}
+				v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId
+				addItems = append(addItems, v)
+			} else {
+				if findData != nil && findData.Value != v.Value {
+					dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
+					dataObj.Value = v.Value
+					dataObj.ModifyTime = time.Now()
+
+					err = dataObj.Update([]string{"value", "modify_time"})
+					if err != nil {
+						errMsg = "更新数据失败"
+						return
+					}
+					hasUpdate = true
+				}
+			}
+		}
+		err = dataObj.AddV2(addItems)
+		if err != nil {
+			return
+		}
+		//修改最大最小日期
+		if len(items) <= 0 {
+			continue
+		}
+		mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode)
+		if er == nil && mysteelIndexMaxItem != nil {
+			e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem)
+			if e != nil {
+				fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
+				utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error())
+			}
+		}
+
+		edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
+		if e != nil && e.Error() != utils.ErrNoRow() {
+			indexErr = e
+			return
+		}
+
+		if edbInfo != nil {
+			dataUpdateResult := 2
+			dataUpdateFailedReason := "服务异常"
+			_, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
+			if logErr != nil {
+				lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0)
+				return
+			}
+
+			if hasUpdate {
+				dataUpdateResult = 1
+				dataUpdateFailedReason = ""
+			} else {
+				dataUpdateFailedReason = "未刷新到数据"
+			}
+
+			// 添加刷新成功日志
+			lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0)
+			if lErr != nil {
+				return
+			}
+		}
+	}
+	return
+}
+
+func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) {
+	for _, v := range dataResp.Data {
+		tmpNewDataMap := make(map[string]int64)
+		tmpDateDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
+		tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0)
+		for _, vv := range v.DataList {
+			tmpData := new(models.BaseFromMysteelChemicalData)
+			tmpData.IndexCode = vv.IndexCode
+			// 如果存在多条数据,则取发布时间最新的数据
+			pub, ok := tmpNewDataMap[vv.DataDate]
+			if !ok {
+				tmpNewDataMap[vv.DataDate] = vv.PublishTime
+				tmpData.Value = strconv.FormatFloat(vv.DataValue, 'f', -1, 64)
+			} else {
+				if pub < vv.PublishTime {
+					tmpNewDataMap[vv.DataDate] = vv.PublishTime
+					tmpData = tmpDateDataMap[vv.DataDate]
+					tmpData.Value = strconv.FormatFloat(vv.DataValue, 'f', -1, 64)
+				}
+				continue
+			}
+			dataDate, er := time.Parse(utils.FormatDate, vv.DataDate)
+			if er != nil {
+				err = er
+				return
+			}
+			tmpData.DataTime = dataDate
+			tmpData.CreateTime = time.Now()
+			tmpData.ModifyTime = time.Now()
+			tmpDataItems = append(tmpDataItems, tmpData)
+			tmpDateDataMap[vv.DataDate] = tmpData
+		}
+		items = append(items, tmpDataItems)
+	}
+	return
+}
+
 func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 	defer func() {
 		if err != nil {
@@ -271,3 +488,214 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 
 	return
 }
+
+type MySteelChemicalApiDataBody struct {
+	IndexCodes []string `json:"indexCodes"`
+	StartTime  string   `json:"startTime"`
+	EndTime    string   `json:"endTime"`
+	Order      string   `json:"order"`
+}
+
+type MySteelChemicalApiInfoBody struct {
+	PageNum     int  `json:"pageNum"`
+	PageSize    int  `json:"pageSize"`
+	IncludeInfo bool `json:"includeInfo"`
+}
+
+// GetEdbDataFromMySteelChemical 批量获得钢联化工的指标数据
+func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) {
+	if utils.MysteelChemicalApiToken == "" {
+		err = errors.New("钢联接口token未配置")
+		return
+	}
+	m := new(MySteelChemicalApiDataBody)
+	m.IndexCodes = indexCodes
+	m.StartTime = startTime
+	m.EndTime = endTime
+	m.Order = order
+	postData, er := json.Marshal(m)
+	if er != nil {
+		err = er
+		return
+	}
+	postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
+	body, err := MySteelChemicalPost(postUrl, "data", postData)
+	if err != nil {
+		return
+	}
+	err = json.Unmarshal(body, &item)
+	if err != nil {
+		return
+	}
+	return
+}
+
+// GetMySteelChemicalIndexNameMap 获取钢联化工的所有指标的信息
+func GetMySteelChemicalIndexNameMap() (indexNameMap map[string]*models.MySteelChemicalApiInfoItem, err error) {
+	if utils.MysteelChemicalApiToken == "" {
+		err = errors.New("钢联接口token未配置")
+		return
+	}
+	item, err := getPageIndexInfoMap(1, 200)
+	if err != nil {
+		return
+	}
+	indexNameMap = make(map[string]*models.MySteelChemicalApiInfoItem)
+	for _, v := range item.Data.List {
+		indexNameMap[v.IndexCode] = v
+	}
+	// 如果总条数大于200,则继续获取
+	if item.Data.Total > 200 || item.Data.Pages > 1 {
+		for i := 2; i <= item.Data.Pages; i++ {
+			item, err = getPageIndexInfoMap(i, 200)
+			if err != nil {
+				return
+			}
+			for _, v := range item.Data.List {
+				indexNameMap[v.IndexCode] = v
+			}
+		}
+		return
+	}
+	return
+}
+
+func getPageIndexInfoMap(pageNum, pageSize int) (item *models.MySteelChemicalApiInfoResp, err error) {
+	m := new(MySteelChemicalApiInfoBody)
+	m.PageNum = pageNum
+	m.PageSize = pageSize
+	m.IncludeInfo = true
+	postData, er := json.Marshal(m)
+	if er != nil {
+		err = er
+		return
+	}
+	postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
+	body, er := MySteelChemicalPost(postUrl, "info", postData)
+	if er != nil {
+		err = er
+		return
+	}
+	err = json.Unmarshal(body, &item)
+	if err != nil {
+		return
+	}
+	if !item.Success {
+		err = errors.New(item.Message)
+		utils.FileLog.Info("code:" + item.Code + " message:" + item.Message)
+		return
+	}
+	return
+}
+
+func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, err error) {
+	req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData)))
+	if er != nil {
+		err = er
+		return
+	}
+	req.Header.Set(`Content-Type`, `application/json`)
+	req.Header.Set(`accessTokenSign`, utils.MysteelChemicalApiToken)
+	req.Header.Set(`infoOrData`, hType)
+
+	client := &http.Client{}
+	resp, er := client.Do(req)
+	if er != nil {
+		err = er
+		return
+	}
+	defer resp.Body.Close()
+	body, err = io.ReadAll(resp.Body)
+	if err != nil {
+		return
+	}
+	return
+}
+
+func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err error) {
+	indexObj := &models.BaseFromMysteelChemicalIndex{}
+	tmpIndex, err := indexObj.GetIndexItem(edbCode)
+	if err != nil {
+		return
+	}
+
+	terminal, err := GetTerminal(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, tmpIndex.TerminalCode)
+	if err != nil {
+		err = fmt.Errorf("获取钢联化工接口配置出错 Err: %s", err)
+		return
+	}
+
+	if tmpIndex.TerminalCode == "" {
+		// 设置指标与终端关系的缓存
+		terminalCodeCacheKey := utils.CACHE_EDB_TERMINAL_CODE_URL + edbCode
+		_ = utils.Rc.Put(terminalCodeCacheKey, terminal.TerminalCode, utils.GetTodayLastSecond())
+	}
+
+	// 如果配置了api的token, 那么就走api接口
+	if utils.MysteelChemicalApiToken != "" {
+		resp, er := GetEdbDataFromMySteelChemical([]string{edbCode}, startDate, endDate, "desc")
+		if er != nil {
+			err = er
+			return
+		}
+		if !resp.Success {
+			err = errors.New(resp.Message)
+			return
+		}
+
+		dataObj := new(models.BaseFromMysteelChemicalData)
+		exitDataList, er := dataObj.GetIndexDataList(edbCode)
+		if er != nil {
+			err = er
+			return
+		}
+		existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
+		for _, v := range exitDataList {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			existDataMap[dateStr] = v
+		}
+		mysteelChemicalDatas, er := tranformData(resp)
+		if er != nil {
+			err = er
+			return
+		}
+
+		addItems := make([]*models.BaseFromMysteelChemicalData, 0)
+		indexObj := &models.BaseFromMysteelChemicalIndex{}
+		existIndex, er := indexObj.GetIndexItem(edbCode)
+		if er != nil {
+			err = er
+			return
+		}
+		if len(mysteelChemicalDatas) == 0 {
+			err = errors.New("没有获取到数据")
+			return
+		}
+		// 因为只有一个指标,所以取第一个就可以了
+		items := mysteelChemicalDatas[0]
+		for _, v := range items {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			if findData, ok := existDataMap[dateStr]; !ok {
+				v.BaseFromMysteelChemicalIndexId = existIndex.BaseFromMysteelChemicalIndexId
+				addItems = append(addItems, v)
+			} else {
+				if findData != nil && findData.Value != v.Value {
+					dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
+					dataObj.Value = v.Value
+					dataObj.ModifyTime = time.Now()
+
+					err = dataObj.Update([]string{"value", "modify_time"})
+					if err != nil {
+						return
+					}
+				}
+			}
+		}
+		err = dataObj.AddV2(addItems)
+		if err != nil {
+			return
+		}
+		return
+	}
+	return
+}

+ 6 - 3
utils/config.go

@@ -3,10 +3,11 @@ package utils
 import (
 	"encoding/json"
 	"fmt"
+	"strconv"
+
 	beeLogger "github.com/beego/bee/v2/logger"
 	"github.com/beego/beego/v2/server/web"
 	"github.com/qiniu/qmgo"
-	"strconv"
 )
 
 var (
@@ -71,8 +72,9 @@ var (
 	EDB_DATA_LIMIT        = 10
 	Hz_Wind_Data_Url_LIST []WindUrlMap // wind接口服务地址配置(客户本地机器,是个list列表)
 
-	ThsDataMethod   string //同花顺数据获取的方式,app是通过终端;api是通过接口
-	ThsRefreshToken string // 同花顺的刷新token
+	ThsDataMethod           string //同花顺数据获取的方式,app是通过终端;api是通过接口
+	ThsRefreshToken         string // 同花顺的刷新token
+	MysteelChemicalApiToken string // 钢联化工的api数据token
 )
 
 type WindUrlMap struct {
@@ -225,6 +227,7 @@ func init() {
 		if ThsDataMethod == `` {
 			ThsDataMethod = "api"
 		}
+		MysteelChemicalApiToken = config["mysteel_chemical_api_token"]
 	}
 
 	// ES配置