|
- package services
- import (
- "encoding/json"
- "eta/eta_index_lib/logic"
- "eta/eta_index_lib/models"
- "eta/eta_index_lib/services/alarm_msg"
- "eta/eta_index_lib/utils"
- "fmt"
- "io/ioutil"
- "net/http"
- "strconv"
- "strings"
- "time"
- )
- // GetKplerDataByApi 获取开普勒数据
- func GetKplerDataByApi(params models.KplerSearchEdbReq, terminalCode string, isRefresh bool) (indexes []*models.KplerIndexItem, apiQueryUrl string, terminalInfo *models.EdbTerminal, err error) {
- terminal, e := GetApiTerminal(utils.DATA_SOURCE_KPLER, terminalCode)
- if e != nil {
- err = fmt.Errorf("获取开普勒终端配置失败, %v", e)
- return
- }
-
- if terminal.ServerUrl == "" {
- err = fmt.Errorf("开普勒终端地址未配置")
- return
- }
-
- // 走API
- if terminal.IsApi == 1 {
- indexes, apiQueryUrl, err = getKplerDataByApi(params, terminal.ServerUrl, isRefresh)
- if err != nil {
- err = fmt.Errorf("获取开普勒指标数据失败, %v", err)
- return
- }
- terminalInfo = terminal
- return
- }
- return
- }
- // getEdbDataFromThsHfHttp API-获取高频指标数据
- func getKplerDataByApi(params models.KplerSearchEdbReq, serverUrl string, isRefresh bool) (list []*models.KplerIndexItem, apiQueryUrl string, err error) {
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("开普勒指标API-getKplerDataByApi err: %v", err)
- utils.FileLog.Info(tips)
- }
- }()
- // todo 判断指标是否已存在
- productNamesStr := params.ProductNames
- fromZoneNamesStr := params.FromZoneNames
- toZoneNamesStr := params.ToZoneNames
-
- granularity := GetKplerGranularity(params.Granularity)
- libreq := new(models.KplerFlowDataLibReq)
- libreq.Products = productNamesStr
- libreq.FromZones = fromZoneNamesStr
- libreq.ToZones = toZoneNamesStr
- libreq.Split = params.Split
- libreq.FlowDirection = params.FlowDirection
- libreq.Granularity = granularity
- libreq.Unit = params.Unit
- libreq.WithIntraRegion = "true"
- libreq.WithForecast = "true"
- libreq.OnlyRealized = "false"
- libreq.StartDate = "2013-01-01"
- libreq.EndDate = time.Now().Format(utils.FormatDate)
- // 请求接口
- apiResp, err := getKplerFlowDataLib(serverUrl, libreq)
- if err != nil {
- err = fmt.Errorf("开普勒指标API-getKplerDataByApi err: %v", err)
- return
- }
- if apiResp.Ret != 200 {
- err = fmt.Errorf("开普勒指标API-状态码: %d, 提示信息: %s", apiResp.Ret, apiResp.ErrMsg)
- return
- }
- indexes := make([]*models.KplerIndexItem, 0)
- if len(apiResp.Data.List) == 0 {
- utils.FileLog.Info("开普勒指标API-无数据")
- return
- }
- indexCodeList := make([]string, 0)
- existIndexCodeMap := make(map[string]bool)
- flowDirection := params.FlowDirection
- prefixIndexName := fmt.Sprintf("kpler%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
- lastIndexName := params.Granularity
- prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
- // 获取首字母
- lastIndexCode := strings.ToUpper(params.Granularity[:1])
- apiQueryUrl = apiResp.Data.ApiQueryUrl
- // Tables中的每一个对应一个证券代码
- for _, v := range apiResp.Data.List {
- index := new(models.KplerIndexItem)
- indexName := ""
- indexCode := ""
-
- indexName = fmt.Sprintf("%s%s%s", prefixIndexName, v.SplitItem, lastIndexName)
- indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,v.SplitItem, lastIndexCode)
-
- indexCodeList = append(indexCodeList, indexCode)
- index.IndexCode = indexCode
- index.IndexName = indexName
- index.Frequency = getKplerFrequency(params.Granularity)
- index.Unit = params.Unit
- index.IndexData = v.IndexData
- indexes = append(indexes, index)
- }
- indexObj := new(models.BaseFromKplerIndex)
- // 查询指标是否存在
- existList, err := indexObj.GetByIndexCodes(indexCodeList)
- if err != nil {
- err = fmt.Errorf("查询指标是否存在失败, %v", err)
- return
- }
- for _, v := range existList {
- existIndexCodeMap[v.IndexCode] = true
- }
- // 过滤已经存在的指标, 如果是刷新指标,则无需过滤
- list = make([]*models.KplerIndexItem, 0)
- for _, v := range indexes {
- if _, ok := existIndexCodeMap[v.IndexCode]; !ok || isRefresh {
- list = append(list, v)
- }
- }
- return
- }
- func AddKplerIndexByApi(indexList []*models.KplerIndexItem, req *models.KplerSearchEdbReq, apiQueryUrl string, classifyId int, terminalCode string) (err error) {
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
-
- maxSort := 0
- for _, indexInfo := range indexList {
- indexObj := new(models.BaseFromKplerIndex)
- indexId := 0
- //判断指标是否存在
- var isAdd int
- item, er := indexObj.GetByIndexCode(indexInfo.IndexCode)
- if er != nil {
- if er.Error() == utils.ErrNoRow() {
- isAdd = 1
- err = nil
- } else {
- errMsgList = append(errMsgList, fmt.Sprintf("查询数据源指标库失败 GetByIndexCode Err:%s", er))
- continue
- }
- }
- if item != nil && item.BaseFromKplerIndexId > 0 {
- fmt.Println("item:", item)
- isAdd = 2
- } else {
- isAdd = 1
- }
- fromZoneIdsStr := ""
- toZoneIdsStr := ""
- if len(req.FromZoneIds) > 0 {
- for _, v := range req.FromZoneIds {
- fromZoneIdsStr = fmt.Sprintf("%s%d,", fromZoneIdsStr, v)
- }
- }
- if len(req.ToZoneIds) > 0 {
- for _, v := range req.ToZoneIds {
- toZoneIdsStr = fmt.Sprintf("%s%d,", toZoneIdsStr, v)
- }
- }
- // 批量新增指标
- if isAdd == 1 {
- indexObj.IndexCode = indexInfo.IndexCode
- indexObj.IndexName = indexInfo.IndexName
- indexObj.Frequency = indexInfo.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneId = fromZoneIdsStr
- indexObj.ToZoneId = toZoneIdsStr
- indexObj.FromZoneName = req.FromZoneNames
- indexObj.ToZoneName = req.ToZoneNames
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = GetKplerGranularity(req.Granularity)
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.Sort = maxSort
- indexObj.ApiQueryUrl = apiQueryUrl
- indexObj.ModifyTime = time.Now()
- indexObj.CreateTime = time.Now()
- indexObj.TerminalCode = terminalCode
- err = indexObj.Add()
- if err != nil {
- err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
- return
- }
- indexId = indexObj.BaseFromKplerIndexId
- maxSort++
- } else if isAdd == 2 {
- indexId = item.BaseFromKplerIndexId
- if item.TerminalCode == `` && terminalCode != `` {
- item.TerminalCode = terminalCode
- err = item.Update([]string{"TerminalCode"})
- if err != nil {
- err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
- return
- }
- }
-
- indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
- indexObj.IndexName = indexInfo.IndexName
- indexObj.Frequency = indexInfo.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneId = fromZoneIdsStr
- indexObj.ToZoneId = toZoneIdsStr
- indexObj.FromZoneName = req.FromZoneNames
- indexObj.ToZoneName = req.ToZoneNames
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = GetKplerGranularity(req.Granularity)
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.ApiQueryUrl = apiQueryUrl
- indexObj.ModifyTime = time.Now()
-
- //修改数据
- updateColsArr := make([]string, 0)
- updateColsArr = append(updateColsArr, "index_name")
- updateColsArr = append(updateColsArr, "classify_id")
- updateColsArr = append(updateColsArr, "product_names")
- updateColsArr = append(updateColsArr, "from_zone_id")
- updateColsArr = append(updateColsArr, "to_zone_id")
- updateColsArr = append(updateColsArr, "from_zone_name")
- updateColsArr = append(updateColsArr, "to_zone_name")
- updateColsArr = append(updateColsArr, "flow_direction")
- updateColsArr = append(updateColsArr, "granularity")
- updateColsArr = append(updateColsArr, "split")
- updateColsArr = append(updateColsArr, "frequency")
- updateColsArr = append(updateColsArr, "modify_time")
- updateColsArr = append(updateColsArr, "api_query_url")
- e := indexObj.Update(updateColsArr)
- if e != nil {
- fmt.Println("Index Update Err:" + e.Error())
- return
- }
- }
- dataObj := new(models.BaseFromKplerData)
- exitDataMap := make(map[string]*models.BaseFromKplerData)
- //获取已存在的所有数据
- var exitDataList []*models.BaseFromKplerData
- exitDataList, err = dataObj.GetByIndexCode(indexInfo.IndexCode)
- if err != nil {
- err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
- return
- }
- fmt.Println("exitDataListLen:", len(exitDataList))
- for _, v := range exitDataList {
- dateStr := v.DataTime
- exitDataMap[dateStr] = v
- }
- addDataList := make([]*models.BaseFromKplerData, 0)
-
- // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
- for _, value := range indexInfo.IndexData {
- date := value.DataTime
- value := value.Value
- if findData, ok := exitDataMap[date]; !ok {
- _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- return
- }
- var saveDataTime time.Time
- if strings.Contains(date, "00:00:00") {
- saveDataTime, err = time.Parse(utils.FormatDateTime, date)
- } else {
- saveDataTime, err = time.Parse(utils.FormatDate, date)
- }
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- continue
- }
- timestamp := saveDataTime.UnixNano() / 1e6
-
- dataItem := new(models.BaseFromKplerData)
- dataItem.BaseFromKplerIndexId = int(indexId)
- dataItem.IndexCode = indexInfo.IndexCode
- dataItem.DataTime = date
- dataItem.Value = value
- dataItem.CreateTime = time.Now()
- dataItem.ModifyTime = time.Now()
- dataItem.DataTimestamp = timestamp
- addDataList = append(addDataList, dataItem)
- if len(addDataList) > 500 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- addDataList = make([]*models.BaseFromKplerData, 0)
- }
- } else {
- if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
- // 过滤0.50和0.5的比较
- oldV, _ := strconv.ParseFloat(findData.Value, 64)
- newV, _ := strconv.ParseFloat(value, 64)
- if oldV == newV {
- continue
- }
- dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
- dataObj.Value = value
- dataObj.ModifyTime = time.Now()
-
- updateDataColsArr := make([]string, 0)
- updateDataColsArr = append(updateDataColsArr, "value")
- updateDataColsArr = append(updateDataColsArr, "modify_time")
- dataObj.Update(updateDataColsArr)
- }
- }
- }
-
- if len(addDataList) > 0 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- }
-
- var dateItem *models.EdbInfoMaxAndMinInfo
- dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexInfo.IndexCode)
- if err != nil {
- err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
- return
- }
-
- go func() {
- indexObj.ModifyIndexMaxAndMinDate(indexInfo.IndexCode, dateItem)
- }()
-
- // 同步刷新ETA指标库的指标
- {
- // 获取指标详情
- baseObj := new(models.BaseFromKpler)
- var edbInfo *models.EdbInfo
- edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexInfo.IndexCode)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexInfo.IndexCode, err.Error()))
- return
- } else {
- err = nil
- }
- }
-
- // 已经加入到指标库的话,那么就去更新ETA指标库吧
- if edbInfo != nil {
- go logic.RefreshBaseEdbInfo(edbInfo, ``)
- }
- }
- }
- return
- }
- // HandleKplerIndex 处理Kpler的excel数据
- func HandleKplerIndex(req *models.HandleKplerExcelDataReq) (err error) {
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- // 查询所有的一级分类
- classifyObj := new(models.BaseFromKplerClassify)
- classifyList, err := classifyObj.GetParentClassify()
- if err != nil {
- err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
- return
- }
- classifyMap := make(map[string]int, 0)
- for _, v := range classifyList {
- classifyMap[v.ClassifyName] = int(v.ClassifyId)
- }
- for _, v := range req.List {
- err = handleKplerIndex(v, req.TerminalCode, classifyMap)
- if err != nil {
- errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
- return
- }
- }
- return
- }
- // getKplerFlowDataLib 获取开普勒流数据
- func getKplerFlowDataLib(libUrl string, dataMap *models.KplerFlowDataLibReq) (resp *models.KplerFlowDataLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getFlowData")
- postData, err := json.Marshal(dataMap)
- if err != nil {
- return
- }
- result, err := HttpPost(postUrl, string(postData), "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func getKplerProductLib(libUrl string, req *models.KplerProductLibReq) (resp *models.KplerProductLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getProductData")
- postData, err := json.Marshal(req)
- if err != nil {
- return
- }
- result, err := HttpPost(postUrl, string(postData), "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func getKplerZoneLib(libUrl string) (resp *models.KplerZoneLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getZoneData")
- result, err := HttpPost(postUrl, "", "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func handleKplerIndex(req *models.HandleKplerExcelData, terminalCode string, classifyMap map[string]int) (err error) {
- indexName := req.IndexName
- indexCode := req.IndexCode
- excelDataMap := req.ExcelDataMap
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- indexObj := new(models.BaseFromKplerIndex)
- dataObj := new(models.BaseFromKplerData)
- classifyObj := new(models.BaseFromKplerClassify)
- var indexId int64
- addDataList := make([]*models.BaseFromKplerData, 0)
- exitDataMap := make(map[string]*models.BaseFromKplerData)
- // 修改指标信息
- if indexName == "" {
- utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
- return
- }
- if req.ClassifyName == "" {
- // 获取产品的名称,把产品的第一个名称作为分类ID
- productNames := strings.Split(req.ProductNames, ",")
- if len(productNames) > 0 {
- req.ClassifyName = productNames[0]
- }
- }
- fromZoneNamesStr := req.FromZoneNames
- flowDirection := req.FlowDirection
- toZoneNamesStr := req.ToZoneNames
- productNamesStr := req.ProductNames
- // 查询fromZoneIds
- zoneObj := new(models.BaseFromKplerZone)
- fromZoneNames := strings.Split(fromZoneNamesStr, ",")
- fromZoneIdsStr := ""
- // 查询 zone
- if len(fromZoneNames) > 0 {
- fromZoneIds, er := zoneObj.GetIdsByZoneNames(fromZoneNames)
- if er != nil {
- err = fmt.Errorf("查询区域失败 Err:%s", err)
- return
- }
- for _, v := range fromZoneIds {
- fromZoneIdsStr = fmt.Sprintf("%s%d,", fromZoneIdsStr, v)
- }
- }
-
- // 查询toZoneIds
- toZoneNames := strings.Split(toZoneNamesStr, ",")
- toZoneIdsStr := ""
- if len(toZoneNames) > 0 {
- toZoneIds, er := zoneObj.GetIdsByZoneNames(toZoneNames)
- if er != nil {
- err = fmt.Errorf("查询区域失败 Err:%s", err)
- return
- }
- for _, v := range toZoneIds {
- toZoneIdsStr = fmt.Sprintf("%s%d,", toZoneIdsStr, v)
- }
- }
- // 拼接指标编码
- prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
- // 获取首字母
- lastIndexCode := strings.ToUpper(req.Granularity[:1])
- indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,req.SplitName, lastIndexCode)
- // 判断目录是否存在
- var classifyId int64
- now := time.Now()
- if req.ClassifyName != "" {
- classifyParentId := 0
- level := 1
- classifyParentId, _ = classifyMap[req.ParentClassifyName]
- if classifyParentId > 0 {
- level = 2
- }
- classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
- if err != nil {
- if err.Error() == utils.ErrNoRow() {
- //新增分类
- classifyObj = &models.BaseFromKplerClassify{
- ClassifyName: req.ClassifyName,
- ClassifyNameEn: req.ClassifyName,
- ParentId: classifyParentId,
- SysUserId: 0,
- SysUserRealName: "",
- Level: level,
- Sort: req.ClassifySort,
- ModifyTime: now,
- CreateTime: now,
- }
- classifyId, err = classifyObj.Add()
- if err != nil {
- err = fmt.Errorf("新增分类失败 Err:%s", err)
- return
- }
- classifyObj.ClassifyId = int(classifyId)
- } else {
- return
- }
- }else {
- classifyId = int64(classifyObj.ClassifyId)
- }
- }
- //判断指标是否存在
- var isAdd int
- item, err := indexObj.GetByIndexCode(indexCode)
- if err != nil {
- if err.Error() == utils.ErrNoRow() {
- isAdd = 1
- err = nil
- } else {
- isAdd = -1
- err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
- return
- }
- }
- if item != nil && item.BaseFromKplerIndexId > 0 {
- fmt.Println("item:", item)
- isAdd = 2
- } else {
- isAdd = 1
- }
- if isAdd == 1 {
- indexObj.IndexCode = indexCode
- indexObj.IndexName = indexName
- indexObj.Frequency = getKplerFrequency(req.Granularity)
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneId = fromZoneIdsStr
- indexObj.ToZoneId = toZoneIdsStr
- indexObj.FromZoneName = fromZoneNamesStr
- indexObj.ToZoneName = toZoneNamesStr
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.Sort = req.Sort
- indexObj.ExcelQueryUrl = req.ExcelQueryUrl
- indexObj.ModifyTime = time.Now()
- indexObj.CreateTime = time.Now()
- indexObj.TerminalCode = terminalCode
- err = indexObj.Add()
- if err != nil {
- err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
- return
- }
- indexId = int64(indexObj.BaseFromKplerIndexId)
- } else if isAdd == 2 {
- indexId = int64(item.BaseFromKplerIndexId)
- if item.TerminalCode == `` && terminalCode != `` {
- item.TerminalCode = terminalCode
- err = item.Update([]string{"TerminalCode"})
- if err != nil {
- err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
- return
- }
- }
- indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
- indexObj.IndexName = indexName
- indexObj.Frequency = getKplerFrequency(req.Granularity)
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneName = fromZoneNamesStr
- indexObj.ToZoneName = toZoneNamesStr
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.ExcelQueryUrl = req.ExcelQueryUrl
- indexObj.Unit = req.Unit
- indexObj.Sort = req.Sort
- indexObj.ModifyTime = time.Now()
- //修改数据
- updateColsArr := make([]string, 0)
- updateColsArr = append(updateColsArr, "index_name")
- updateColsArr = append(updateColsArr, "classify_id")
- updateColsArr = append(updateColsArr, "product_names")
- updateColsArr = append(updateColsArr, "from_zone_id")
- updateColsArr = append(updateColsArr, "from_zone_name")
- updateColsArr = append(updateColsArr, "to_zone_id")
- updateColsArr = append(updateColsArr, "to_zone_name")
- updateColsArr = append(updateColsArr, "flow_direction")
- updateColsArr = append(updateColsArr, "granularity")
- updateColsArr = append(updateColsArr, "split")
- updateColsArr = append(updateColsArr, "frequency")
- updateColsArr = append(updateColsArr, "sort")
- updateColsArr = append(updateColsArr, "modify_time")
- e := indexObj.Update(updateColsArr)
- if e != nil {
- fmt.Println("Index Update Err:" + e.Error())
- return
- }
- }
- //获取已存在的所有数据
- var exitDataList []*models.BaseFromKplerData
- exitDataList, err = dataObj.GetByIndexCode(indexCode)
- if err != nil {
- err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
- return
- }
- fmt.Println("exitDataListLen:", len(exitDataList))
- for _, v := range exitDataList {
- dateStr := v.DataTime
- exitDataMap[dateStr] = v
- }
- // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
- for date, value := range excelDataMap {
- if findData, ok := exitDataMap[date]; !ok {
- _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- return
- }
- var saveDataTime time.Time
- if strings.Contains(date, "00:00:00") {
- saveDataTime, err = time.Parse(utils.FormatDateTime, date)
- } else {
- saveDataTime, err = time.Parse(utils.FormatDate, date)
- }
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- continue
- }
- timestamp := saveDataTime.UnixNano() / 1e6
- dataItem := new(models.BaseFromKplerData)
- dataItem.BaseFromKplerIndexId = int(indexId)
- dataItem.IndexCode = indexCode
- dataItem.DataTime = date
- dataItem.Value = value
- dataItem.CreateTime = time.Now()
- dataItem.ModifyTime = time.Now()
- dataItem.DataTimestamp = timestamp
- addDataList = append(addDataList, dataItem)
- if len(addDataList) > 500 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- addDataList = make([]*models.BaseFromKplerData, 0)
- }
- } else {
- if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
- // 过滤0.50和0.5的比较
- oldV, _ := strconv.ParseFloat(findData.Value, 64)
- newV, _ := strconv.ParseFloat(value, 64)
- if oldV == newV {
- continue
- }
- dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
- dataObj.Value = value
- dataObj.ModifyTime = time.Now()
- updateDataColsArr := make([]string, 0)
- updateDataColsArr = append(updateDataColsArr, "value")
- updateDataColsArr = append(updateDataColsArr, "modify_time")
- dataObj.Update(updateDataColsArr)
- }
- }
- }
- if len(addDataList) > 0 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- }
- var dateItem *models.EdbInfoMaxAndMinInfo
- dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
- if err != nil {
- err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
- return
- }
- go func() {
- indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
- }()
- // 同步刷新ETA指标库的指标
- {
- // 获取指标详情
- baseObj := new(models.BaseFromKpler)
- var edbInfo *models.EdbInfo
- edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
- return
- } else {
- err = nil
- }
- }
- // 已经加入到指标库的话,那么就去更新ETA指标库吧
- if edbInfo != nil {
- go logic.RefreshBaseEdbInfo(edbInfo, ``)
- }
- }
- return
- }
- func HttpPost(url, postData string, params ...string) ([]byte, error) {
- body := ioutil.NopCloser(strings.NewReader(postData))
- client := &http.Client{}
- req, err := http.NewRequest("POST", url, body)
- if err != nil {
- return nil, err
- }
- contentType := "application/x-www-form-urlencoded;charset=utf-8"
- if len(params) > 0 && params[0] != "" {
- contentType = params[0]
- }
- req.Header.Set("Content-Type", contentType)
- req.Header.Set("authorization", utils.MD5(utils.APP_EDB_DATA_ANALYSIS+utils.EDB_DATA_ANALYSIS_Md5_KEY))
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- b, err := ioutil.ReadAll(resp.Body)
- utils.FileLog.Debug("HttpPost:" + string(b))
- return b, err
- }
- func getKplerFrequency(granularity string) (frequency string) {
- switch granularity {
- case "daily", "Daily", "days":
- return "日度"
- case "weekly", "Weekly", "weeks":
- return "周度"
- case "monthly", "Monthly", "months":
- return "月度"
- case "yearly", "Yearly", "years":
- return "年度"
- }
- return ""
- }
- func GetKplerGranularity(frequency string) (granularity string) {
- switch frequency {
- case "daily", "Daily", "days":
- return "days"
- case "weekly", "Weekly", "weeks":
- return "weeks"
- case "monthly", "Monthly", "months":
- return "months"
- case "yearly", "Yearly", "years":
- return "years"
- case "quarterly", "Quarters", "quarters":
- return "quarters"
- }
- return ""
- }
- func InitKplerProduct ()(err error) {
- libUrl := "http://127.0.0.1:8915"
- libResp, err := getKplerProductLib(libUrl, &models.KplerProductLibReq{
- AncestorFamilyIds: "",
- AncestorFamilyNames: "",
- AncestorGroupIds: "",
- AncestorGroupNames: "",
- AncestorProductIds: "",
- AncestorProductNames: "",
- AncestorGradeIds: "",
- AncestorGradeNames: "",
- Products: "",
- ProductIds: "",
- })
- if err != nil {
- return
- }
- if libResp.Ret != 200 {
- fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
- utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
- return
- }
- classifyList, err := models.GetAllBaseFromKplerClassify()
- if err != nil {
- return
- }
- classifyMap := make(map[int]*models.BaseFromKplerClassify)
- for _, v := range classifyList {
- classifyMap[v.ProductId] = v
- }
- // 批量插入分类表中
- // family > group(commodity) > product() > grade
- sort := 0
- for _, v := range libResp.Data {
- id, _ := strconv.Atoi(v.Id)
- if classifyObj, ok := classifyMap[id]; !ok {
- classifyObj = new(models.BaseFromKplerClassify)
- classifyObj.ClassifyName = v.Name
- classifyObj.ClassifyNameEn = v.Name
- classifyObj.ModifyTime = time.Now()
- classifyObj.CreateTime = time.Now()
- classifyObj.Sort = sort
- classifyObj.Level = 1
- classifyObj.ParentId = 0
- classifyObj.SysUserId = 0
- classifyObj.SysUserRealName = ""
- classifyObj.ProductId = id
- classifyObj.ProductName = v.Name
- classifyObj.AncestorFamilyId, _ = strconv.Atoi(v.FamilyId)
- classifyObj.AncestorFamilyName = v.Family
- classifyObj.AncestorGroupId, _ = strconv.Atoi(v.GroupId)
- classifyObj.AncestorGroupName = v.Group
- classifyObj.AncestorProductId, _ = strconv.Atoi(v.ProductId)
- classifyObj.AncestorProductName = v.Product
- classifyObj.AncestorGradeId, _ = strconv.Atoi(v.GradeId)
- classifyObj.AncestorGradeName = v.Grade
- classifyObj.ClassifyType = v.Type
- _, err = classifyObj.Add()
- if err != nil {
- fmt.Println("新增开普勒产品库失败", err)
- utils.FileLog.Info("新增开普勒产品库失败", err)
- }
- classifyMap[id] = classifyObj
- }
- }
-
- for _, v := range classifyMap {
- if v.ParentId == 0 {
- if v.AncestorGradeId > 0 && v.AncestorGradeName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorGradeId].ClassifyId
- } else if v.AncestorProductId > 0 && v.AncestorProductName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorProductId].ClassifyId
- } else if v.AncestorGroupId > 0 && v.AncestorGroupName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorGroupId].ClassifyId
- }
- err = v.Update([]string{"ParentId"})
- if err != nil {
- fmt.Println("更新开普勒产品库失败", err)
- utils.FileLog.Info("更新开普勒产品库失败", err)
- }
- }
- }
-
- fmt.Println("classifyList:", classifyList)
- return
- }
- func InitKplerProductGrade ()(err error) {
- libUrl := "http://127.0.0.1:8915"
-
- classifyList, err := models.GetAllBaseFromKplerClassifyByClassifyType("subgrade4")
- if err != nil {
- return
- }
- for _, c := range classifyList {
- c.ModifyTime = time.Now()
- err = c.Update([]string{"ModifyTime"})
- if err != nil {
- fmt.Println("更新开普勒产品库失败", err)
- utils.FileLog.Info("更新开普勒产品库失败", err)
- }
- // 每个分类都发起分组请求
- libResp, er := getKplerProductLib(libUrl, &models.KplerProductLibReq{
- // AncestorFamilyIds: familyIds,
- // AncestorFamilyNames: familyNames,
- // AncestorGroupIds: groupIds,
- // AncestorGroupNames: groupNames,
- // AncestorProductIds: productIds,
- // AncestorProductNames: productNames,
- AncestorGradeIds: strconv.Itoa(c.ProductId),
- AncestorGradeNames: c.ProductName,
- })
- if er != nil {
- fmt.Println("获取开普勒产品库失败", er)
- utils.FileLog.Info("获取开普勒产品库失败", er)
- continue
- }
- if libResp.Ret != 200 {
- fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
- utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
- continue
- }
- classifyObj := new(models.BaseFromKplerClassify)
- for _, v := range libResp.Data {
- id, _ := strconv.Atoi(v.Id)
- if id == 0 {
- continue
- }
- if id == c.ProductId {
- continue
- }
- // 更新对应的分类等级和父级
- // 查找所有子分类
- classifyItem, er := classifyObj.GetByProductId(id)
- if er != nil {
- err = fmt.Errorf("子分类不存在 Err:%s", er)
- continue
- }
-
- classifyItem.Level = c.Level + 1
- classifyItem.ParentId = c.ClassifyId
- classifyItem.ClassifyType = "subgrade5"
- classifyItem.ModifyTime = time.Now()
- err = classifyItem.Update([]string{"Level", "ParentId", "ClassifyType", "ModifyTime"})
- if err != nil {
- fmt.Println("更新开普勒产品库失败", err)
- utils.FileLog.Info("更新开普勒产品库失败", err)
- }
- }
-
- }
-
- fmt.Println("classifyList:", classifyList)
-
- return
- }
- func InitKplerZone() (err error) {
- libUrl := "http://127.0.0.1:8915"
- libResp, err := getKplerZoneLib(libUrl)
- if err != nil {
- return
- }
- if libResp.Ret != 200 {
- fmt.Println("获取开普勒区域库失败", libResp.ErrMsg)
- utils.FileLog.Info("获取开普勒区域库失败", libResp.ErrMsg)
- return
- }
- // 批量插入区域表中
- zoneMap := make(map[int]*models.BaseFromKplerZone)
- for _, v := range libResp.Data {
- zoneObj := new(models.BaseFromKplerZone)
- descendantId, _ := strconv.Atoi(v.DescendantId)
- ancestorId, _ := strconv.Atoi(v.AncestorId)
- if _, ok := zoneMap[descendantId]; !ok {
- zoneObj.ZoneName = v.DescendantName
- //zoneObj.ZoneType = v.DescendantType
- zoneObj.AncestorId = ancestorId
- zoneObj.AncestorType = v.AncestorType
- zoneObj.AncestorName = v.AncestorName
- zoneObj.DescendantId = descendantId
- zoneObj.DescendantName = v.DescendantName
- zoneObj.ModifyTime = time.Now()
- zoneObj.CreateTime = time.Now()
- _, err = zoneObj.Add()
- if err != nil {
- fmt.Println("新增开普勒区域库失败", err)
- utils.FileLog.Info("新增开普勒区域库失败", err)
- }
- zoneMap[descendantId] = zoneObj
- if _, ok := zoneMap[ancestorId]; !ok {
- zoneObj := new(models.BaseFromKplerZone)
- zoneObj.ZoneName = v.AncestorName
- zoneObj.ZoneType = v.AncestorType
- zoneObj.DescendantId = ancestorId
- zoneObj.DescendantName = v.AncestorName
- zoneObj.ModifyTime = time.Now()
- zoneObj.CreateTime = time.Now()
- _, err = zoneObj.Add()
- if err != nil {
- fmt.Println("新增开普勒区域库失败", err)
- utils.FileLog.Info("新增开普勒区域库失败", err)
- }
- zoneMap[ancestorId] = zoneObj
- }
- }
- }
- return
- }
|