base_from_kpler.go 33 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_index_lib/logic"
  5. "eta/eta_index_lib/models"
  6. "eta/eta_index_lib/services/alarm_msg"
  7. "eta/eta_index_lib/services/elastic"
  8. "eta/eta_index_lib/utils"
  9. "fmt"
  10. "io/ioutil"
  11. "net/http"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. // GetKplerDataByApi 获取开普勒数据
  17. func GetKplerDataByApi(params models.KplerSearchEdbReq, terminalCode string, isRefresh bool) (indexes []*models.KplerIndexItem, apiQueryUrl string, terminalInfo *models.EdbTerminal, err error) {
  18. terminal, e := GetApiTerminal(utils.DATA_SOURCE_KPLER, terminalCode)
  19. if e != nil {
  20. err = fmt.Errorf("获取开普勒终端配置失败, %v", e)
  21. return
  22. }
  23. if terminal.ServerUrl == "" {
  24. err = fmt.Errorf("开普勒终端地址未配置")
  25. return
  26. }
  27. // 走API
  28. if terminal.IsApi == 1 {
  29. indexes, apiQueryUrl, err = getKplerDataByApi(params, terminal.ServerUrl, isRefresh)
  30. if err != nil {
  31. err = fmt.Errorf("获取开普勒指标数据失败, %v", err)
  32. return
  33. }
  34. terminalInfo = terminal
  35. return
  36. }
  37. return
  38. }
  39. // getEdbDataFromThsHfHttp API-获取高频指标数据
  40. func getKplerDataByApi(params models.KplerSearchEdbReq, serverUrl string, isRefresh bool) (list []*models.KplerIndexItem, apiQueryUrl string, err error) {
  41. defer func() {
  42. if err != nil {
  43. tips := fmt.Sprintf("开普勒指标API-getKplerDataByApi err: %v", err)
  44. utils.FileLog.Info(tips)
  45. }
  46. }()
  47. // todo 判断指标是否已存在
  48. productNamesStr := params.ProductNames
  49. fromZoneNamesStr := params.FromZoneNames
  50. toZoneNamesStr := params.ToZoneNames
  51. granularity := GetKplerGranularity(params.Granularity)
  52. libreq := new(models.KplerFlowDataLibReq)
  53. libreq.Products = productNamesStr
  54. libreq.FromZones = fromZoneNamesStr
  55. libreq.ToZones = toZoneNamesStr
  56. libreq.Split = params.Split
  57. libreq.FlowDirection = params.FlowDirection
  58. libreq.Granularity = granularity
  59. libreq.Unit = params.Unit
  60. libreq.WithIntraRegion = "true"
  61. libreq.WithForecast = "true"
  62. libreq.OnlyRealized = "false"
  63. libreq.StartDate = "2013-01-01"
  64. libreq.EndDate = time.Now().Format(utils.FormatDate)
  65. // 请求接口
  66. apiResp, err := getKplerFlowDataLib(serverUrl, libreq)
  67. if err != nil {
  68. err = fmt.Errorf("开普勒指标API-getKplerDataByApi err: %v", err)
  69. return
  70. }
  71. if apiResp.Ret != 200 {
  72. err = fmt.Errorf("开普勒指标API-状态码: %d, 提示信息: %s", apiResp.Ret, apiResp.ErrMsg)
  73. return
  74. }
  75. indexes := make([]*models.KplerIndexItem, 0)
  76. if len(apiResp.Data.List) == 0 {
  77. utils.FileLog.Info("开普勒指标API-无数据")
  78. return
  79. }
  80. indexCodeList := make([]string, 0)
  81. existIndexCodeMap := make(map[string]bool)
  82. flowDirection := params.FlowDirection
  83. prefixIndexName := fmt.Sprintf("kpler%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
  84. lastIndexName := params.Granularity
  85. prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
  86. // 获取首字母
  87. lastIndexCode := strings.ToUpper(params.Granularity[:1])
  88. apiQueryUrl = apiResp.Data.ApiQueryUrl
  89. // Tables中的每一个对应一个证券代码
  90. for _, v := range apiResp.Data.List {
  91. index := new(models.KplerIndexItem)
  92. indexName := ""
  93. indexCode := ""
  94. indexName = fmt.Sprintf("%s%s%s", prefixIndexName, v.SplitItem, lastIndexName)
  95. indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,v.SplitItem, lastIndexCode)
  96. indexCodeList = append(indexCodeList, indexCode)
  97. index.IndexCode = indexCode
  98. index.IndexName = indexName
  99. index.Frequency = getKplerFrequency(params.Granularity)
  100. index.Unit = params.Unit
  101. index.IndexData = v.IndexData
  102. indexes = append(indexes, index)
  103. }
  104. indexObj := new(models.BaseFromKplerIndex)
  105. // 查询指标是否存在
  106. existList, err := indexObj.GetByIndexCodes(indexCodeList)
  107. if err != nil {
  108. err = fmt.Errorf("查询指标是否存在失败, %v", err)
  109. return
  110. }
  111. for _, v := range existList {
  112. existIndexCodeMap[v.IndexCode] = true
  113. }
  114. // 过滤已经存在的指标, 如果是刷新指标,则无需过滤
  115. list = make([]*models.KplerIndexItem, 0)
  116. for _, v := range indexes {
  117. if _, ok := existIndexCodeMap[v.IndexCode]; !ok || isRefresh {
  118. list = append(list, v)
  119. }
  120. }
  121. return
  122. }
  123. func AddKplerIndexByApi(indexList []*models.KplerIndexItem, req *models.KplerSearchEdbReq, apiQueryUrl string, classifyId int, terminalCode string) (err error) {
  124. errMsgList := make([]string, 0)
  125. defer func() {
  126. if len(errMsgList) > 0 {
  127. msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
  128. utils.FileLog.Info(msg)
  129. go alarm_msg.SendAlarmMsg(msg, 3)
  130. }
  131. }()
  132. maxSort := 0
  133. for _, indexInfo := range indexList {
  134. indexObj := new(models.BaseFromKplerIndex)
  135. indexId := 0
  136. //判断指标是否存在
  137. var isAdd int
  138. item, er := indexObj.GetByIndexCode(indexInfo.IndexCode)
  139. if er != nil {
  140. if er.Error() == utils.ErrNoRow() {
  141. isAdd = 1
  142. err = nil
  143. } else {
  144. errMsgList = append(errMsgList, fmt.Sprintf("查询数据源指标库失败 GetByIndexCode Err:%s", er))
  145. continue
  146. }
  147. }
  148. if item != nil && item.BaseFromKplerIndexId > 0 {
  149. fmt.Println("item:", item)
  150. isAdd = 2
  151. } else {
  152. isAdd = 1
  153. }
  154. fromZoneIdsStr := ""
  155. toZoneIdsStr := ""
  156. if len(req.FromZoneIds) > 0 {
  157. for _, v := range req.FromZoneIds {
  158. fromZoneIdsStr = fmt.Sprintf("%s%d,", fromZoneIdsStr, v)
  159. }
  160. }
  161. if len(req.ToZoneIds) > 0 {
  162. for _, v := range req.ToZoneIds {
  163. toZoneIdsStr = fmt.Sprintf("%s%d,", toZoneIdsStr, v)
  164. }
  165. }
  166. // 批量新增指标
  167. if isAdd == 1 {
  168. indexObj.IndexCode = indexInfo.IndexCode
  169. indexObj.IndexName = indexInfo.IndexName
  170. indexObj.Frequency = indexInfo.Frequency
  171. indexObj.ClassifyId = int(classifyId)
  172. indexObj.ProductNames = req.ProductNames
  173. indexObj.FromZoneId = fromZoneIdsStr
  174. indexObj.ToZoneId = toZoneIdsStr
  175. indexObj.FromZoneName = req.FromZoneNames
  176. indexObj.ToZoneName = req.ToZoneNames
  177. indexObj.FlowDirection = req.FlowDirection
  178. indexObj.Granularity = GetKplerGranularity(req.Granularity)
  179. indexObj.Split = req.Split
  180. indexObj.Unit = req.Unit
  181. indexObj.Sort = maxSort
  182. indexObj.ApiQueryUrl = apiQueryUrl
  183. indexObj.ModifyTime = time.Now()
  184. indexObj.CreateTime = time.Now()
  185. indexObj.TerminalCode = terminalCode
  186. err = indexObj.Add()
  187. if err != nil {
  188. err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
  189. return
  190. }
  191. indexId = indexObj.BaseFromKplerIndexId
  192. maxSort++
  193. } else if isAdd == 2 {
  194. indexId = item.BaseFromKplerIndexId
  195. if item.TerminalCode == `` && terminalCode != `` {
  196. item.TerminalCode = terminalCode
  197. err = item.Update([]string{"TerminalCode"})
  198. if err != nil {
  199. err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
  200. return
  201. }
  202. }
  203. indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
  204. indexObj.IndexName = indexInfo.IndexName
  205. indexObj.Frequency = indexInfo.Frequency
  206. indexObj.ClassifyId = int(classifyId)
  207. indexObj.ProductNames = req.ProductNames
  208. indexObj.FromZoneId = fromZoneIdsStr
  209. indexObj.ToZoneId = toZoneIdsStr
  210. indexObj.FromZoneName = req.FromZoneNames
  211. indexObj.ToZoneName = req.ToZoneNames
  212. indexObj.FlowDirection = req.FlowDirection
  213. indexObj.Granularity = GetKplerGranularity(req.Granularity)
  214. indexObj.Split = req.Split
  215. indexObj.Unit = req.Unit
  216. indexObj.ApiQueryUrl = apiQueryUrl
  217. indexObj.ModifyTime = time.Now()
  218. //修改数据
  219. updateColsArr := make([]string, 0)
  220. updateColsArr = append(updateColsArr, "index_name")
  221. updateColsArr = append(updateColsArr, "classify_id")
  222. updateColsArr = append(updateColsArr, "product_names")
  223. updateColsArr = append(updateColsArr, "from_zone_id")
  224. updateColsArr = append(updateColsArr, "to_zone_id")
  225. updateColsArr = append(updateColsArr, "from_zone_name")
  226. updateColsArr = append(updateColsArr, "to_zone_name")
  227. updateColsArr = append(updateColsArr, "flow_direction")
  228. updateColsArr = append(updateColsArr, "granularity")
  229. updateColsArr = append(updateColsArr, "split")
  230. updateColsArr = append(updateColsArr, "frequency")
  231. updateColsArr = append(updateColsArr, "modify_time")
  232. updateColsArr = append(updateColsArr, "api_query_url")
  233. e := indexObj.Update(updateColsArr)
  234. if e != nil {
  235. fmt.Println("Index Update Err:" + e.Error())
  236. return
  237. }
  238. }
  239. dataObj := new(models.BaseFromKplerData)
  240. exitDataMap := make(map[string]*models.BaseFromKplerData)
  241. //获取已存在的所有数据
  242. var exitDataList []*models.BaseFromKplerData
  243. exitDataList, err = dataObj.GetByIndexCode(indexInfo.IndexCode)
  244. if err != nil {
  245. err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
  246. return
  247. }
  248. fmt.Println("exitDataListLen:", len(exitDataList))
  249. for _, v := range exitDataList {
  250. dateStr := v.DataTime
  251. exitDataMap[dateStr] = v
  252. }
  253. addDataList := make([]*models.BaseFromKplerData, 0)
  254. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  255. for _, value := range indexInfo.IndexData {
  256. date := value.DataTime
  257. value := value.Value
  258. if findData, ok := exitDataMap[date]; !ok {
  259. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  260. if err != nil {
  261. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  262. return
  263. }
  264. var saveDataTime time.Time
  265. if strings.Contains(date, "00:00:00") {
  266. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  267. } else {
  268. saveDataTime, err = time.Parse(utils.FormatDate, date)
  269. }
  270. if err != nil {
  271. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  272. continue
  273. }
  274. timestamp := saveDataTime.UnixNano() / 1e6
  275. dataItem := new(models.BaseFromKplerData)
  276. dataItem.BaseFromKplerIndexId = int(indexId)
  277. dataItem.IndexCode = indexInfo.IndexCode
  278. dataItem.DataTime = date
  279. dataItem.Value = value
  280. dataItem.CreateTime = time.Now()
  281. dataItem.ModifyTime = time.Now()
  282. dataItem.DataTimestamp = timestamp
  283. addDataList = append(addDataList, dataItem)
  284. if len(addDataList) > 500 {
  285. err = dataObj.AddMulti(addDataList)
  286. if err != nil {
  287. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  288. return
  289. }
  290. addDataList = make([]*models.BaseFromKplerData, 0)
  291. }
  292. } else {
  293. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  294. // 过滤0.50和0.5的比较
  295. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  296. newV, _ := strconv.ParseFloat(value, 64)
  297. if oldV == newV {
  298. continue
  299. }
  300. dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
  301. dataObj.Value = value
  302. dataObj.ModifyTime = time.Now()
  303. updateDataColsArr := make([]string, 0)
  304. updateDataColsArr = append(updateDataColsArr, "value")
  305. updateDataColsArr = append(updateDataColsArr, "modify_time")
  306. dataObj.Update(updateDataColsArr)
  307. }
  308. }
  309. }
  310. if len(addDataList) > 0 {
  311. err = dataObj.AddMulti(addDataList)
  312. if err != nil {
  313. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  314. return
  315. }
  316. }
  317. var dateItem *models.EdbInfoMaxAndMinInfo
  318. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexInfo.IndexCode)
  319. if err != nil {
  320. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  321. return
  322. }
  323. go func() {
  324. indexObj.ModifyIndexMaxAndMinDate(indexInfo.IndexCode, dateItem)
  325. UpdateKplerIndexEs(indexInfo.IndexCode)
  326. }()
  327. // 同步刷新ETA指标库的指标
  328. {
  329. // 获取指标详情
  330. baseObj := new(models.BaseFromKpler)
  331. var edbInfo *models.EdbInfo
  332. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexInfo.IndexCode)
  333. if err != nil {
  334. if !utils.IsErrNoRow(err) {
  335. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexInfo.IndexCode, err.Error()))
  336. return
  337. } else {
  338. err = nil
  339. }
  340. }
  341. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  342. if edbInfo != nil {
  343. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  344. }
  345. }
  346. }
  347. return
  348. }
  349. // HandleKplerIndex 处理Kpler的excel数据
  350. func HandleKplerIndex(req *models.HandleKplerExcelDataReq) (err error) {
  351. errMsgList := make([]string, 0)
  352. defer func() {
  353. if len(errMsgList) > 0 {
  354. msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
  355. utils.FileLog.Info(msg)
  356. go alarm_msg.SendAlarmMsg(msg, 3)
  357. }
  358. }()
  359. // 查询所有的一级分类
  360. classifyObj := new(models.BaseFromKplerClassify)
  361. classifyList, err := classifyObj.GetParentClassify()
  362. if err != nil {
  363. err = fmt.Errorf("查询一级目录信息失败 Err:%s", err)
  364. return
  365. }
  366. classifyMap := make(map[string]int, 0)
  367. for _, v := range classifyList {
  368. classifyMap[v.ClassifyName] = int(v.ClassifyId)
  369. }
  370. for _, v := range req.List {
  371. err = handleKplerIndex(v, req.TerminalCode, classifyMap)
  372. if err != nil {
  373. errMsgList = append(errMsgList, fmt.Sprintf("新增指标异常,指标编码:%s, Err: %s", v.IndexCode, err))
  374. return
  375. }
  376. }
  377. return
  378. }
  379. // getKplerFlowDataLib 获取开普勒流数据
  380. func getKplerFlowDataLib(libUrl string, dataMap *models.KplerFlowDataLibReq) (resp *models.KplerFlowDataLibResp, err error) {
  381. postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getFlowData")
  382. postData, err := json.Marshal(dataMap)
  383. if err != nil {
  384. return
  385. }
  386. result, err := HttpPost(postUrl, string(postData), "application/json")
  387. if err != nil {
  388. return
  389. }
  390. utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";" + string(postData) + ";result:" + string(result))
  391. err = json.Unmarshal(result, &resp)
  392. if err != nil {
  393. return
  394. }
  395. return resp, nil
  396. }
  397. func getKplerProductLib(libUrl string, req *models.KplerProductLibReq) (resp *models.KplerProductLibResp, err error) {
  398. postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getProductData")
  399. postData, err := json.Marshal(req)
  400. if err != nil {
  401. return
  402. }
  403. result, err := HttpPost(postUrl, string(postData), "application/json")
  404. if err != nil {
  405. return
  406. }
  407. utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
  408. err = json.Unmarshal(result, &resp)
  409. if err != nil {
  410. return
  411. }
  412. return resp, nil
  413. }
  414. func getKplerZoneLib(libUrl string) (resp *models.KplerZoneLibResp, err error) {
  415. postUrl := fmt.Sprintf("%s%s", libUrl, "/v1/kpler/getZoneData")
  416. result, err := HttpPost(postUrl, "", "application/json")
  417. if err != nil {
  418. return
  419. }
  420. utils.FileLog.Info("postRefreshEdbData:" + postUrl + ";result:" + string(result))
  421. err = json.Unmarshal(result, &resp)
  422. if err != nil {
  423. return
  424. }
  425. return resp, nil
  426. }
  427. func handleKplerIndex(req *models.HandleKplerExcelData, terminalCode string, classifyMap map[string]int) (err error) {
  428. indexName := req.IndexName
  429. indexCode := req.IndexCode
  430. excelDataMap := req.ExcelDataMap
  431. errMsgList := make([]string, 0)
  432. defer func() {
  433. if len(errMsgList) > 0 {
  434. msg := fmt.Sprint("数据源-Kpler数据处理失败,err:", strings.Join(errMsgList, "\n"))
  435. utils.FileLog.Info(msg)
  436. go alarm_msg.SendAlarmMsg(msg, 3)
  437. }
  438. }()
  439. indexObj := new(models.BaseFromKplerIndex)
  440. dataObj := new(models.BaseFromKplerData)
  441. classifyObj := new(models.BaseFromKplerClassify)
  442. var indexId int64
  443. addDataList := make([]*models.BaseFromKplerData, 0)
  444. exitDataMap := make(map[string]*models.BaseFromKplerData)
  445. // 修改指标信息
  446. if indexName == "" {
  447. utils.FileLog.Info("未刷新到指标数据:indexName:" + indexName)
  448. return
  449. }
  450. if req.ClassifyName == "" {
  451. // 获取产品的名称,把产品的第一个名称作为分类ID
  452. productNames := strings.Split(req.ProductNames, ",")
  453. if len(productNames) > 0 {
  454. req.ClassifyName = productNames[0]
  455. }
  456. }
  457. fromZoneNamesStr := req.FromZoneNames
  458. flowDirection := req.FlowDirection
  459. toZoneNamesStr := req.ToZoneNames
  460. productNamesStr := req.ProductNames
  461. // 查询fromZoneIds
  462. zoneObj := new(models.BaseFromKplerZone)
  463. fromZoneNames := strings.Split(fromZoneNamesStr, ",")
  464. fromZoneIdsStr := ""
  465. // 查询 zone
  466. if len(fromZoneNames) > 0 {
  467. fromZoneIds, er := zoneObj.GetIdsByZoneNames(fromZoneNames)
  468. if er != nil {
  469. err = fmt.Errorf("查询区域失败 Err:%s", err)
  470. return
  471. }
  472. for _, v := range fromZoneIds {
  473. fromZoneIdsStr = fmt.Sprintf("%s%d,", fromZoneIdsStr, v)
  474. }
  475. }
  476. // 查询toZoneIds
  477. toZoneNames := strings.Split(toZoneNamesStr, ",")
  478. toZoneIdsStr := ""
  479. if len(toZoneNames) > 0 {
  480. toZoneIds, er := zoneObj.GetIdsByZoneNames(toZoneNames)
  481. if er != nil {
  482. err = fmt.Errorf("查询区域失败 Err:%s", err)
  483. return
  484. }
  485. for _, v := range toZoneIds {
  486. toZoneIdsStr = fmt.Sprintf("%s%d,", toZoneIdsStr, v)
  487. }
  488. }
  489. // 拼接指标编码
  490. prefixIndexCode := fmt.Sprintf("k%s%s%s%s", fromZoneNamesStr, flowDirection, toZoneNamesStr, productNamesStr)
  491. // 获取首字母
  492. lastIndexCode := strings.ToUpper(req.Granularity[:1])
  493. indexCode = fmt.Sprintf("%s%s%s", prefixIndexCode,req.SplitName, lastIndexCode)
  494. // 判断目录是否存在
  495. var classifyId int64
  496. now := time.Now()
  497. if req.ClassifyName != "" {
  498. classifyParentId := 0
  499. level := 1
  500. classifyParentId, _ = classifyMap[req.ParentClassifyName]
  501. if classifyParentId > 0 {
  502. level = 2
  503. }
  504. classifyObj, err = classifyObj.GetByClassifyName(req.ClassifyName)
  505. if err != nil {
  506. if err.Error() == utils.ErrNoRow() {
  507. //新增分类
  508. classifyObj = &models.BaseFromKplerClassify{
  509. ClassifyName: req.ClassifyName,
  510. ClassifyNameEn: req.ClassifyName,
  511. ParentId: classifyParentId,
  512. SysUserId: 0,
  513. SysUserRealName: "",
  514. Level: level,
  515. Sort: req.ClassifySort,
  516. ModifyTime: now,
  517. CreateTime: now,
  518. }
  519. classifyId, err = classifyObj.Add()
  520. if err != nil {
  521. err = fmt.Errorf("新增分类失败 Err:%s", err)
  522. return
  523. }
  524. classifyObj.ClassifyId = int(classifyId)
  525. } else {
  526. return
  527. }
  528. }else {
  529. classifyId = int64(classifyObj.ClassifyId)
  530. }
  531. }
  532. //判断指标是否存在
  533. var isAdd int
  534. item, err := indexObj.GetByIndexCode(indexCode)
  535. if err != nil {
  536. if err.Error() == utils.ErrNoRow() {
  537. isAdd = 1
  538. err = nil
  539. } else {
  540. isAdd = -1
  541. err = fmt.Errorf("查询数据源指标库失败 GetByIndexCode Err:%s", err)
  542. return
  543. }
  544. }
  545. if item != nil && item.BaseFromKplerIndexId > 0 {
  546. fmt.Println("item:", item)
  547. isAdd = 2
  548. } else {
  549. isAdd = 1
  550. }
  551. if isAdd == 1 {
  552. indexObj.IndexCode = indexCode
  553. indexObj.IndexName = indexName
  554. indexObj.Frequency = getKplerFrequency(req.Granularity)
  555. indexObj.ClassifyId = int(classifyId)
  556. indexObj.ProductNames = req.ProductNames
  557. indexObj.FromZoneId = fromZoneIdsStr
  558. indexObj.ToZoneId = toZoneIdsStr
  559. indexObj.FromZoneName = fromZoneNamesStr
  560. indexObj.ToZoneName = toZoneNamesStr
  561. indexObj.FlowDirection = req.FlowDirection
  562. indexObj.Granularity = req.Granularity
  563. indexObj.Split = req.Split
  564. indexObj.Unit = req.Unit
  565. indexObj.Sort = req.Sort
  566. indexObj.ExcelQueryUrl = req.ExcelQueryUrl
  567. indexObj.ModifyTime = time.Now()
  568. indexObj.CreateTime = time.Now()
  569. indexObj.TerminalCode = terminalCode
  570. err = indexObj.Add()
  571. if err != nil {
  572. err = fmt.Errorf("数据源新增Kpler指标失败 Err:%s", err)
  573. return
  574. }
  575. indexId = int64(indexObj.BaseFromKplerIndexId)
  576. } else if isAdd == 2 {
  577. indexId = int64(item.BaseFromKplerIndexId)
  578. if item.TerminalCode == `` && terminalCode != `` {
  579. item.TerminalCode = terminalCode
  580. err = item.Update([]string{"TerminalCode"})
  581. if err != nil {
  582. err = fmt.Errorf("数据源更新Kpler指标失败 Err:%s", err)
  583. return
  584. }
  585. }
  586. indexObj.BaseFromKplerIndexId = item.BaseFromKplerIndexId
  587. indexObj.IndexName = indexName
  588. indexObj.Frequency = getKplerFrequency(req.Granularity)
  589. indexObj.ClassifyId = int(classifyId)
  590. indexObj.ProductNames = req.ProductNames
  591. indexObj.FromZoneId = fromZoneIdsStr
  592. indexObj.ToZoneId = toZoneIdsStr
  593. indexObj.FromZoneName = fromZoneNamesStr
  594. indexObj.ToZoneName = toZoneNamesStr
  595. indexObj.FlowDirection = req.FlowDirection
  596. indexObj.Granularity = req.Granularity
  597. indexObj.Split = req.Split
  598. indexObj.ExcelQueryUrl = req.ExcelQueryUrl
  599. indexObj.Unit = req.Unit
  600. indexObj.Sort = req.Sort
  601. indexObj.ModifyTime = time.Now()
  602. //修改数据
  603. updateColsArr := make([]string, 0)
  604. updateColsArr = append(updateColsArr, "index_name")
  605. updateColsArr = append(updateColsArr, "classify_id")
  606. updateColsArr = append(updateColsArr, "product_names")
  607. updateColsArr = append(updateColsArr, "from_zone_id")
  608. updateColsArr = append(updateColsArr, "from_zone_name")
  609. updateColsArr = append(updateColsArr, "to_zone_id")
  610. updateColsArr = append(updateColsArr, "to_zone_name")
  611. updateColsArr = append(updateColsArr, "flow_direction")
  612. updateColsArr = append(updateColsArr, "granularity")
  613. updateColsArr = append(updateColsArr, "split")
  614. updateColsArr = append(updateColsArr, "frequency")
  615. updateColsArr = append(updateColsArr, "sort")
  616. updateColsArr = append(updateColsArr, "modify_time")
  617. e := indexObj.Update(updateColsArr)
  618. if e != nil {
  619. fmt.Println("Index Update Err:" + e.Error())
  620. return
  621. }
  622. }
  623. //获取已存在的所有数据
  624. var exitDataList []*models.BaseFromKplerData
  625. exitDataList, err = dataObj.GetByIndexCode(indexCode)
  626. if err != nil {
  627. err = fmt.Errorf("数据源查询Kpler指标数据失败 Err:%s", err)
  628. return
  629. }
  630. fmt.Println("exitDataListLen:", len(exitDataList))
  631. for _, v := range exitDataList {
  632. dateStr := v.DataTime
  633. exitDataMap[dateStr] = v
  634. }
  635. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  636. for date, value := range excelDataMap {
  637. if findData, ok := exitDataMap[date]; !ok {
  638. _, err = time.ParseInLocation(utils.FormatDate, date, time.Local)
  639. if err != nil {
  640. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  641. return
  642. }
  643. var saveDataTime time.Time
  644. if strings.Contains(date, "00:00:00") {
  645. saveDataTime, err = time.Parse(utils.FormatDateTime, date)
  646. } else {
  647. saveDataTime, err = time.Parse(utils.FormatDate, date)
  648. }
  649. if err != nil {
  650. err = fmt.Errorf("%s 转换日期格式失败 Err:%s", date, err)
  651. continue
  652. }
  653. timestamp := saveDataTime.UnixNano() / 1e6
  654. dataItem := new(models.BaseFromKplerData)
  655. dataItem.BaseFromKplerIndexId = int(indexId)
  656. dataItem.IndexCode = indexCode
  657. dataItem.DataTime = date
  658. dataItem.Value = value
  659. dataItem.CreateTime = time.Now()
  660. dataItem.ModifyTime = time.Now()
  661. dataItem.DataTimestamp = timestamp
  662. addDataList = append(addDataList, dataItem)
  663. if len(addDataList) > 500 {
  664. err = dataObj.AddMulti(addDataList)
  665. if err != nil {
  666. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  667. return
  668. }
  669. addDataList = make([]*models.BaseFromKplerData, 0)
  670. }
  671. } else {
  672. if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  673. // 过滤0.50和0.5的比较
  674. oldV, _ := strconv.ParseFloat(findData.Value, 64)
  675. newV, _ := strconv.ParseFloat(value, 64)
  676. if oldV == newV {
  677. continue
  678. }
  679. dataObj.BaseFromKplerIndexId = findData.BaseFromKplerIndexId
  680. dataObj.Value = value
  681. dataObj.ModifyTime = time.Now()
  682. updateDataColsArr := make([]string, 0)
  683. updateDataColsArr = append(updateDataColsArr, "value")
  684. updateDataColsArr = append(updateDataColsArr, "modify_time")
  685. dataObj.Update(updateDataColsArr)
  686. }
  687. }
  688. }
  689. if len(addDataList) > 0 {
  690. err = dataObj.AddMulti(addDataList)
  691. if err != nil {
  692. err = fmt.Errorf("批量新增指标失败 Err:%s", err)
  693. return
  694. }
  695. }
  696. var dateItem *models.EdbInfoMaxAndMinInfo
  697. dateItem, err = dataObj.GetMaxAndMinDateByIndexCode(indexCode)
  698. if err != nil {
  699. err = fmt.Errorf("查询指标最新日期失败 Err:%s", err)
  700. return
  701. }
  702. go func() {
  703. indexObj.ModifyIndexMaxAndMinDate(indexCode, dateItem)
  704. UpdateKplerIndexEs(indexCode)
  705. }()
  706. // 同步刷新ETA指标库的指标
  707. {
  708. // 获取指标详情
  709. baseObj := new(models.BaseFromKpler)
  710. var edbInfo *models.EdbInfo
  711. edbInfo, err = models.GetEdbInfoByEdbCode(baseObj.GetSource(), indexCode)
  712. if err != nil {
  713. if !utils.IsErrNoRow(err) {
  714. errMsgList = append(errMsgList, fmt.Sprint("刷新ETA指标异常,指标编码:", indexCode, err.Error()))
  715. return
  716. } else {
  717. err = nil
  718. }
  719. }
  720. // 已经加入到指标库的话,那么就去更新ETA指标库吧
  721. if edbInfo != nil {
  722. go logic.RefreshBaseEdbInfo(edbInfo, ``)
  723. }
  724. }
  725. return
  726. }
  727. func HttpPost(url, postData string, params ...string) ([]byte, error) {
  728. body := ioutil.NopCloser(strings.NewReader(postData))
  729. client := &http.Client{}
  730. req, err := http.NewRequest("POST", url, body)
  731. if err != nil {
  732. return nil, err
  733. }
  734. contentType := "application/x-www-form-urlencoded;charset=utf-8"
  735. if len(params) > 0 && params[0] != "" {
  736. contentType = params[0]
  737. }
  738. req.Header.Set("Content-Type", contentType)
  739. req.Header.Set("authorization", utils.MD5(utils.APP_EDB_DATA_ANALYSIS+utils.EDB_DATA_ANALYSIS_Md5_KEY))
  740. resp, err := client.Do(req)
  741. if err != nil {
  742. return nil, err
  743. }
  744. defer resp.Body.Close()
  745. b, err := ioutil.ReadAll(resp.Body)
  746. utils.FileLog.Debug("HttpPost:" + string(b))
  747. return b, err
  748. }
  749. func getKplerFrequency(granularity string) (frequency string) {
  750. switch granularity {
  751. case "daily", "Daily", "days":
  752. return "日度"
  753. case "weekly", "Weekly", "weeks":
  754. return "周度"
  755. case "monthly", "Monthly", "months":
  756. return "月度"
  757. case "yearly", "Yearly", "years":
  758. return "年度"
  759. }
  760. return ""
  761. }
  762. func GetKplerGranularity(frequency string) (granularity string) {
  763. switch frequency {
  764. case "daily", "Daily", "days":
  765. return "days"
  766. case "weekly", "Weekly", "weeks":
  767. return "weeks"
  768. case "monthly", "Monthly", "months":
  769. return "months"
  770. case "yearly", "Yearly", "years":
  771. return "years"
  772. case "quarterly", "Quarters", "quarters":
  773. return "quarters"
  774. }
  775. return ""
  776. }
  777. func InitKplerProduct ()(err error) {
  778. libUrl := "http://127.0.0.1:8915"
  779. libResp, err := getKplerProductLib(libUrl, &models.KplerProductLibReq{
  780. AncestorFamilyIds: "",
  781. AncestorFamilyNames: "",
  782. AncestorGroupIds: "",
  783. AncestorGroupNames: "",
  784. AncestorProductIds: "",
  785. AncestorProductNames: "",
  786. AncestorGradeIds: "",
  787. AncestorGradeNames: "",
  788. Products: "",
  789. ProductIds: "",
  790. })
  791. if err != nil {
  792. return
  793. }
  794. if libResp.Ret != 200 {
  795. fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
  796. utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
  797. return
  798. }
  799. classifyList, err := models.GetAllBaseFromKplerClassify()
  800. if err != nil {
  801. return
  802. }
  803. classifyMap := make(map[int]*models.BaseFromKplerClassify)
  804. for _, v := range classifyList {
  805. classifyMap[v.ProductId] = v
  806. }
  807. // 批量插入分类表中
  808. // family > group(commodity) > product() > grade
  809. sort := 0
  810. for _, v := range libResp.Data {
  811. id, _ := strconv.Atoi(v.Id)
  812. if classifyObj, ok := classifyMap[id]; !ok {
  813. classifyObj = new(models.BaseFromKplerClassify)
  814. classifyObj.ClassifyName = v.Name
  815. classifyObj.ClassifyNameEn = v.Name
  816. classifyObj.ModifyTime = time.Now()
  817. classifyObj.CreateTime = time.Now()
  818. classifyObj.Sort = sort
  819. classifyObj.Level = 1
  820. classifyObj.ParentId = 0
  821. classifyObj.SysUserId = 0
  822. classifyObj.SysUserRealName = ""
  823. classifyObj.ProductId = id
  824. classifyObj.ProductName = v.Name
  825. classifyObj.AncestorFamilyId, _ = strconv.Atoi(v.FamilyId)
  826. classifyObj.AncestorFamilyName = v.Family
  827. classifyObj.AncestorGroupId, _ = strconv.Atoi(v.GroupId)
  828. classifyObj.AncestorGroupName = v.Group
  829. classifyObj.AncestorProductId, _ = strconv.Atoi(v.ProductId)
  830. classifyObj.AncestorProductName = v.Product
  831. classifyObj.AncestorGradeId, _ = strconv.Atoi(v.GradeId)
  832. classifyObj.AncestorGradeName = v.Grade
  833. classifyObj.ClassifyType = v.Type
  834. _, err = classifyObj.Add()
  835. if err != nil {
  836. fmt.Println("新增开普勒产品库失败", err)
  837. utils.FileLog.Info("新增开普勒产品库失败", err)
  838. }
  839. classifyMap[id] = classifyObj
  840. }
  841. }
  842. for _, v := range classifyMap {
  843. if v.ParentId == 0 {
  844. if v.AncestorGradeId > 0 && v.AncestorGradeName != v.ClassifyName {
  845. v.ParentId = classifyMap[v.AncestorGradeId].ClassifyId
  846. } else if v.AncestorProductId > 0 && v.AncestorProductName != v.ClassifyName {
  847. v.ParentId = classifyMap[v.AncestorProductId].ClassifyId
  848. } else if v.AncestorGroupId > 0 && v.AncestorGroupName != v.ClassifyName {
  849. v.ParentId = classifyMap[v.AncestorGroupId].ClassifyId
  850. }
  851. err = v.Update([]string{"ParentId"})
  852. if err != nil {
  853. fmt.Println("更新开普勒产品库失败", err)
  854. utils.FileLog.Info("更新开普勒产品库失败", err)
  855. }
  856. }
  857. }
  858. fmt.Println("classifyList:", classifyList)
  859. return
  860. }
  861. func InitKplerProductGrade ()(err error) {
  862. libUrl := "http://127.0.0.1:8915"
  863. classifyList, err := models.GetAllBaseFromKplerClassifyByClassifyType("subgrade4")
  864. if err != nil {
  865. return
  866. }
  867. for _, c := range classifyList {
  868. c.ModifyTime = time.Now()
  869. err = c.Update([]string{"ModifyTime"})
  870. if err != nil {
  871. fmt.Println("更新开普勒产品库失败", err)
  872. utils.FileLog.Info("更新开普勒产品库失败", err)
  873. }
  874. // 每个分类都发起分组请求
  875. libResp, er := getKplerProductLib(libUrl, &models.KplerProductLibReq{
  876. // AncestorFamilyIds: familyIds,
  877. // AncestorFamilyNames: familyNames,
  878. // AncestorGroupIds: groupIds,
  879. // AncestorGroupNames: groupNames,
  880. // AncestorProductIds: productIds,
  881. // AncestorProductNames: productNames,
  882. AncestorGradeIds: strconv.Itoa(c.ProductId),
  883. AncestorGradeNames: c.ProductName,
  884. })
  885. if er != nil {
  886. fmt.Println("获取开普勒产品库失败", er)
  887. utils.FileLog.Info("获取开普勒产品库失败", er)
  888. continue
  889. }
  890. if libResp.Ret != 200 {
  891. fmt.Println("获取开普勒产品库失败", libResp.ErrMsg)
  892. utils.FileLog.Info("获取开普勒产品库失败", libResp.ErrMsg)
  893. continue
  894. }
  895. classifyObj := new(models.BaseFromKplerClassify)
  896. for _, v := range libResp.Data {
  897. id, _ := strconv.Atoi(v.Id)
  898. if id == 0 {
  899. continue
  900. }
  901. if id == c.ProductId {
  902. continue
  903. }
  904. // 更新对应的分类等级和父级
  905. // 查找所有子分类
  906. classifyItem, er := classifyObj.GetByProductId(id)
  907. if er != nil {
  908. err = fmt.Errorf("子分类不存在 Err:%s", er)
  909. continue
  910. }
  911. classifyItem.Level = c.Level + 1
  912. classifyItem.ParentId = c.ClassifyId
  913. classifyItem.ClassifyType = "subgrade5"
  914. classifyItem.ModifyTime = time.Now()
  915. err = classifyItem.Update([]string{"Level", "ParentId", "ClassifyType", "ModifyTime"})
  916. if err != nil {
  917. fmt.Println("更新开普勒产品库失败", err)
  918. utils.FileLog.Info("更新开普勒产品库失败", err)
  919. }
  920. }
  921. }
  922. fmt.Println("classifyList:", classifyList)
  923. return
  924. }
  925. func InitKplerZone() (err error) {
  926. libUrl := "http://127.0.0.1:8915"
  927. libResp, err := getKplerZoneLib(libUrl)
  928. if err != nil {
  929. return
  930. }
  931. if libResp.Ret != 200 {
  932. fmt.Println("获取开普勒区域库失败", libResp.ErrMsg)
  933. utils.FileLog.Info("获取开普勒区域库失败", libResp.ErrMsg)
  934. return
  935. }
  936. // 批量插入区域表中
  937. zoneMap := make(map[int]*models.BaseFromKplerZone)
  938. for _, v := range libResp.Data {
  939. zoneObj := new(models.BaseFromKplerZone)
  940. descendantId, _ := strconv.Atoi(v.DescendantId)
  941. ancestorId, _ := strconv.Atoi(v.AncestorId)
  942. if _, ok := zoneMap[descendantId]; !ok {
  943. zoneObj.ZoneName = v.DescendantName
  944. //zoneObj.ZoneType = v.DescendantType
  945. zoneObj.AncestorId = ancestorId
  946. zoneObj.AncestorType = v.AncestorType
  947. zoneObj.AncestorName = v.AncestorName
  948. zoneObj.DescendantId = descendantId
  949. zoneObj.DescendantName = v.DescendantName
  950. zoneObj.ModifyTime = time.Now()
  951. zoneObj.CreateTime = time.Now()
  952. _, err = zoneObj.Add()
  953. if err != nil {
  954. fmt.Println("新增开普勒区域库失败", err)
  955. utils.FileLog.Info("新增开普勒区域库失败", err)
  956. }
  957. zoneMap[descendantId] = zoneObj
  958. // if _, ok := zoneMap[ancestorId]; !ok {
  959. // zoneObj := new(models.BaseFromKplerZone)
  960. // zoneObj.ZoneName = v.AncestorName
  961. // zoneObj.ZoneType = v.AncestorType
  962. // zoneObj.DescendantId = ancestorId
  963. // zoneObj.DescendantName = v.AncestorName
  964. // zoneObj.ModifyTime = time.Now()
  965. // zoneObj.CreateTime = time.Now()
  966. // _, err = zoneObj.Add()
  967. // if err != nil {
  968. // fmt.Println("新增开普勒区域库失败", err)
  969. // utils.FileLog.Info("新增开普勒区域库失败", err)
  970. // }
  971. // zoneMap[ancestorId] = zoneObj
  972. // }
  973. }
  974. }
  975. return
  976. }
  977. // UpdateKplerIndexEs 更新Kpler指标Es
  978. func UpdateKplerIndexEs(indexCode string) (err error) {
  979. defer func() {
  980. if err != nil {
  981. tips := fmt.Sprintf("UpdateKplerIndexEs, IndexCode: %s, errMsg: %v", indexCode, err)
  982. fmt.Println(tips)
  983. utils.FileLog.Info(tips)
  984. }
  985. }()
  986. indexObj := new(models.BaseFromKplerIndex)
  987. item, e := indexObj.GetByIndexCode(indexCode)
  988. if e != nil {
  989. if utils.IsErrNoRow(e) {
  990. err = fmt.Errorf("指标不存在, IndexCode: %s", indexCode)
  991. return
  992. }
  993. }
  994. if item.BaseFromKplerIndexId <= 0 {
  995. err = fmt.Errorf("指标不存在, IndexCode: %s", indexCode)
  996. return
  997. }
  998. indexItem := new(models.SearchDataSource)
  999. indexItem.PrimaryId = item.BaseFromKplerIndexId
  1000. indexItem.IndexCode = item.IndexCode
  1001. indexItem.IndexName = item.IndexName
  1002. indexItem.ClassifyId = item.ClassifyId
  1003. indexItem.Unit = item.Unit
  1004. indexItem.Frequency = item.Frequency
  1005. indexItem.StartDate = item.StartDate
  1006. indexItem.EndDate = item.EndDate
  1007. indexItem.LatestValue = fmt.Sprint(item.EndValue)
  1008. indexItem.Source = utils.DATA_SOURCE_KPLER
  1009. indexItem.SourceName = utils.DATA_SOURCE_NAME_KPLER
  1010. indexItem.IsDeleted = 0
  1011. indexItem.CreateTime = item.CreateTime.Format(utils.FormatDateTime)
  1012. indexItem.ModifyTime = item.ModifyTime.Format(utils.FormatDateTime)
  1013. docId := fmt.Sprintf("%d-%d", utils.DATA_SOURCE_KPLER, item.BaseFromKplerIndexId)
  1014. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  1015. err = fmt.Errorf("指标写入ES失败, %v", e)
  1016. return
  1017. }
  1018. return
  1019. }