12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881 |
- package services
- import (
- "encoding/json"
- "eta/eta_index_lib/logic"
- "eta/eta_index_lib/models"
- "eta/eta_index_lib/services/alarm_msg"
- "eta/eta_index_lib/utils"
- "fmt"
- "io/ioutil"
- "net/http"
- "strconv"
- "strings"
- "time"
- )
- // GetKplerDataByApi 获取开普勒数据
- func GetKplerDataByApi(params models.KplerSearchEdbReq, terminalCode string) (indexes []*models.KplerIndexItem, apiQueryUrl string, terminalInfo *models.EdbTerminal, err error) {
- terminal, e := GetApiTerminal(utils.DATA_SOURCE_KPLER, terminalCode)
- if e != nil {
- err = fmt.Errorf("获取开普勒终端配置失败, %v", e)
- return
- }
-
- if terminal.ServerUrl == "" {
- err = fmt.Errorf("开普勒终端地址未配置")
- return
- }
-
- // 走API
- if terminal.IsApi == 1 {
- indexes, apiQueryUrl, err = getKplerDataByApi(params, terminal.ServerUrl)
- if err != nil {
- err = fmt.Errorf("获取开普勒指标数据失败, %v", err)
- return
- }
- terminalInfo = terminal
- return
- }
- return
- }
- // getEdbDataFromThsHfHttp API-获取高频指标数据
- func getKplerDataByApi(params models.KplerSearchEdbReq, serverUrl string) (list []*models.KplerIndexItem, apiQueryUrl string, err error) {
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("开普勒指标API-getKplerDataByApi err: %v", err)
- utils.FileLog.Info(tips)
- }
- }()
- // todo 判断指标是否已存在
- productNamesStr := params.ProductNames
- fromZoneNamesStr := params.FromZoneNames
- toZoneNamesStr := params.ToZoneNames
-
- libreq := new(models.KplerFlowDataLibReq)
- libreq.Products = productNamesStr
- libreq.FromZones = fromZoneNamesStr
- libreq.ToZones = toZoneNamesStr
- libreq.Split = params.Split
- libreq.FlowDirection = params.FlowDirection
- libreq.Granularity = params.Granularity
- libreq.Unit = params.Unit
- libreq.WithIntraRegion = "true"
- libreq.WithForecast = "true"
- libreq.OnlyRealized = "false"
- libreq.StartDate = "2025-01-01"
- libreq.EndDate = time.Now().Format(utils.FormatDate)
- // 请求接口
- apiResp, err := getKplerFlowDataLib(serverUrl, libreq)
- if err != nil {
- err = fmt.Errorf("开普勒指标API-getKplerDataByApi err: %v", err)
- return
- }
- if apiResp.Ret != 200 {
- err = fmt.Errorf("开普勒指标API-状态码: %d, 提示信息: %s", apiResp.Ret, apiResp.ErrMsg)
- return
- }
- indexes := make([]*models.KplerIndexItem, 0)
- if len(apiResp.Data) == 0 {
- utils.FileLog.Info("开普勒指标API-无数据")
- return
- }
- indexCodeList := make([]string, 0)
- existIndexCodeMap := make(map[string]bool)
- flowDirection := params.FlowDirection
- prefixIndexName := fmt.Sprintf("kpler%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
- lastIndexName := params.Granularity
- prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
- // 获取首字母
- lastIndexCode := strings.ToUpper(params.Granularity[:1])
- // Tables中的每一个对应一个证券代码
- for _, v := range apiResp.Data {
- index := new(models.KplerIndexItem)
- indexName := ""
- indexCode := ""
-
- indexName = fmt.Sprintf("%s%s%s", prefixIndexName, v.SplitItem, lastIndexName)
- indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,v.SplitItem, lastIndexCode)
-
- indexCodeList = append(indexCodeList, indexCode)
- index.IndexCode = indexCode
- index.IndexName = indexName
- index.Frequency = getKplerFrequency(params.Granularity)
- index.Unit = params.Unit
- index.IndexData = v.IndexData
- indexes = append(indexes, index)
- }
- indexObj := new(models.BaseFromKplerIndex)
- // 查询指标是否存在
- existList, err := indexObj.GetByIndexCodes(indexCodeList)
- if err != nil {
- err = fmt.Errorf("查询指标是否存在失败, %v", err)
- return
- }
- for _, v := range existList {
- existIndexCodeMap[v.IndexCode] = true
- }
- // 过滤已经存在的指标
- list = make([]*models.KplerIndexItem, 0)
- for _, v := range indexes {
- if _, ok := existIndexCodeMap[v.IndexCode]; !ok {
- list = append(list, v)
- }
- }
- return
- }
- func AddKplerIndexByApi(indexList []*models.KplerIndexItem, req *models.KplerSearchEdbReq, ApiQueryUrl string, classifyId int, terminalCode string) (err error) {
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
-
- maxSort := 0
- for _, indexInfo := range indexList {
- indexObj := new(models.BaseFromKplerIndex)
- indexId := 0
- //判断指标是否存在
- var isAdd int
- item, er := indexObj.GetByIndexCode(indexInfo.IndexCode)
- if er != nil {
- if er.Error() == utils.ErrNoRow() {
- isAdd = 1
- err = nil
- } else {
- errMsgList = append(errMsgList, fmt.Sprintf("查询数据源指标库失败 GetByIndexCode Err:%s", er))
- continue
- }
- }
- if item != nil && item.BaseFromKplerIndexId > 0 {
- fmt.Println("item:", item)
- isAdd = 2
- } else {
- isAdd = 1
- }
- // 批量新增指标
- if isAdd == 1 {
- indexObj.IndexCode = indexInfo.IndexCode
- indexObj.IndexName = indexInfo.IndexName
- indexObj.Frequency = indexInfo.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.FromZoneName = req.FromZoneNames
- indexObj.ToZoneName = req.ToZoneNames
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.Sort = maxSort
- indexObj.ApiQueryUrl = ApiQueryUrl
- indexObj.ModifyTime = time.Now()
- indexObj.CreateTime = time.Now()
- indexObj.TerminalCode = terminalCode
- err = indexObj.Add()
- if err != nil {
- err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
- return
- }
- indexId = indexObj.BaseFromKplerIndexId
- maxSort++
- } else if isAdd == 2 {
- indexId = item.BaseFromKplerIndexId
- if item.TerminalCode == `` && terminalCode != `` {
- item.TerminalCode = terminalCode
- err = item.Update([]string{"TerminalCode"})
- if err != nil {
- err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
- return
- }
- }
-
- indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
- indexObj.IndexName = indexInfo.IndexName
- indexObj.Frequency = indexInfo.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneName = req.FromZoneNames
- indexObj.ToZoneName = req.ToZoneNames
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.ApiQueryUrl = ApiQueryUrl
- indexObj.ModifyTime = time.Now()
-
- //修改数据
- updateColsArr := make([]string, 0)
- updateColsArr = append(updateColsArr, "index_name")
- updateColsArr = append(updateColsArr, "classify_id")
- updateColsArr = append(updateColsArr, "product_names")
- updateColsArr = append(updateColsArr, "from_zone_name")
- updateColsArr = append(updateColsArr, "to_zone_name")
- updateColsArr = append(updateColsArr, "flow_direction")
- updateColsArr = append(updateColsArr, "granularity")
- updateColsArr = append(updateColsArr, "split")
- updateColsArr = append(updateColsArr, "frequency")
- updateColsArr = append(updateColsArr, "modify_time")
-
- e := indexObj.Update(updateColsArr)
- if e != nil {
- fmt.Println("Index Update Err:" + e.Error())
- return
- }
- }
- dataObj := new(models.BaseFromKplerData)
- exitDataMap := make(map[string]*models.BaseFromKplerData)
- //获取已存在的所有数据
- var exitDataList []*models.BaseFromKplerData
- exitDataList, err = dataObj.GetByIndexCode(indexInfo.IndexCode)
- if err != nil {
- err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
- return
- }
- fmt.Println("exitDataListLen:", len(exitDataList))
- for _, v := range exitDataList {
- dateStr := v.DataTime
- exitDataMap[dateStr] = v
- }
- addDataList := make([]*models.BaseFromKplerData, 0)
-
- // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
- for _, value := range indexInfo.IndexData {
- date := value.DataTime
- value := value.Value
- if findData, ok := exitDataMap[date]; !ok {
- _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- return
- }
- var saveDataTime time.Time
- if strings.Contains(date, "00:00:00") {
- saveDataTime, err = time.Parse(utils.FormatDateTime, date)
- } else {
- saveDataTime, err = time.Parse(utils.FormatDate, date)
- }
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- continue
- }
- timestamp := saveDataTime.UnixNano() / 1e6
-
- dataItem := new(models.BaseFromKplerData)
- dataItem.BaseFromKplerIndexId = int(indexId)
- dataItem.IndexCode = indexInfo.IndexCode
- dataItem.DataTime = date
- dataItem.Value = value
- dataItem.CreateTime = time.Now()
- dataItem.ModifyTime = time.Now()
- dataItem.DataTimestamp = timestamp
- addDataList = append(addDataList, dataItem)
- if len(addDataList) > 500 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- addDataList = make([]*models.BaseFromKplerData, 0)
- }
- } else {
- if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
- // 过滤0.50和0.5的比较
- oldV, _ := strconv.ParseFloat(findData.Value, 64)
- newV, _ := strconv.ParseFloat(value, 64)
- if oldV == newV {
- continue
- }
- dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
- dataObj.Value = value
- dataObj.ModifyTime = time.Now()
-
- updateDataColsArr := make([]string, 0)
- updateDataColsArr = append(updateDataColsArr, "value")
- updateDataColsArr = append(updateDataColsArr, "modify_time")
- dataObj.Update(updateDataColsArr)
- }
- }
- }
-
- if len(addDataList) > 0 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- }
-
- var dateItem *models.EdbInfoMaxAndMinInfo
- dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexInfo.IndexCode)
- if err != nil {
- err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
- return
- }
-
- go func() {
- indexObj.ModifyIndexMaxAndMinDate(indexInfo.IndexCode, dateItem)
- }()
-
- // 同步刷新ETA指标库的指标
- {
- // 获取指标详情
- baseObj := new(models.BaseFromKpler)
- var edbInfo *models.EdbInfo
- edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexInfo.IndexCode)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexInfo.IndexCode, err.Error()))
- return
- } else {
- err = nil
- }
- }
-
- // 已经加入到指标库的话,那么就去更新ETA指标库吧
- if edbInfo != nil {
- go logic.RefreshBaseEdbInfo(edbInfo, ``)
- }
- }
- }
- return
- }
- // HandleKplerIndex 处理Kpler的excel数据
- func HandleKplerIndex(req *models.HandleKplerExcelDataReq) (err error) {
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- // 查询所有的一级分类
- classifyObj := new(models.BaseFromKplerClassify)
- classifyList, err := classifyObj.GetParentClassify()
- if err != nil {
- err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
- return
- }
- classifyMap := make(map[string]int, 0)
- for _, v := range classifyList {
- classifyMap[v.ClassifyName] = int(v.ClassifyId)
- }
- for _, v := range req.List {
- if v.IndexName == "" || v.IndexCode == "" {
- errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码%s或者指标ID%s为空:", v.IndexCode, v.IndexName))
- continue
- }
- err = handleKplerIndex(v, req.TerminalCode, classifyMap)
- if err != nil {
- errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
- return
- }
- }
- return
- }
- // getKplerFlowDataLib 获取开普勒流数据
- func getKplerFlowDataLib(libUrl string, dataMap *models.KplerFlowDataLibReq) (resp *models.KplerFlowDataLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getFlowData")
- postData, err := json.Marshal(dataMap)
- if err != nil {
- return
- }
- result, err := HttpPost(postUrl, string(postData), "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func getKplerProductLib(libUrl string) (resp *models.KplerProductLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getProductData")
- result, err := HttpPost(postUrl, "", "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func getKplerZoneLib(libUrl string) (resp *models.KplerZoneLibResp, err error) {
- postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getZoneData")
- result, err := HttpPost(postUrl, "", "application/json")
- if err != nil {
- return
- }
- utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
- err = json.Unmarshal(result, &resp)
- if err != nil {
- return
- }
- return resp, nil
- }
- func handleKplerIndex(req *models.HandleKplerExcelData, terminalCode string, classifyMap map[string]int) (err error) {
- indexName := req.IndexName
- indexCode := req.IndexCode
- excelDataMap := req.ExcelDataMap
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- indexObj := new(models.BaseFromKplerIndex)
- dataObj := new(models.BaseFromKplerData)
- classifyObj := new(models.BaseFromKplerClassify)
- var indexId int64
- addDataList := make([]*models.BaseFromKplerData, 0)
- exitDataMap := make(map[string]*models.BaseFromKplerData)
- // 修改指标信息
- if indexName == "" {
- utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
- return
- }
- // 判断目录是否存在
- var classifyId int64
- now := time.Now()
- if req.ClassifyName != "" {
- classifyParentId := 0
- level := 1
- classifyParentId, _ = classifyMap[req.ParentClassifyName]
- if classifyParentId > 0 {
- level = 2
- }
- classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
- if err != nil {
- if err.Error() == utils.ErrNoRow() {
- //新增分类
- classifyObj = &models.BaseFromKplerClassify{
- ClassifyName: req.ClassifyName,
- ClassifyNameEn: req.ClassifyName,
- ParentId: classifyParentId,
- SysUserId: 0,
- SysUserRealName: "",
- Level: level,
- Sort: req.ClassifySort,
- ModifyTime: now,
- CreateTime: now,
- }
- classifyId, err = classifyObj.Add()
- if err != nil {
- err = fmt.Errorf("新增分类失败 Err:%s", err)
- return
- }
- classifyObj.ClassifyId = int(classifyId)
- } else {
- return
- }
- } else {
- classifyId = int64(classifyObj.ClassifyId)
- classifyObj.ModifyTime = now
- classifyObj.ParentId = classifyParentId
- e := classifyObj.Update([]string{"ParentId", "ModifyTime"})
- if e != nil {
- fmt.Println("classifyObj Update Err:" + e.Error())
- return
- }
- }
- }
- //判断指标是否存在
- var isAdd int
- item, err := indexObj.GetByIndexCode(indexCode)
- if err != nil {
- if err.Error() == utils.ErrNoRow() {
- isAdd = 1
- err = nil
- } else {
- isAdd = -1
- err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
- return
- }
- }
- if item != nil && item.BaseFromKplerIndexId > 0 {
- fmt.Println("item:", item)
- isAdd = 2
- } else {
- isAdd = 1
- }
- if isAdd == 1 {
- indexObj.IndexCode = indexCode
- indexObj.IndexName = indexName
- indexObj.Frequency = req.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneId = req.FromZoneId
- indexObj.FromZoneName = req.FromZoneName
- indexObj.ToZoneId = req.ToZoneId
- indexObj.ToZoneName = req.ToZoneName
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.Sort = req.Sort
- indexObj.ModifyTime = time.Now()
- indexObj.CreateTime = time.Now()
- indexObj.TerminalCode = terminalCode
- err = indexObj.Add()
- if err != nil {
- err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
- return
- }
- indexId = int64(indexObj.BaseFromKplerIndexId)
- } else if isAdd == 2 {
- indexId = int64(item.BaseFromKplerIndexId)
- if item.TerminalCode == `` && terminalCode != `` {
- item.TerminalCode = terminalCode
- err = item.Update([]string{"TerminalCode"})
- if err != nil {
- err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
- return
- }
- }
- indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
- indexObj.IndexName = indexName
- indexObj.Frequency = req.Frequency
- indexObj.ClassifyId = int(classifyId)
- indexObj.ProductNames = req.ProductNames
- indexObj.FromZoneId = req.FromZoneId
- indexObj.FromZoneName = req.FromZoneName
- indexObj.ToZoneId = req.ToZoneId
- indexObj.ToZoneName = req.ToZoneName
- indexObj.FlowDirection = req.FlowDirection
- indexObj.Granularity = req.Granularity
- indexObj.Split = req.Split
- indexObj.Unit = req.Unit
- indexObj.Sort = req.Sort
- indexObj.ModifyTime = time.Now()
- //修改数据
- updateColsArr := make([]string, 0)
- updateColsArr = append(updateColsArr, "index_name")
- updateColsArr = append(updateColsArr, "classify_id")
- updateColsArr = append(updateColsArr, "product_names")
- updateColsArr = append(updateColsArr, "from_zone_id")
- updateColsArr = append(updateColsArr, "from_zone_name")
- updateColsArr = append(updateColsArr, "to_zone_id")
- updateColsArr = append(updateColsArr, "to_zone_name")
- updateColsArr = append(updateColsArr, "flow_direction")
- updateColsArr = append(updateColsArr, "granularity")
- updateColsArr = append(updateColsArr, "split")
- updateColsArr = append(updateColsArr, "frequency")
- updateColsArr = append(updateColsArr, "sort")
- updateColsArr = append(updateColsArr, "modify_time")
- e := indexObj.Update(updateColsArr)
- if e != nil {
- fmt.Println("Index Update Err:" + e.Error())
- return
- }
- }
- //获取已存在的所有数据
- var exitDataList []*models.BaseFromKplerData
- exitDataList, err = dataObj.GetByIndexCode(indexCode)
- if err != nil {
- err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
- return
- }
- fmt.Println("exitDataListLen:", len(exitDataList))
- for _, v := range exitDataList {
- dateStr := v.DataTime
- exitDataMap[dateStr] = v
- }
- // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
- for date, value := range excelDataMap {
- if findData, ok := exitDataMap[date]; !ok {
- _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- return
- }
- var saveDataTime time.Time
- if strings.Contains(date, "00:00:00") {
- saveDataTime, err = time.Parse(utils.FormatDateTime, date)
- } else {
- saveDataTime, err = time.Parse(utils.FormatDate, date)
- }
- if err != nil {
- err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
- continue
- }
- timestamp := saveDataTime.UnixNano() / 1e6
- dataItem := new(models.BaseFromKplerData)
- dataItem.BaseFromKplerIndexId = int(indexId)
- dataItem.IndexCode = indexCode
- dataItem.DataTime = date
- dataItem.Value = value
- dataItem.CreateTime = time.Now()
- dataItem.ModifyTime = time.Now()
- dataItem.DataTimestamp = timestamp
- addDataList = append(addDataList, dataItem)
- if len(addDataList) > 500 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- addDataList = make([]*models.BaseFromKplerData, 0)
- }
- } else {
- if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
- // 过滤0.50和0.5的比较
- oldV, _ := strconv.ParseFloat(findData.Value, 64)
- newV, _ := strconv.ParseFloat(value, 64)
- if oldV == newV {
- continue
- }
- dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
- dataObj.Value = value
- dataObj.ModifyTime = time.Now()
- updateDataColsArr := make([]string, 0)
- updateDataColsArr = append(updateDataColsArr, "value")
- updateDataColsArr = append(updateDataColsArr, "modify_time")
- dataObj.Update(updateDataColsArr)
- }
- }
- }
- if len(addDataList) > 0 {
- err = dataObj.AddMulti(addDataList)
- if err != nil {
- err = fmt.Errorf("批量新增指标失败 Err:%s", err)
- return
- }
- }
- var dateItem *models.EdbInfoMaxAndMinInfo
- dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
- if err != nil {
- err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
- return
- }
- go func() {
- indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
- }()
- // 同步刷新ETA指标库的指标
- {
- // 获取指标详情
- baseObj := new(models.BaseFromKpler)
- var edbInfo *models.EdbInfo
- edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
- return
- } else {
- err = nil
- }
- }
- // 已经加入到指标库的话,那么就去更新ETA指标库吧
- if edbInfo != nil {
- go logic.RefreshBaseEdbInfo(edbInfo, ``)
- }
- }
- return
- }
- func HttpPost(url, postData string, params ...string) ([]byte, error) {
- body := ioutil.NopCloser(strings.NewReader(postData))
- client := &http.Client{}
- req, err := http.NewRequest("POST", url, body)
- if err != nil {
- return nil, err
- }
- contentType := "application/x-www-form-urlencoded;charset=utf-8"
- if len(params) > 0 && params[0] != "" {
- contentType = params[0]
- }
- req.Header.Set("Content-Type", contentType)
- req.Header.Set("authorization", utils.MD5(utils.APP_EDB_DATA_ANALYSIS+utils.EDB_DATA_ANALYSIS_Md5_KEY))
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- b, err := ioutil.ReadAll(resp.Body)
- utils.FileLog.Debug("HttpPost:" + string(b))
- return b, err
- }
// getKplerFrequency maps a Kpler granularity ("daily", "weekly", "monthly",
// "yearly") to the frequency label stored on the index. Any other value,
// including the empty string, yields "".
func getKplerFrequency(granularity string) (frequency string) {
	labels := map[string]string{
		"daily":   "日度",
		"weekly":  "周度",
		"monthly": "月度",
		"yearly":  "年度",
	}
	// A missing key yields the zero value "", which is the fallback result.
	return labels[granularity]
}
- func InitKplerProduct ()(err error) {
- libUrl := "http://127.0.0.1:8915"
- libResp, err := getKplerProductLib(libUrl)
- if err != nil {
- return
- }
- if libResp.Ret != 200 {
- fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
- utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
- return
- }
- classifyList, err := models.GetAllBaseFromKplerClassify()
- if err != nil {
- return
- }
- classifyMap := make(map[int]*models.BaseFromKplerClassify)
- for _, v := range classifyList {
- classifyMap[v.ProductId] = v
- }
- // 批量插入分类表中
- // family > group(commodity) > product() > grade
- sort := 0
- for _, v := range libResp.Data {
- id, _ := strconv.Atoi(v.Id)
- if classifyObj, ok := classifyMap[id]; !ok {
- classifyObj = new(models.BaseFromKplerClassify)
- classifyObj.ClassifyName = v.Name
- classifyObj.ClassifyNameEn = v.Name
- classifyObj.ModifyTime = time.Now()
- classifyObj.CreateTime = time.Now()
- classifyObj.Sort = sort
- classifyObj.Level = 1
- classifyObj.ParentId = 0
- classifyObj.SysUserId = 0
- classifyObj.SysUserRealName = ""
- classifyObj.ProductId = id
- classifyObj.ProductName = v.Name
- classifyObj.AncestorFamilyId, _ = strconv.Atoi(v.FamilyId)
- classifyObj.AncestorFamilyName = v.Family
- classifyObj.AncestorGroupId, _ = strconv.Atoi(v.GroupId)
- classifyObj.AncestorGroupName = v.Group
- classifyObj.AncestorProductId, _ = strconv.Atoi(v.ProductId)
- classifyObj.AncestorProductName = v.Product
- classifyObj.AncestorGradeId, _ = strconv.Atoi(v.GradeId)
- classifyObj.AncestorGradeName = v.Grade
- classifyObj.ClassifyType = v.Type
- _, err = classifyObj.Add()
- if err != nil {
- fmt.Println("新增开普勒产品库失败", err)
- utils.FileLog.Info("新增开普勒产品库失败", err)
- }
- classifyMap[id] = classifyObj
- }
- }
-
- for _, v := range classifyMap {
- if v.ParentId == 0 {
- if v.AncestorGradeId > 0 && v.AncestorGradeName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorGradeId].ClassifyId
- } else if v.AncestorProductId > 0 && v.AncestorProductName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorProductId].ClassifyId
- } else if v.AncestorGroupId > 0 && v.AncestorGroupName != v.ClassifyName {
- v.ParentId = classifyMap[v.AncestorGroupId].ClassifyId
- }
- err = v.Update([]string{"ParentId"})
- if err != nil {
- fmt.Println("更新开普勒产品库失败", err)
- utils.FileLog.Info("更新开普勒产品库失败", err)
- }
- }
- }
-
- fmt.Println("classifyList:", classifyList)
- return
- }
- func InitKplerZone() (err error) {
- libUrl := "http://127.0.0.1:8915"
- libResp, err := getKplerZoneLib(libUrl)
- if err != nil {
- return
- }
- if libResp.Ret != 200 {
- fmt.Println("获取开普勒区域库失败", libResp.ErrMsg)
- utils.FileLog.Info("获取开普勒区域库失败", libResp.ErrMsg)
- return
- }
- // 批量插入区域表中
- zoneMap := make(map[int]*models.BaseFromKplerZone)
- for _, v := range libResp.Data {
- zoneObj := new(models.BaseFromKplerZone)
- descendantId, _ := strconv.Atoi(v.DescendantId)
- ancestorId, _ := strconv.Atoi(v.AncestorId)
- if _, ok := zoneMap[descendantId]; !ok {
- zoneObj.ZoneName = v.DescendantName
- //zoneObj.ZoneType = v.DescendantType
- zoneObj.AncestorId = ancestorId
- zoneObj.AncestorType = v.AncestorType
- zoneObj.AncestorName = v.AncestorName
- zoneObj.DescendantId = descendantId
- zoneObj.DescendantName = v.DescendantName
- zoneObj.ModifyTime = time.Now()
- zoneObj.CreateTime = time.Now()
- _, err = zoneObj.Add()
- if err != nil {
- fmt.Println("新增开普勒区域库失败", err)
- utils.FileLog.Info("新增开普勒区域库失败", err)
- }
- zoneMap[descendantId] = zoneObj
- if _, ok := zoneMap[ancestorId]; !ok {
- zoneObj := new(models.BaseFromKplerZone)
- zoneObj.ZoneName = v.AncestorName
- zoneObj.ZoneType = v.AncestorType
- zoneObj.DescendantId = ancestorId
- zoneObj.DescendantName = v.AncestorName
- zoneObj.ModifyTime = time.Now()
- zoneObj.CreateTime = time.Now()
- _, err = zoneObj.Add()
- if err != nil {
- fmt.Println("新增开普勒区域库失败", err)
- utils.FileLog.Info("新增开普勒区域库失败", err)
- }
- zoneMap[ancestorId] = zoneObj
- }
- }
- }
- return
- }
|