浏览代码

feat:增加钢联api的对接

zqbao 9 月之前
父节点
当前提交
62c44ee4df

+ 31 - 0
controllers/base_from_mysteel_chemical.go

@@ -196,6 +196,37 @@ func (this *MySteelChemicalController) HandleMysteelIndex() {
 	br.Msg = "处理成功"
 }
 
+// @Title 处理钢联指标的接口
+// @Description 处理钢联指标的接口
+// @Success 200 {object} models.HandleMysteelIndexResp
+// @router /handle/api/mysteel/index [post]
+func (this *MySteelChemicalController) HandleApiMysteelIndex() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+	body := this.Ctx.Input.RequestBody
+	var req models.HandleMysteelIndexResp
+	err := json.Unmarshal(body, &req)
+	if err != nil {
+		br.Msg = "参数解析异常!"
+		br.ErrMsg = "参数解析失败,Err:" + err.Error()
+		return
+	}
+
+	errMsg, err := services.HandleApiMysteelIndex(&req)
+	if err != nil {
+		fmt.Println("HandleMysteelIndex Err:" + err.Error())
+		br.Msg = errMsg
+		br.ErrMsg = "处理失败,Err:" + err.Error()
+		return
+	}
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "处理成功"
+}
+
 // GetMaxFileIndex
 // @Title 获取最大的文件编号下标
 // @Description 获取最大的文件编号下标

+ 42 - 0
models/base_from_mysteel_chemical.go

@@ -243,6 +243,21 @@ func (m *BaseFromMysteelChemicalIndex) GetIndexItem(indexCode string) (item *Bas
 	return
 }
 
+func (m *BaseFromMysteelChemicalIndex) GetBatchIndexItem(indexCodes []string) (items []*BaseFromMysteelChemicalIndex, err error) {
+	if len(indexCodes) <= 0 {
+		return
+	}
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_mysteel_chemical_index WHERE index_code IN (%s) `
+	holder := make([]string, 0, len(indexCodes))
+	for range indexCodes {
+		holder = append(holder, "?")
+	}
+	sql = fmt.Sprintf(sql, strings.Join(holder, ","))
+	_, err = o.Raw(sql, indexCodes).QueryRows(&items)
+	return
+}
+
 func (m *BaseFromMysteelChemicalIndex) GetIndexCreate(terminalCode string) (items []*BaseFromMysteelChemicalIndex, err error) {
 	o := orm.NewOrm()
 	sql := `SELECT * FROM base_from_mysteel_chemical_index WHERE index_name = '' AND terminal_code = ? `
@@ -445,6 +460,13 @@ func (r *BaseFromMysteelChemicalData) Add(list []BaseFromMysteelChemicalData) (e
 	return
 }
 
+// AddV2 新增
+func (r *BaseFromMysteelChemicalData) AddV2(list []*BaseFromMysteelChemicalData) (err error) {
+	o := orm.NewOrm()
+	_, err = o.InsertMulti(500, list)
+	return
+}
+
 type AddMysteelIndexResp struct {
 	EdbCode                           string `description:"指标编码"`
 	TerminalCode                      string `description:"指标终端编码"`
@@ -536,3 +558,23 @@ func (m *BaseFromMysteelChemicalIndex) GetIndexByCondition(condition string, par
 	_, err = o.Raw(sql, pars).QueryRows(&items)
 	return
 }
+
+type MySteelChemicalApiResp struct {
+	Code      string                    `json:"code" description:"200成功,其他失败"`
+	Success   bool                      `json:"success" description:"true 成功,false 失败"`
+	Timestamp int64                     `json:"timestamp" description:"时间戳"`
+	Message   string                    `json:"message" description:"显示执行信息"`
+	Data      []*MySteelChemicalApiData `json:"data" description:"数据"`
+}
+
+type MySteelChemicalApiData struct {
+	IndexCode string                        `json:"INDEX_CODE"`
+	DataList  []*MySteelChemicalApiDataList `json:"dataList"`
+}
+
+type MySteelChemicalApiDataList struct {
+	PublishTime int64  `json:"PUBLISH_TIME"`
+	IndexCode   string `json:"INDEX_CODE"`
+	DataDate    string `json:"DATA_DATE"`
+	DataValue   int    `json:"DATA_VALUE"`
+}

+ 242 - 0
services/base_from_mysteel_chemical.go

@@ -1,11 +1,16 @@
 package services
 
 import (
+	"encoding/json"
+	"errors"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
 	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
+	"io"
+	"net/http"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -27,6 +32,187 @@ func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) {
 	return
 }
 
+func HandleApiMysteelIndex(req *models.HandleMysteelIndexResp) (errMsg string, err error) {
+	addIndexCodeList := make([]string, 0)
+	for _, v := range req.List {
+		if v.IndexName == "" || v.IndexCode == "" {
+			continue
+		}
+		addIndexCodeList = append(addIndexCodeList, v.IndexCode)
+	}
+	errMsg, err = HandleApiIndex(addIndexCodeList)
+	if err != nil {
+		return
+	}
+
+	_ = SetMysteelChemicalEdbInfoUpdateStat(false)
+	_ = SetEdbSourceStat(false)
+
+	return
+}
+
+func HandleApiIndex(indexCodes []string) (errMsg string, err error) {
+	if len(indexCodes) == 0 {
+		return
+	}
+	resp, err := GetEdbDataFromMySteelChemical(indexCodes, utils.GetEdbRefreshStartDate(""), utils.BASE_END_DATE, "desc")
+	if err != nil {
+		return
+	}
+	if !resp.Success {
+		errMsg = "获取数据失败"
+		err = errors.New(resp.Message)
+		return
+	}
+	indexCodes = make([]string, 0)
+	for _, v := range resp.Data {
+		indexCodes = append(indexCodes, v.IndexCode)
+	}
+
+	indexObj := &models.BaseFromMysteelChemicalIndex{}
+	existIndexs, err := indexObj.GetBatchIndexItem(indexCodes)
+	if err != nil {
+		errMsg = "获取指标数据失败"
+		return
+	}
+
+	//获取已存在的所有数据
+	existDataMap := make(map[string]*models.BaseFromMysteelChemicalData)
+	existIndexMap := make(map[string]*models.BaseFromMysteelChemicalIndex)
+	updateDataObj := new(models.BaseFromMysteelChemicalData)
+	for _, v := range existIndexs {
+		existIndexMap[v.IndexCode] = v
+		exitDataList, er := updateDataObj.GetIndexDataList(v.IndexCode)
+		if er != nil {
+			errMsg = "获取指标数据失败"
+			err = er
+			return
+		}
+		fmt.Println("exitDataListLen:", len(exitDataList))
+		for _, v := range exitDataList {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			existDataMap[dateStr] = v
+		}
+	}
+	mysteelChemicalDatas, err := tranformData(resp)
+	if err != nil {
+		errMsg = "转换数据失败"
+		return
+	}
+
+	var indexErr error
+	var lErr error
+	defer func() {
+		if indexErr != nil {
+			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+
+		if lErr != nil {
+			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	var hasUpdate bool
+	dataObj := new(models.BaseFromMysteelChemicalData)
+	for _, items := range mysteelChemicalDatas {
+		addItems := make([]*models.BaseFromMysteelChemicalData, 0)
+		for _, v := range items {
+			dateStr := v.DataTime.Format(utils.FormatDate)
+			if findData, ok := existDataMap[dateStr]; !ok {
+				index, ok := existIndexMap[v.IndexCode]
+				if !ok {
+					continue
+				}
+				v.BaseFromMysteelChemicalIndexId = index.BaseFromMysteelChemicalIndexId
+				addItems = append(addItems, v)
+			} else {
+				if findData != nil && findData.Value != v.Value {
+					dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
+					dataObj.Value = v.Value
+					dataObj.ModifyTime = time.Now()
+
+					err = dataObj.Update([]string{"value", "modify_time"})
+					if err != nil {
+						errMsg = "更新数据失败"
+						return
+					}
+					hasUpdate = true
+				}
+			}
+		}
+		err = dataObj.AddV2(addItems)
+		if err != nil {
+			return
+		}
+		//修改最大最小日期
+		if len(items) <= 0 {
+			continue
+		}
+		mysteelIndexMaxItem, er := dataObj.GetMysteelIndexInfoMaxAndMinInfo(items[0].IndexCode)
+		if er == nil && mysteelIndexMaxItem != nil {
+			e := dataObj.ModifyMysteelIndexMaxAndMinInfo(items[0].IndexCode, mysteelIndexMaxItem)
+			if e != nil {
+				fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
+				utils.FileLog.Info("修改钢联化工的最大最小日期失败,Err:" + e.Error())
+			}
+		}
+
+		edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
+		if e != nil && e.Error() != utils.ErrNoRow() {
+			indexErr = e
+			return
+		}
+
+		if edbInfo != nil {
+			dataUpdateResult := 2
+			dataUpdateFailedReason := "服务异常"
+			_, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
+			if logErr != nil {
+				lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 0, 0)
+				return
+			}
+
+			if hasUpdate {
+				dataUpdateResult = 1
+				dataUpdateFailedReason = ""
+			} else {
+				dataUpdateFailedReason = "未刷新到数据"
+			}
+
+			// 添加刷新成功日志
+			lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 0, 0)
+			if lErr != nil {
+				return
+			}
+		}
+		return
+	}
+	return
+}
+
+func tranformData(dataResp *models.MySteelChemicalApiResp) (items [][]*models.BaseFromMysteelChemicalData, err error) {
+	for _, v := range dataResp.Data {
+		tmpDataItems := make([]*models.BaseFromMysteelChemicalData, 0)
+		for _, vv := range v.DataList {
+			tmpData := new(models.BaseFromMysteelChemicalData)
+			tmpData.IndexCode = vv.IndexCode
+			dataDate, er := time.Parse(utils.FormatDate, vv.DataDate)
+			if er != nil {
+				err = er
+				return
+			}
+			tmpData.DataTime = dataDate
+			tmpData.Value = strconv.Itoa(vv.DataValue)
+			tmpData.CreateTime = time.Now()
+			tmpData.ModifyTime = time.Now()
+			tmpDataItems = append(tmpDataItems, tmpData)
+		}
+		items = append(items, tmpDataItems)
+	}
+	return
+}
+
 func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 	defer func() {
 		if err != nil {
@@ -271,3 +457,59 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 
 	return
 }
+
+type MySteelChemicalApiBody struct {
+	IndexCodes []string `json:"indexCodes"`
+	StartTime  string   `json:"startTime"`
+	EndTime    string   `json:"endTime"`
+	Order      string   `json:"order"`
+}
+
+func GetEdbDataFromMySteelChemical(indexCodes []string, startTime, endTime, order string) (item *models.MySteelChemicalApiResp, err error) {
+	// 如果没有配置,获取配置的方式是api,那么就走官方接口
+	if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
+		if utils.MsClRefreshToken == "" {
+			err = errors.New("钢联接口token为配置")
+			return
+		}
+		m := new(MySteelChemicalApiBody)
+		m.IndexCodes = indexCodes
+		m.StartTime = startTime
+		m.EndTime = endTime
+		m.Order = order
+		postData, er := json.Marshal(m)
+		if er != nil {
+			err = er
+			return
+		}
+		postUrl := `https://mds.mysteel.com/dynamic/order/api/fcAbRA`
+		req, er := http.NewRequest(`POST`, postUrl, strings.NewReader(string(postData)))
+		if er != nil {
+			err = er
+			return
+		}
+		req.Header.Set(`Content-Type`, `application/json`)
+		req.Header.Set(`accessTokenSign`, utils.MsClRefreshToken)
+		req.Header.Set(`infoOrData`, "data")
+
+		client := &http.Client{}
+		resp, er := client.Do(req)
+		if er != nil {
+			err = er
+			return
+		}
+		defer resp.Body.Close()
+		body, er := io.ReadAll(resp.Body)
+		if er != nil {
+			err = er
+			return
+		}
+		err = json.Unmarshal(body, &item)
+		if err != nil {
+			err = er
+			return
+		}
+		return
+	}
+	return
+}

+ 6 - 3
utils/config.go

@@ -3,10 +3,11 @@ package utils
 import (
 	"encoding/json"
 	"fmt"
+	"strconv"
+
 	beeLogger "github.com/beego/bee/v2/logger"
 	"github.com/beego/beego/v2/server/web"
 	"github.com/qiniu/qmgo"
-	"strconv"
 )
 
 var (
@@ -70,8 +71,9 @@ var (
 	EDB_DATA_LIMIT        = 10
 	Hz_Wind_Data_Url_LIST []WindUrlMap // wind接口服务地址配置(客户本地机器,是个list列表)
 
-	ThsDataMethod   string //同花顺数据获取的方式,app是通过终端;api是通过接口
-	ThsRefreshToken string // 同花顺的刷新token
+	ThsDataMethod    string //同花顺数据获取的方式,app是通过终端;api是通过接口
+	ThsRefreshToken  string // 同花顺的刷新token
+	MsClRefreshToken string // 钢联的刷新token
 )
 
 type WindUrlMap struct {
@@ -219,6 +221,7 @@ func init() {
 		if ThsDataMethod == `` {
 			ThsDataMethod = "api"
 		}
+		MsClRefreshToken = config["mscl_refresh_token"]
 	}
 
 	// ES配置