gn.go 17 KB


  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models/data_manage"
  6. "eta_gn/eta_task/services/alarm_msg"
  7. "eta_gn/eta_task/services/data"
  8. "eta_gn/eta_task/utils"
  9. "fmt"
  10. "github.com/rdlucklib/rdluck_tools/paging"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // 同步指标信息锁
  18. var lockSyncGnIndex sync.Mutex
  19. const GnEdbListUri = `/index_data/gn/edb/list` // 国能指标列表接口
  20. // CurrLevelParentClassifyMap 当前层级分类map
  21. var CurrLevelParentClassifyMap map[int64]map[int64]map[string]CurrClassify
  22. // CurrEdbInfoMap 当前库里已有的指标map
  23. var CurrEdbInfoMap map[string]*data_manage.EdbInfo
  24. type CurrClassify struct {
  25. ClassifyId int64
  26. ParentId int64
  27. ClassifyName string
  28. }
  29. // SyncGnIndex
  30. // @Description: 定时同步指标信息
  31. // @author: Roc
  32. // @datetime 2024-03-07 17:39:34
  33. // @param cont context.Context
  34. // @return err error
  35. func SyncGnIndex(cont context.Context) (err error) {
  36. fmt.Println("开始同步指标")
  37. lockSyncGnIndex.Lock()
  38. errMsgList := make([]string, 0)
  39. defer func() {
  40. if err != nil {
  41. tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + err.Error()
  42. utils.FileLog.Info(tips)
  43. go alarm_msg.SendAlarmMsg(tips, 3)
  44. }
  45. if len(errMsgList) > 0 {
  46. tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  47. utils.FileLog.Info(tips)
  48. go alarm_msg.SendAlarmMsg(tips, 3)
  49. }
  50. lockSyncGnIndex.Unlock()
  51. }()
  52. initCurrEdbInfoMap()
  53. initCurrLevelParentClassifyMap()
  54. var lastUpdateTimeStr string // 上一次更新的时间
  55. err, errMsgList = syncGnIndex(1, utils.SyncCrmIndexNum, lastUpdateTimeStr)
  56. return
  57. }
  58. // initCurrLevelParentClassifyMap
  59. // @Description: 初始化当前层级分类map
  60. func initCurrLevelParentClassifyMap() {
  61. var condition string
  62. var pars []interface{}
  63. // 普通指标分类
  64. condition = " AND classify_type = ? "
  65. pars = append(pars, 0)
  66. classifyList, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
  67. if err != nil {
  68. utils.FileLog.Error("获取分类列表数据失败:" + err.Error())
  69. return
  70. }
  71. // 清空缓存
  72. CurrLevelParentClassifyMap = make(map[int64]map[int64]map[string]CurrClassify)
  73. for _, v := range classifyList {
  74. currParentClassifyMap, ok := CurrLevelParentClassifyMap[v.Level]
  75. if !ok {
  76. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  77. }
  78. currClassifyMap, ok := currParentClassifyMap[v.ParentID]
  79. if !ok {
  80. currClassifyMap = make(map[string]CurrClassify)
  81. }
  82. classifyName := strings.TrimSpace(v.ClassifyName)
  83. currClassifyMap[classifyName] = CurrClassify{
  84. ClassifyId: v.ClassifyID,
  85. ParentId: v.ParentID,
  86. ClassifyName: classifyName,
  87. }
  88. currParentClassifyMap[v.ParentID] = currClassifyMap
  89. CurrLevelParentClassifyMap[v.Level] = currParentClassifyMap
  90. }
  91. }
  92. // initCurrEdbInfoMap
  93. // @Description: 初始化当前指标map
  94. func initCurrEdbInfoMap() {
  95. // 获取指标列表
  96. edbInfoList, err := data_manage.GetAllBaseEdbInfo()
  97. if err != nil {
  98. utils.FileLog.Error("获取指标列表数据失败:" + err.Error())
  99. return
  100. }
  101. // 清空缓存
  102. CurrEdbInfoMap = make(map[string]*data_manage.EdbInfo)
  103. for _, v := range edbInfoList {
  104. CurrEdbInfoMap[v.OriginalEdbCode] = v
  105. }
  106. }
  107. // EtaBridgeGnIndexListResp
  108. // @Description: 指标列表返回数据
  109. type EtaBridgeGnIndexListResp struct {
  110. Code int `json:"code" description:"状态码"`
  111. Msg string `json:"msg" description:"提示信息"`
  112. Data IndexListResp `json:"data" description:"返回数据"`
  113. }
  114. // IndexListResp
  115. // @Description: 指标列表数据
  116. type IndexListResp struct {
  117. Paging paging.PagingItem `description:"分页数据"`
  118. List []IndexInfo
  119. }
  120. // IndexInfo
  121. // @Description: 指标信息
  122. type IndexInfo struct {
  123. ClassifyNameOne string `description:"一级目录"`
  124. ClassifyNameTwo string `description:"二级目录"`
  125. ClassifyNameThree string `description:"三级目录"`
  126. DataIndexCode string `description:"数据节点指标编码"`
  127. SourceEdbCode string `description:"数据源指标原始编码"`
  128. EdbName string `description:"指标名称"`
  129. Frequency string `description:"频度"`
  130. Unit string `description:"单位"`
  131. SourceName string `description:"来源"`
  132. }
  133. // BridgeGnIndexParams
  134. // @Description: 桥接服务-获取国能指标数据入参
  135. type BridgeGnIndexParams struct {
  136. LastModifyTime string `json:"last_modify_time" description:"最近一次更新时间"`
  137. PageIndex int `json:"page_index" description:"当前页码"`
  138. PageSize int `json:"page_size" description:"每页数量"`
  139. }
  140. // syncCrmIndex
  141. // @Description: 开始同步CRM指标信息
  142. // @author: Roc
  143. // @datetime 2024-05-17 15:55:11
  144. // @param assetPkgCd string
  145. // @param currIndex int
  146. // @param pageSize int
  147. // @param lastUpdateTimeStr string
  148. // @return err error
  149. // @return errMsgList []string
  150. func syncGnIndex(currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) {
  151. fmt.Println("开始第", currIndex, "页的更新")
  152. errMsgList = make([]string, 0)
  153. lastUpdateTimeStr := baseLastUpdateTimeStr
  154. if lastUpdateTimeStr != `` {
  155. lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr)
  156. }
  157. params := BridgeGnIndexParams{
  158. LastModifyTime: lastUpdateTimeStr,
  159. PageIndex: currIndex,
  160. PageSize: pageSize,
  161. }
  162. bResult, err, _ := HttpEtaBridgePost(utils.SyncIndexPath, params)
  163. if err != nil {
  164. return
  165. }
  166. var result EtaBridgeGnIndexListResp
  167. err = json.Unmarshal(bResult, &result)
  168. if err != nil {
  169. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
  170. utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
  171. return
  172. }
  173. //totalPage := result.Data.Paging.Pages
  174. // 处理指标信息
  175. for _, v := range result.Data.List {
  176. tmpErr := handleIndex(v)
  177. if tmpErr != nil {
  178. errMsgList = append(errMsgList, tmpErr.Error())
  179. }
  180. }
  181. fmt.Println(currIndex, "是否已结束:", result.Data.Paging.IsEnd)
  182. // 如果还有下一页,那么就继续请求下一页
  183. if !result.Data.Paging.IsEnd {
  184. _, tmpErrMsgList := syncGnIndex(currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr)
  185. errMsgList = append(errMsgList, tmpErrMsgList...)
  186. }
  187. return
  188. }
  189. // handleIndex
  190. // @Description: 指标处理
  191. // @param index
  192. // @return err
  193. func handleIndex(index IndexInfo) (err error) {
  194. // 处理分类(如果不存在就创建)
  195. _, _, thirdClassifyId, err := handleClassify(index)
  196. if err != nil {
  197. return
  198. }
  199. // 处理指标(如果不存在就创建)
  200. err = handleEdbInfo(index, thirdClassifyId)
  201. return
  202. }
  203. // handleClassify
  204. // @Description: 分类处理
  205. // @param index
  206. // @return firstClassifyId
  207. // @return secondClassifyId
  208. // @return thirdClassifyId
  209. // @return err
  210. func handleClassify(index IndexInfo) (firstClassifyId, secondClassifyId, thirdClassifyId int64, err error) {
  211. firstClassifyName := strings.TrimSpace(index.ClassifyNameOne)
  212. secondClassifyName := strings.TrimSpace(index.ClassifyNameTwo)
  213. thirdClassifyName := strings.TrimSpace(index.ClassifyNameThree)
  214. var oneLevel, twoLevel, threeLevel int64
  215. oneLevel = 1
  216. twoLevel = 2
  217. threeLevel = 3
  218. // 一级分类
  219. {
  220. var parentId int64
  221. parentId = 0
  222. classifyName := firstClassifyName
  223. level := oneLevel
  224. // 获取层级下的父级分类map
  225. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  226. if !ok {
  227. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  228. }
  229. // 获取父级id下的分类列表
  230. currClassifyListMap, ok := currParentClassifyMap[parentId]
  231. if !ok {
  232. currClassifyListMap = make(map[string]CurrClassify)
  233. }
  234. // 根据分类名称获取分类
  235. currClassify, ok := currClassifyListMap[classifyName]
  236. if !ok {
  237. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  238. classifyInfo := &data_manage.EdbClassify{
  239. //ClassifyId: 0,
  240. ClassifyType: 0,
  241. ClassifyName: classifyName,
  242. ClassifyNameEn: classifyName,
  243. ParentID: parentId,
  244. RootID: 0,
  245. HasData: 0,
  246. CreateTime: time.Now(),
  247. ModifyTime: time.Now(),
  248. SysUserID: 0,
  249. SysUserRealName: "",
  250. Level: level,
  251. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  252. Sort: 0,
  253. }
  254. err = data_manage.AddEdbClassify(classifyInfo)
  255. if err != nil {
  256. return
  257. }
  258. classifyInfo.RootID = classifyInfo.ClassifyID
  259. err = classifyInfo.Update([]string{"root_id"})
  260. if err != nil {
  261. return
  262. }
  263. currClassify = CurrClassify{
  264. ClassifyId: classifyInfo.ClassifyID,
  265. ParentId: classifyInfo.ParentID,
  266. ClassifyName: classifyInfo.ClassifyName,
  267. }
  268. currClassifyListMap[classifyName] = currClassify
  269. currParentClassifyMap[parentId] = currClassifyListMap
  270. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  271. }
  272. firstClassifyId = currClassify.ClassifyId
  273. }
  274. // 二级分类
  275. {
  276. parentId := firstClassifyId
  277. classifyName := secondClassifyName
  278. level := twoLevel
  279. // 获取层级下的父级分类map
  280. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  281. if !ok {
  282. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  283. }
  284. // 获取父级id下的分类列表
  285. currClassifyListMap, ok := currParentClassifyMap[parentId]
  286. if !ok {
  287. currClassifyListMap = make(map[string]CurrClassify)
  288. }
  289. // 根据分类名称获取分类
  290. currClassify, ok := currClassifyListMap[classifyName]
  291. if !ok {
  292. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  293. classifyInfo := &data_manage.EdbClassify{
  294. //ClassifyId: 0,
  295. ClassifyType: 0,
  296. ClassifyName: classifyName,
  297. ClassifyNameEn: classifyName,
  298. ParentID: parentId,
  299. RootID: firstClassifyId,
  300. HasData: 0,
  301. CreateTime: time.Now(),
  302. ModifyTime: time.Now(),
  303. SysUserID: 0,
  304. SysUserRealName: "",
  305. Level: level,
  306. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  307. Sort: 0,
  308. }
  309. err = data_manage.AddEdbClassify(classifyInfo)
  310. if err != nil {
  311. return
  312. }
  313. currClassify = CurrClassify{
  314. ClassifyId: classifyInfo.ClassifyID,
  315. ParentId: classifyInfo.ParentID,
  316. ClassifyName: classifyInfo.ClassifyName,
  317. }
  318. currClassifyListMap[classifyName] = currClassify
  319. currParentClassifyMap[parentId] = currClassifyListMap
  320. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  321. }
  322. secondClassifyId = currClassify.ClassifyId
  323. }
  324. // 三级分类
  325. {
  326. parentId := secondClassifyId
  327. classifyName := thirdClassifyName
  328. level := threeLevel
  329. // 获取层级下的父级分类map
  330. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  331. if !ok {
  332. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  333. }
  334. // 获取父级id下的分类列表
  335. currClassifyListMap, ok := currParentClassifyMap[parentId]
  336. if !ok {
  337. currClassifyListMap = make(map[string]CurrClassify)
  338. }
  339. // 根据分类名称获取分类
  340. currClassify, ok := currClassifyListMap[classifyName]
  341. if !ok {
  342. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  343. classifyInfo := &data_manage.EdbClassify{
  344. //ClassifyId: 0,
  345. ClassifyType: 0,
  346. ClassifyName: classifyName,
  347. ClassifyNameEn: classifyName,
  348. ParentID: parentId,
  349. RootID: firstClassifyId,
  350. HasData: 1,
  351. CreateTime: time.Now(),
  352. ModifyTime: time.Now(),
  353. SysUserID: 0,
  354. SysUserRealName: "",
  355. Level: level,
  356. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  357. Sort: 0,
  358. }
  359. err = data_manage.AddEdbClassify(classifyInfo)
  360. if err != nil {
  361. return
  362. }
  363. currClassify = CurrClassify{
  364. ClassifyId: classifyInfo.ClassifyID,
  365. ParentId: classifyInfo.ParentID,
  366. ClassifyName: classifyInfo.ClassifyName,
  367. }
  368. currClassifyListMap[classifyName] = currClassify
  369. currParentClassifyMap[parentId] = currClassifyListMap
  370. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  371. }
  372. thirdClassifyId = currClassify.ClassifyId
  373. }
  374. return
  375. }
  376. // handleEdbInfo
  377. // @Description: 处理指标
  378. // @param index
  379. // @param thirdClassifyId
  380. // @return err
  381. func handleEdbInfo(index IndexInfo, thirdClassifyId int64) (err error) {
  382. edbInfo, ok := CurrEdbInfoMap[index.DataIndexCode]
  383. frequency := Frequency(strings.TrimSpace(index.Frequency))
  384. unit := strings.TrimSpace(index.Unit)
  385. sourceName, sourceId, err := GetSource(strings.TrimSpace(index.SourceName))
  386. if err != nil {
  387. return
  388. }
  389. if !ok {
  390. endDate := time.Date(1900, 1, 1, 0, 0, 0, 0, time.Local)
  391. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  392. edbInfo = &data_manage.EdbInfo{
  393. EdbInfoId: 0,
  394. EdbInfoType: utils.EDB_INFO_TYPE,
  395. SourceName: sourceName,
  396. Source: sourceId,
  397. EdbCode: index.SourceEdbCode,
  398. EdbName: index.EdbName,
  399. EdbNameEn: index.EdbName,
  400. EdbNameSource: index.EdbName,
  401. Frequency: frequency,
  402. Unit: unit,
  403. UnitEn: unit,
  404. StartDate: endDate,
  405. EndDate: endDate,
  406. ClassifyId: int(thirdClassifyId),
  407. SysUserId: 0,
  408. SysUserRealName: "",
  409. UniqueCode: utils.MD5(fmt.Sprint(index.SourceEdbCode, "_", utils.DATA_PREFIX+"_"+timestamp)),
  410. CreateTime: time.Now(),
  411. ModifyTime: time.Now(),
  412. BaseModifyTime: time.Now(),
  413. MinValue: 0,
  414. MaxValue: 0,
  415. CalculateFormula: "",
  416. EdbType: utils.EdbTypeBase,
  417. Sort: 0,
  418. LatestDate: "",
  419. LatestValue: 0,
  420. EndValue: 0,
  421. MoveType: 0,
  422. MoveFrequency: "",
  423. NoUpdate: 0,
  424. ServerUrl: "",
  425. ChartImage: "", // 缩略图
  426. Calendar: "",
  427. DataDateType: "",
  428. ManualSave: 0,
  429. EmptyType: 0,
  430. MaxEmptyType: 0,
  431. TerminalCode: "",
  432. DataUpdateTime: "",
  433. ErDataUpdateDate: "",
  434. SourceIndexName: index.EdbName,
  435. SubSource: 0,
  436. SubSourceName: "",
  437. IndicatorCode: "",
  438. StockCode: "",
  439. Extra: "",
  440. IsJoinPermission: 0,
  441. OriginalEdbCode: index.DataIndexCode,
  442. }
  443. err = data_manage.AddEdbInfo(edbInfo)
  444. if err != nil {
  445. return
  446. }
  447. CurrEdbInfoMap[index.DataIndexCode] = edbInfo
  448. // TODO 刷新指标明细数据
  449. fmt.Println(data.RefreshEdbData(edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EndDate.Format(utils.FormatDate)))
  450. return
  451. }
  452. updateCols := make([]string, 0)
  453. if edbInfo.EdbNameEn == edbInfo.EdbName && edbInfo.EdbName != index.EdbName {
  454. edbInfo.EdbNameEn = index.EdbName
  455. updateCols = append(updateCols, "edb_name_en")
  456. }
  457. if edbInfo.EdbName != index.EdbName {
  458. edbInfo.EdbName = index.EdbName
  459. updateCols = append(updateCols, "edb_name")
  460. }
  461. if edbInfo.Frequency != index.Frequency {
  462. edbInfo.Frequency = index.Frequency
  463. updateCols = append(updateCols, "frequency")
  464. }
  465. if edbInfo.UnitEn == edbInfo.Unit && edbInfo.Unit != unit {
  466. edbInfo.UnitEn = unit
  467. updateCols = append(updateCols, "unit_en")
  468. }
  469. if edbInfo.Unit != unit {
  470. edbInfo.Unit = unit
  471. updateCols = append(updateCols, "unit")
  472. }
  473. if edbInfo.ClassifyId != int(thirdClassifyId) {
  474. edbInfo.ClassifyId = int(thirdClassifyId)
  475. updateCols = append(updateCols, "classify_id")
  476. }
  477. if len(updateCols) > 0 {
  478. err = edbInfo.Update(updateCols)
  479. }
  480. return
  481. }
  482. // Frequency
  483. // @Description: 获取频度
  484. // @param unit
  485. // @return string
  486. func Frequency(unit string) string {
  487. switch unit {
  488. case "半月度":
  489. unit = `周度`
  490. case "不定期":
  491. unit = `日度`
  492. case `日度`, `周度`, `旬度`, `月度`, `季度`, `半年度`, `年度`:
  493. default:
  494. unit = ``
  495. }
  496. return unit
  497. }
  498. // GetSource
  499. // @Description: 获取来源
  500. // @param sourceName
  501. // @return gnSourceName
  502. // @return source
  503. // @return err
  504. func GetSource(sourceName string) (gnSourceName string, source int, err error) {
  505. gnSourceName = sourceName
  506. var tableNameSuffix, indexNamePrefix string
  507. tableNamePrefix := "edb_data_gn_"
  508. switch sourceName {
  509. case "CCTD":
  510. tableNameSuffix = "cctd"
  511. case "mysteel":
  512. tableNameSuffix = "mysteel"
  513. case "wind":
  514. tableNameSuffix = "wind"
  515. case "卓创":
  516. tableNameSuffix = "sci"
  517. case "CCI":
  518. tableNameSuffix = "cci"
  519. //return
  520. default:
  521. if strings.Contains(sourceName, "国能购销辅助决策系统") {
  522. gnSourceName = `国能购销辅助决策系统`
  523. } else if strings.Contains(sourceName, "国能市场分析平台") {
  524. gnSourceName = `国能市场分析平台`
  525. }
  526. }
  527. sourceItem := data_manage.GetEdbSourceBySourceName(gnSourceName)
  528. // 如果找不到,说明是
  529. if sourceItem == nil {
  530. indexNamePrefix = strings.ToUpper(tableNameSuffix)
  531. sourceItem = &data_manage.EdbSource{
  532. EdbSourceId: 0,
  533. SourceName: gnSourceName,
  534. TableName: tableNamePrefix + tableNameSuffix,
  535. EdbAddMethod: "gn_index/add",
  536. EdbRefreshMethod: "gn_index/refresh",
  537. IsBase: 1,
  538. FromBridge: 1,
  539. BridgeFlag: "bridge_gn",
  540. SourceExtend: gnSourceName,
  541. EdbCodeRequired: 1,
  542. IndexTableName: "",
  543. SourceNameEn: gnSourceName,
  544. }
  545. err = data_manage.AddEdbSource(sourceItem, indexNamePrefix)
  546. if err != nil {
  547. return
  548. }
  549. }
  550. source = sourceItem.EdbSourceId
  551. return
  552. }