|
@@ -0,0 +1,859 @@
|
|
|
+package services
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "eta/eta_index_lib/logic"
|
|
|
+ "eta/eta_index_lib/models"
|
|
|
+ "eta/eta_index_lib/services/alarm_msg"
|
|
|
+ "eta/eta_index_lib/utils"
|
|
|
+ "fmt"
|
|
|
+ "io/ioutil"
|
|
|
+ "net/http"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+// GetKplerDataByApi 获取开普勒数据
|
|
|
+func GetKplerDataByApi(params models.KplerSearchEdbReq, terminalCode string) (indexes []*models.KplerIndexItem, terminalInfo *models.EdbTerminal, err error) {
|
|
|
+ terminal, e := GetApiTerminal(utils.DATA_SOURCE_KPLER, terminalCode)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取开普勒终端配置失败, %v", e)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if terminal.ServerUrl == "" {
|
|
|
+ err = fmt.Errorf("开普勒终端地址未配置")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // 走API
|
|
|
+ if terminal.IsApi == 1 {
|
|
|
+ indexes, err = getKplerDataByApi(params, terminal.ServerUrl)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("获取开普勒指标数据失败, %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ terminalInfo = terminal
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// getEdbDataFromThsHfHttp API-获取高频指标数据
|
|
|
+func getKplerDataByApi(params models.KplerSearchEdbReq, serverUrl string) (list []*models.KplerIndexItem, err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ tips := fmt.Sprintf("开普勒指标API-getKplerDataByApi err: %v", err)
|
|
|
+ utils.FileLog.Info(tips)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // todo 判断指标是否已存在
|
|
|
+ productNamesStr := params.ProductNames
|
|
|
+ fromZoneNamesStr := params.FromZoneNames
|
|
|
+ toZoneNamesStr := params.ToZoneNames
|
|
|
+
|
|
|
+
|
|
|
+ libreq := new(models.KplerFlowDataLibReq)
|
|
|
+ libreq.Products = productNamesStr
|
|
|
+ libreq.FromZones = fromZoneNamesStr
|
|
|
+ libreq.ToZones = toZoneNamesStr
|
|
|
+ libreq.Split = params.Split
|
|
|
+ libreq.FlowDirection = params.FlowDirection
|
|
|
+ libreq.Granularity = params.Granularity
|
|
|
+ libreq.Unit = params.Unit
|
|
|
+ libreq.WithIntraRegion = "true"
|
|
|
+ libreq.WithForecast = "true"
|
|
|
+ libreq.OnlyRealized = "false"
|
|
|
+ libreq.StartDate = "2025-01-01"
|
|
|
+ libreq.EndDate = time.Now().Format(utils.FormatDate)
|
|
|
+ // 请求接口
|
|
|
+ apiResp, err := getKplerFlowDataLib(serverUrl, libreq)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("开普勒指标API-getKplerDataByApi err: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if apiResp.Ret != 200 {
|
|
|
+ err = fmt.Errorf("开普勒指标API-状态码: %d, 提示信息: %s", apiResp.Ret, apiResp.ErrMsg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexes := make([]*models.KplerIndexItem, 0)
|
|
|
+ if len(apiResp.Data) == 0 {
|
|
|
+ utils.FileLog.Info("开普勒指标API-无数据")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexCodeList := make([]string, 0)
|
|
|
+ existIndexCodeMap := make(map[string]bool)
|
|
|
+ flowDirection := params.FlowDirection
|
|
|
+ prefixIndexName := fmt.Sprintf("kpler%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
|
|
|
+ lastIndexName := params.Granularity
|
|
|
+
|
|
|
+ prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
|
|
|
+ // 获取首字母
|
|
|
+ lastIndexCode := strings.ToUpper(params.Granularity[:1])
|
|
|
+ // Tables中的每一个对应一个证券代码
|
|
|
+ for _, v := range apiResp.Data {
|
|
|
+ index := new(models.KplerIndexItem)
|
|
|
+ indexName := ""
|
|
|
+ indexCode := ""
|
|
|
+
|
|
|
+ indexName = fmt.Sprintf("%s%s%s", prefixIndexName, v.SplitItem, lastIndexName)
|
|
|
+ indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,v.SplitItem, lastIndexCode)
|
|
|
+
|
|
|
+ indexCodeList = append(indexCodeList, indexCode)
|
|
|
+ index.IndexCode = indexCode
|
|
|
+ index.IndexName = indexName
|
|
|
+ index.Frequency = getKplerFrequency(params.Granularity)
|
|
|
+ index.Unit = params.Unit
|
|
|
+ index.IndexData = v.IndexData
|
|
|
+ indexes = append(indexes, index)
|
|
|
+ }
|
|
|
+
|
|
|
+ indexObj := new(models.BaseFromKplerIndex)
|
|
|
+ // 查询指标是否存在
|
|
|
+ existList, err := indexObj.GetByIndexCodes(indexCodeList)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("查询指标是否存在失败, %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, v := range existList {
|
|
|
+ existIndexCodeMap[v.IndexCode] = true
|
|
|
+ }
|
|
|
+ // 过滤已经存在的指标
|
|
|
+ list = make([]*models.KplerIndexItem, 0)
|
|
|
+ for _, v := range indexes {
|
|
|
+ if _, ok := existIndexCodeMap[v.IndexCode]; !ok {
|
|
|
+ list = append(list, v)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return list, nil
|
|
|
+}
|
|
|
+
|
|
|
+func AddKplerIndexByApi(indexList []*models.KplerIndexItem, req *models.KplerSearchEdbReq, classifyId int, terminalCode string) (err error) {
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
|
|
|
+ utils.FileLog.Info(msg)
|
|
|
+ go alarm_msg.SendAlarmMsg(msg, 3)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ maxSort := 0
|
|
|
+ for _, indexInfo := range indexList {
|
|
|
+ indexObj := new(models.BaseFromKplerIndex)
|
|
|
+ indexId := 0
|
|
|
+ //判断指标是否存在
|
|
|
+ var isAdd int
|
|
|
+ item, er := indexObj.GetByIndexCode(indexInfo.IndexCode)
|
|
|
+ if er != nil {
|
|
|
+ if er.Error() == utils.ErrNoRow() {
|
|
|
+ isAdd = 1
|
|
|
+ err = nil
|
|
|
+ } else {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprintf("查询数据源指标库失败 GetByIndexCode Err:%s", er))
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if item != nil && item.BaseFromKplerIndexId > 0 {
|
|
|
+ fmt.Println("item:", item)
|
|
|
+ isAdd = 2
|
|
|
+ } else {
|
|
|
+ isAdd = 1
|
|
|
+ }
|
|
|
+ // 批量新增指标
|
|
|
+ if isAdd == 1 {
|
|
|
+ indexObj.IndexCode = indexInfo.IndexCode
|
|
|
+ indexObj.IndexName = indexInfo.IndexName
|
|
|
+ indexObj.Frequency = indexInfo.Frequency
|
|
|
+ indexObj.ClassifyId = int(classifyId)
|
|
|
+ indexObj.ProductNames = req.ProductNames
|
|
|
+ indexObj.FromZoneName = req.FromZoneNames
|
|
|
+ indexObj.ToZoneName = req.ToZoneNames
|
|
|
+ indexObj.FlowDirection = req.FlowDirection
|
|
|
+ indexObj.Granularity = req.Granularity
|
|
|
+ indexObj.Split = req.Split
|
|
|
+ indexObj.Unit = req.Unit
|
|
|
+ indexObj.Sort = maxSort
|
|
|
+ indexObj.ModifyTime = time.Now()
|
|
|
+ indexObj.CreateTime = time.Now()
|
|
|
+ indexObj.TerminalCode = terminalCode
|
|
|
+ err = indexObj.Add()
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexId = indexObj.BaseFromKplerIndexId
|
|
|
+ maxSort++
|
|
|
+ } else if isAdd == 2 {
|
|
|
+ indexId = item.BaseFromKplerIndexId
|
|
|
+ if item.TerminalCode == `` && terminalCode != `` {
|
|
|
+ item.TerminalCode = terminalCode
|
|
|
+ err = item.Update([]string{"TerminalCode"})
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
|
|
|
+ indexObj.IndexName = indexInfo.IndexName
|
|
|
+ indexObj.Frequency = indexInfo.Frequency
|
|
|
+ indexObj.ClassifyId = int(classifyId)
|
|
|
+ indexObj.ProductNames = req.ProductNames
|
|
|
+ indexObj.FromZoneName = req.FromZoneNames
|
|
|
+ indexObj.ToZoneName = req.ToZoneNames
|
|
|
+ indexObj.FlowDirection = req.FlowDirection
|
|
|
+ indexObj.Granularity = req.Granularity
|
|
|
+ indexObj.Split = req.Split
|
|
|
+ indexObj.Unit = req.Unit
|
|
|
+ indexObj.ModifyTime = time.Now()
|
|
|
+
|
|
|
+ //修改数据
|
|
|
+ updateColsArr := make([]string, 0)
|
|
|
+ updateColsArr = append(updateColsArr, "index_name")
|
|
|
+ updateColsArr = append(updateColsArr, "classify_id")
|
|
|
+ updateColsArr = append(updateColsArr, "product_names")
|
|
|
+ updateColsArr = append(updateColsArr, "from_zone_name")
|
|
|
+ updateColsArr = append(updateColsArr, "to_zone_name")
|
|
|
+ updateColsArr = append(updateColsArr, "flow_direction")
|
|
|
+ updateColsArr = append(updateColsArr, "granularity")
|
|
|
+ updateColsArr = append(updateColsArr, "split")
|
|
|
+ updateColsArr = append(updateColsArr, "frequency")
|
|
|
+ updateColsArr = append(updateColsArr, "modify_time")
|
|
|
+
|
|
|
+ e := indexObj.Update(updateColsArr)
|
|
|
+ if e != nil {
|
|
|
+ fmt.Println("Index Update Err:" + e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ dataObj := new(models.BaseFromKplerData)
|
|
|
+ exitDataMap := make(map[string]*models.BaseFromKplerData)
|
|
|
+ //获取已存在的所有数据
|
|
|
+ var exitDataList []*models.BaseFromKplerData
|
|
|
+ exitDataList, err = dataObj.GetByIndexCode(indexInfo.IndexCode)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ fmt.Println("exitDataListLen:", len(exitDataList))
|
|
|
+ for _, v := range exitDataList {
|
|
|
+ dateStr := v.DataTime
|
|
|
+ exitDataMap[dateStr] = v
|
|
|
+ }
|
|
|
+ addDataList := make([]*models.BaseFromKplerData, 0)
|
|
|
+
|
|
|
+ // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
|
|
|
+ for _, value := range indexInfo.IndexData {
|
|
|
+ date := value.DataTime
|
|
|
+ value := value.Value
|
|
|
+ if findData, ok := exitDataMap[date]; !ok {
|
|
|
+ _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var saveDataTime time.Time
|
|
|
+ if strings.Contains(date, "00:00:00") {
|
|
|
+ saveDataTime, err = time.Parse(utils.FormatDateTime, date)
|
|
|
+ } else {
|
|
|
+ saveDataTime, err = time.Parse(utils.FormatDate, date)
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ timestamp := saveDataTime.UnixNano() / 1e6
|
|
|
+
|
|
|
+ dataItem := new(models.BaseFromKplerData)
|
|
|
+ dataItem.BaseFromKplerIndexId = int(indexId)
|
|
|
+ dataItem.IndexCode = indexInfo.IndexCode
|
|
|
+ dataItem.DataTime = date
|
|
|
+ dataItem.Value = value
|
|
|
+ dataItem.CreateTime = time.Now()
|
|
|
+ dataItem.ModifyTime = time.Now()
|
|
|
+ dataItem.DataTimestamp = timestamp
|
|
|
+ addDataList = append(addDataList, dataItem)
|
|
|
+ if len(addDataList) > 500 {
|
|
|
+ err = dataObj.AddMulti(addDataList)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("批量新增指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ addDataList = make([]*models.BaseFromKplerData, 0)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
|
|
|
+ // 过滤0.50和0.5的比较
|
|
|
+ oldV, _ := strconv.ParseFloat(findData.Value, 64)
|
|
|
+ newV, _ := strconv.ParseFloat(value, 64)
|
|
|
+ if oldV == newV {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
|
|
|
+ dataObj.Value = value
|
|
|
+ dataObj.ModifyTime = time.Now()
|
|
|
+
|
|
|
+ updateDataColsArr := make([]string, 0)
|
|
|
+ updateDataColsArr = append(updateDataColsArr, "value")
|
|
|
+ updateDataColsArr = append(updateDataColsArr, "modify_time")
|
|
|
+ dataObj.Update(updateDataColsArr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(addDataList) > 0 {
|
|
|
+ err = dataObj.AddMulti(addDataList)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("批量新增指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ var dateItem *models.EdbInfoMaxAndMinInfo
|
|
|
+ dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexInfo.IndexCode)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ indexObj.ModifyIndexMaxAndMinDate(indexInfo.IndexCode, dateItem)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 同步刷新ETA指标库的指标
|
|
|
+ {
|
|
|
+ // 获取指标详情
|
|
|
+ baseObj := new(models.BaseFromKpler)
|
|
|
+ var edbInfo *models.EdbInfo
|
|
|
+ edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexInfo.IndexCode)
|
|
|
+ if err != nil {
|
|
|
+ if !utils.IsErrNoRow(err) {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexInfo.IndexCode, err.Error()))
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ err = nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 已经加入到指标库的话,那么就去更新ETA指标库吧
|
|
|
+ if edbInfo != nil {
|
|
|
+ go logic.RefreshBaseEdbInfo(edbInfo, ``)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// HandleKplerIndex 处理Kpler的excel数据
|
|
|
+func HandleKplerIndex(req *models.HandleKplerExcelDataReq) (err error) {
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
|
|
|
+ utils.FileLog.Info(msg)
|
|
|
+ go alarm_msg.SendAlarmMsg(msg, 3)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ // 查询所有的一级分类
|
|
|
+ classifyObj := new(models.BaseFromKplerClassify)
|
|
|
+ classifyList, err := classifyObj.GetParentClassify()
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ classifyMap := make(map[string]int, 0)
|
|
|
+ for _, v := range classifyList {
|
|
|
+ classifyMap[v.ClassifyName] = int(v.ClassifyId)
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range req.List {
|
|
|
+ if v.IndexName == "" || v.IndexCode == "" {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ err = handleKplerIndex(v, req.TerminalCode, classifyMap)
|
|
|
+ if err != nil {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// getKplerFlowDataLib 获取开普勒流数据
|
|
|
+func getKplerFlowDataLib(libUrl string, dataMap *models.KplerFlowDataLibReq) (resp *models.KplerFlowDataLibResp, err error) {
|
|
|
+ postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getFlowData")
|
|
|
+ postData, err := json.Marshal(dataMap)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ result, err := HttpPost(postUrl, string(postData), "application/json")
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result))
|
|
|
+ err = json.Unmarshal(result, &resp)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return resp, nil
|
|
|
+}
|
|
|
+
|
|
|
+func getKplerProductLib(libUrl string) (resp *models.KplerProductLibResp, err error) {
|
|
|
+ postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getProductData")
|
|
|
+ result, err := HttpPost(postUrl, "", "application/json")
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
|
|
|
+ err = json.Unmarshal(result, &resp)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return resp, nil
|
|
|
+}
|
|
|
+
|
|
|
+func getKplerZoneLib(libUrl string) (resp *models.KplerZoneLibResp, err error) {
|
|
|
+ postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getZoneData")
|
|
|
+ result, err := HttpPost(postUrl, "", "application/json")
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
|
|
|
+ err = json.Unmarshal(result, &resp)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ return resp, nil
|
|
|
+}
|
|
|
+
|
|
|
+func handleKplerIndex(req *models.HandleKplerExcelData, terminalCode string, classifyMap map[string]int) (err error) {
|
|
|
+ indexName := req.IndexName
|
|
|
+ indexCode := req.IndexCode
|
|
|
+ excelDataMap := req.ExcelDataMap
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
|
|
|
+ utils.FileLog.Info(msg)
|
|
|
+ go alarm_msg.SendAlarmMsg(msg, 3)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ indexObj := new(models.BaseFromKplerIndex)
|
|
|
+ dataObj := new(models.BaseFromKplerData)
|
|
|
+ classifyObj := new(models.BaseFromKplerClassify)
|
|
|
+
|
|
|
+ var indexId int64
|
|
|
+
|
|
|
+ addDataList := make([]*models.BaseFromKplerData, 0)
|
|
|
+
|
|
|
+ exitDataMap := make(map[string]*models.BaseFromKplerData)
|
|
|
+
|
|
|
+ // 修改指标信息
|
|
|
+ if indexName == "" {
|
|
|
+ utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 判断目录是否存在
|
|
|
+ var classifyId int64
|
|
|
+ now := time.Now()
|
|
|
+ if req.ClassifyName != "" {
|
|
|
+ classifyParentId := 0
|
|
|
+ level := 1
|
|
|
+ classifyParentId, _ = classifyMap[req.ParentClassifyName]
|
|
|
+ if classifyParentId > 0 {
|
|
|
+ level = 2
|
|
|
+ }
|
|
|
+ classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() == utils.ErrNoRow() {
|
|
|
+ //新增分类
|
|
|
+ classifyObj = &models.BaseFromKplerClassify{
|
|
|
+ ClassifyName: req.ClassifyName,
|
|
|
+ ClassifyNameEn: req.ClassifyName,
|
|
|
+ ParentId: classifyParentId,
|
|
|
+ SysUserId: 0,
|
|
|
+ SysUserRealName: "",
|
|
|
+ Level: level,
|
|
|
+ Sort: req.ClassifySort,
|
|
|
+ ModifyTime: now,
|
|
|
+ CreateTime: now,
|
|
|
+ }
|
|
|
+
|
|
|
+ classifyId, err = classifyObj.Add()
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("新增分类失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ classifyObj.ClassifyId = int(classifyId)
|
|
|
+ } else {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ classifyId = int64(classifyObj.ClassifyId)
|
|
|
+ classifyObj.ModifyTime = now
|
|
|
+ classifyObj.ParentId = classifyParentId
|
|
|
+ e := classifyObj.Update([]string{"ParentId", "ModifyTime"})
|
|
|
+ if e != nil {
|
|
|
+ fmt.Println("classifyObj Update Err:" + e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //判断指标是否存在
|
|
|
+ var isAdd int
|
|
|
+ item, err := indexObj.GetByIndexCode(indexCode)
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() == utils.ErrNoRow() {
|
|
|
+ isAdd = 1
|
|
|
+ err = nil
|
|
|
+ } else {
|
|
|
+ isAdd = -1
|
|
|
+ err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if item != nil && item.BaseFromKplerIndexId > 0 {
|
|
|
+ fmt.Println("item:", item)
|
|
|
+ isAdd = 2
|
|
|
+ } else {
|
|
|
+ isAdd = 1
|
|
|
+ }
|
|
|
+
|
|
|
+ if isAdd == 1 {
|
|
|
+ indexObj.IndexCode = indexCode
|
|
|
+ indexObj.IndexName = indexName
|
|
|
+ indexObj.Frequency = req.Frequency
|
|
|
+ indexObj.ClassifyId = int(classifyId)
|
|
|
+ indexObj.ProductNames = req.ProductNames
|
|
|
+ indexObj.FromZoneId = req.FromZoneId
|
|
|
+ indexObj.FromZoneName = req.FromZoneName
|
|
|
+ indexObj.ToZoneId = req.ToZoneId
|
|
|
+ indexObj.ToZoneName = req.ToZoneName
|
|
|
+ indexObj.FlowDirection = req.FlowDirection
|
|
|
+ indexObj.Granularity = req.Granularity
|
|
|
+ indexObj.Split = req.Split
|
|
|
+ indexObj.Unit = req.Unit
|
|
|
+ indexObj.Sort = req.Sort
|
|
|
+ indexObj.ModifyTime = time.Now()
|
|
|
+ indexObj.CreateTime = time.Now()
|
|
|
+ indexObj.TerminalCode = terminalCode
|
|
|
+ err = indexObj.Add()
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexId = int64(indexObj.BaseFromKplerIndexId)
|
|
|
+ } else if isAdd == 2 {
|
|
|
+ indexId = int64(item.BaseFromKplerIndexId)
|
|
|
+ if item.TerminalCode == `` && terminalCode != `` {
|
|
|
+ item.TerminalCode = terminalCode
|
|
|
+ err = item.Update([]string{"TerminalCode"})
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
|
|
|
+ indexObj.IndexName = indexName
|
|
|
+ indexObj.Frequency = req.Frequency
|
|
|
+ indexObj.ClassifyId = int(classifyId)
|
|
|
+ indexObj.ProductNames = req.ProductNames
|
|
|
+ indexObj.FromZoneId = req.FromZoneId
|
|
|
+ indexObj.FromZoneName = req.FromZoneName
|
|
|
+ indexObj.ToZoneId = req.ToZoneId
|
|
|
+ indexObj.ToZoneName = req.ToZoneName
|
|
|
+ indexObj.FlowDirection = req.FlowDirection
|
|
|
+ indexObj.Granularity = req.Granularity
|
|
|
+ indexObj.Split = req.Split
|
|
|
+ indexObj.Unit = req.Unit
|
|
|
+ indexObj.Sort = req.Sort
|
|
|
+ indexObj.ModifyTime = time.Now()
|
|
|
+
|
|
|
+ //修改数据
|
|
|
+ updateColsArr := make([]string, 0)
|
|
|
+ updateColsArr = append(updateColsArr, "index_name")
|
|
|
+ updateColsArr = append(updateColsArr, "classify_id")
|
|
|
+ updateColsArr = append(updateColsArr, "product_names")
|
|
|
+ updateColsArr = append(updateColsArr, "from_zone_id")
|
|
|
+ updateColsArr = append(updateColsArr, "from_zone_name")
|
|
|
+ updateColsArr = append(updateColsArr, "to_zone_id")
|
|
|
+ updateColsArr = append(updateColsArr, "to_zone_name")
|
|
|
+ updateColsArr = append(updateColsArr, "flow_direction")
|
|
|
+ updateColsArr = append(updateColsArr, "granularity")
|
|
|
+ updateColsArr = append(updateColsArr, "split")
|
|
|
+ updateColsArr = append(updateColsArr, "frequency")
|
|
|
+ updateColsArr = append(updateColsArr, "sort")
|
|
|
+ updateColsArr = append(updateColsArr, "modify_time")
|
|
|
+
|
|
|
+ e := indexObj.Update(updateColsArr)
|
|
|
+ if e != nil {
|
|
|
+ fmt.Println("Index Update Err:" + e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //获取已存在的所有数据
|
|
|
+ var exitDataList []*models.BaseFromKplerData
|
|
|
+ exitDataList, err = dataObj.GetByIndexCode(indexCode)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ fmt.Println("exitDataListLen:", len(exitDataList))
|
|
|
+ for _, v := range exitDataList {
|
|
|
+ dateStr := v.DataTime
|
|
|
+ exitDataMap[dateStr] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
|
|
|
+ for date, value := range excelDataMap {
|
|
|
+ if findData, ok := exitDataMap[date]; !ok {
|
|
|
+ _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var saveDataTime time.Time
|
|
|
+ if strings.Contains(date, "00:00:00") {
|
|
|
+ saveDataTime, err = time.Parse(utils.FormatDateTime, date)
|
|
|
+ } else {
|
|
|
+ saveDataTime, err = time.Parse(utils.FormatDate, date)
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ timestamp := saveDataTime.UnixNano() / 1e6
|
|
|
+
|
|
|
+ dataItem := new(models.BaseFromKplerData)
|
|
|
+ dataItem.BaseFromKplerIndexId = int(indexId)
|
|
|
+ dataItem.IndexCode = indexCode
|
|
|
+ dataItem.DataTime = date
|
|
|
+ dataItem.Value = value
|
|
|
+ dataItem.CreateTime = time.Now()
|
|
|
+ dataItem.ModifyTime = time.Now()
|
|
|
+ dataItem.DataTimestamp = timestamp
|
|
|
+ addDataList = append(addDataList, dataItem)
|
|
|
+ if len(addDataList) > 500 {
|
|
|
+ err = dataObj.AddMulti(addDataList)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("批量新增指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ addDataList = make([]*models.BaseFromKplerData, 0)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
|
|
|
+ // 过滤0.50和0.5的比较
|
|
|
+ oldV, _ := strconv.ParseFloat(findData.Value, 64)
|
|
|
+ newV, _ := strconv.ParseFloat(value, 64)
|
|
|
+ if oldV == newV {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
|
|
|
+ dataObj.Value = value
|
|
|
+ dataObj.ModifyTime = time.Now()
|
|
|
+
|
|
|
+ updateDataColsArr := make([]string, 0)
|
|
|
+ updateDataColsArr = append(updateDataColsArr, "value")
|
|
|
+ updateDataColsArr = append(updateDataColsArr, "modify_time")
|
|
|
+ dataObj.Update(updateDataColsArr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(addDataList) > 0 {
|
|
|
+ err = dataObj.AddMulti(addDataList)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("批量新增指标失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ var dateItem *models.EdbInfoMaxAndMinInfo
|
|
|
+ dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 同步刷新ETA指标库的指标
|
|
|
+ {
|
|
|
+ // 获取指标详情
|
|
|
+ baseObj := new(models.BaseFromKpler)
|
|
|
+ var edbInfo *models.EdbInfo
|
|
|
+ edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
|
|
|
+ if err != nil {
|
|
|
+ if !utils.IsErrNoRow(err) {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
|
|
|
+ return
|
|
|
+ } else {
|
|
|
+ err = nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 已经加入到指标库的话,那么就去更新ETA指标库吧
|
|
|
+ if edbInfo != nil {
|
|
|
+ go logic.RefreshBaseEdbInfo(edbInfo, ``)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func HttpPost(url, postData string, params ...string) ([]byte, error) {
|
|
|
+ body := ioutil.NopCloser(strings.NewReader(postData))
|
|
|
+ client := &http.Client{}
|
|
|
+ req, err := http.NewRequest("POST", url, body)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ contentType := "application/x-www-form-urlencoded;charset=utf-8"
|
|
|
+ if len(params) > 0 && params[0] != "" {
|
|
|
+ contentType = params[0]
|
|
|
+ }
|
|
|
+ req.Header.Set("Content-Type", contentType)
|
|
|
+ req.Header.Set("authorization", utils.MD5(utils.APP_EDB_DATA_ANALYSIS+utils.EDB_DATA_ANALYSIS_Md5_KEY))
|
|
|
+ resp, err := client.Do(req)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ defer resp.Body.Close()
|
|
|
+ b, err := ioutil.ReadAll(resp.Body)
|
|
|
+ utils.FileLog.Debug("HttpPost:" + string(b))
|
|
|
+ return b, err
|
|
|
+}
|
|
|
+
|
|
|
+func getKplerFrequency(granularity string) (frequency string) {
|
|
|
+ switch granularity {
|
|
|
+ case "daily":
|
|
|
+ return "日度"
|
|
|
+ case "weekly":
|
|
|
+ return "周度"
|
|
|
+ case "monthly":
|
|
|
+ return "月度"
|
|
|
+ case "yearly":
|
|
|
+ return "年度"
|
|
|
+ }
|
|
|
+ return ""
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func InitKplerProduct ()(err error) {
|
|
|
+ libUrl := "http://127.0.0.1:8915"
|
|
|
+ libResp, err := getKplerProductLib(libUrl)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if libResp.Ret != 200 {
|
|
|
+ fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
|
|
|
+ utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ classifyList, err := models.GetAllBaseFromKplerClassify()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ classifyMap := make(map[int]*models.BaseFromKplerClassify)
|
|
|
+ for _, v := range classifyList {
|
|
|
+ classifyMap[v.ProductId] = v
|
|
|
+ }
|
|
|
+ // 批量插入分类表中
|
|
|
+ // family > group(commodity) > product() > grade
|
|
|
+ sort := 0
|
|
|
+ for _, v := range libResp.Data {
|
|
|
+ id, _ := strconv.Atoi(v.Id)
|
|
|
+ if classifyObj, ok := classifyMap[id]; !ok {
|
|
|
+ classifyObj = new(models.BaseFromKplerClassify)
|
|
|
+ classifyObj.ClassifyName = v.Name
|
|
|
+ classifyObj.ClassifyNameEn = v.Name
|
|
|
+ classifyObj.ModifyTime = time.Now()
|
|
|
+ classifyObj.CreateTime = time.Now()
|
|
|
+ classifyObj.Sort = sort
|
|
|
+ classifyObj.Level = 1
|
|
|
+ classifyObj.ParentId = 0
|
|
|
+ classifyObj.SysUserId = 0
|
|
|
+ classifyObj.SysUserRealName = ""
|
|
|
+ classifyObj.ProductId = id
|
|
|
+ classifyObj.ProductName = v.Name
|
|
|
+ classifyObj.AncestorFamilyId, _ = strconv.Atoi(v.FamilyId)
|
|
|
+ classifyObj.AncestorFamilyName = v.Family
|
|
|
+ classifyObj.AncestorGroupId, _ = strconv.Atoi(v.GroupId)
|
|
|
+ classifyObj.AncestorGroupName = v.Group
|
|
|
+ classifyObj.AncestorProductId, _ = strconv.Atoi(v.ProductId)
|
|
|
+ classifyObj.AncestorProductName = v.Product
|
|
|
+ classifyObj.AncestorGradeId, _ = strconv.Atoi(v.GradeId)
|
|
|
+ classifyObj.AncestorGradeName = v.Grade
|
|
|
+ classifyObj.ClassifyType = v.Type
|
|
|
+ _, err = classifyObj.Add()
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("新增开普勒产品库失败", err)
|
|
|
+ utils.FileLog.Info("新增开普勒产品库失败", err)
|
|
|
+ }
|
|
|
+ classifyMap[id] = classifyObj
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range classifyMap {
|
|
|
+ if v.ParentId == 0 {
|
|
|
+ if v.AncestorGradeId > 0 && v.AncestorGradeName != v.ClassifyName {
|
|
|
+ v.ParentId = classifyMap[v.AncestorGradeId].ClassifyId
|
|
|
+ } else if v.AncestorProductId > 0 && v.AncestorProductName != v.ClassifyName {
|
|
|
+ v.ParentId = classifyMap[v.AncestorProductId].ClassifyId
|
|
|
+ } else if v.AncestorGroupId > 0 && v.AncestorGroupName != v.ClassifyName {
|
|
|
+ v.ParentId = classifyMap[v.AncestorGroupId].ClassifyId
|
|
|
+ }
|
|
|
+ err = v.Update([]string{"ParentId"})
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("更新开普勒产品库失败", err)
|
|
|
+ utils.FileLog.Info("更新开普勒产品库失败", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Println("classifyList:", classifyList)
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func InitKplerZone() (err error) {
|
|
|
+ libUrl := "http://127.0.0.1:8915"
|
|
|
+ libResp, err := getKplerZoneLib(libUrl)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if libResp.Ret != 200 {
|
|
|
+ fmt.Println("获取开普勒区域库失败", libResp.ErrMsg)
|
|
|
+ utils.FileLog.Info("获取开普勒区域库失败", libResp.ErrMsg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批量插入区域表中
|
|
|
+ for _, v := range libResp.Data {
|
|
|
+ zoneObj := new(models.BaseFromKplerZone)
|
|
|
+ zoneObj.ZoneName = v.AncestorName
|
|
|
+ zoneObj.ZoneType = v.AncestorType
|
|
|
+ zoneObj.AncestorId, _ = strconv.Atoi(v.AncestorId)
|
|
|
+ zoneObj.AncestorType = v.AncestorType
|
|
|
+ zoneObj.AncestorName = v.AncestorName
|
|
|
+ zoneObj.DescendantId, _ = strconv.Atoi(v.DescendantId)
|
|
|
+ zoneObj.DescendantName = v.DescendantName
|
|
|
+ zoneObj.ModifyTime = time.Now()
|
|
|
+ zoneObj.CreateTime = time.Now()
|
|
|
+ _, err = zoneObj.Add()
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("新增开普勒区域库失败", err)
|
|
|
+ utils.FileLog.Info("新增开普勒区域库失败", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|