gn.go 17 KB


  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models/data_manage"
  6. "eta_gn/eta_task/services/alarm_msg"
  7. "eta_gn/eta_task/services/data"
  8. "eta_gn/eta_task/utils"
  9. "fmt"
  10. "github.com/rdlucklib/rdluck_tools/paging"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // 同步指标信息锁
  18. var lockSyncGnIndex sync.Mutex
  19. const GnEdbListUri = `/index_data/gn/edb/list` // 国能指标列表接口
  20. // CurrLevelParentClassifyMap 当前层级分类map
  21. var CurrLevelParentClassifyMap map[int64]map[int64]map[string]CurrClassify
  22. // CurrEdbInfoMap 当前库里已有的指标map
  23. var CurrEdbInfoMap map[string]*data_manage.EdbInfo
  24. type CurrClassify struct {
  25. ClassifyId int64
  26. ParentId int64
  27. ClassifyName string
  28. }
  29. // SyncGnIndex
  30. // @Description: 定时同步指标信息
  31. // @author: Roc
  32. // @datetime 2024-03-07 17:39:34
  33. // @param cont context.Context
  34. // @return err error
  35. func SyncGnIndex(cont context.Context) (err error) {
  36. fmt.Println("开始同步指标")
  37. lockSyncGnIndex.Lock()
  38. errMsgList := make([]string, 0)
  39. defer func() {
  40. if err != nil {
  41. tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + err.Error()
  42. utils.FileLog.Info(tips)
  43. go alarm_msg.SendAlarmMsg(tips, 3)
  44. }
  45. if len(errMsgList) > 0 {
  46. tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  47. utils.FileLog.Info(tips)
  48. go alarm_msg.SendAlarmMsg(tips, 3)
  49. }
  50. lockSyncGnIndex.Unlock()
  51. }()
  52. initCurrEdbInfoMap()
  53. initCurrLevelParentClassifyMap()
  54. var lastUpdateTimeStr string // 上一次更新的时间
  55. err, errMsgList = syncGnIndex(1, utils.SyncCrmIndexNum, lastUpdateTimeStr)
  56. return
  57. }
  58. // initCurrLevelParentClassifyMap
  59. // @Description: 初始化当前层级分类map
  60. func initCurrLevelParentClassifyMap() {
  61. var condition string
  62. var pars []interface{}
  63. // 普通指标分类
  64. condition = " AND classify_type = ? "
  65. pars = append(pars, 0)
  66. classifyList, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
  67. if err != nil {
  68. utils.FileLog.Error("获取分类列表数据失败:" + err.Error())
  69. return
  70. }
  71. // 清空缓存
  72. CurrLevelParentClassifyMap = make(map[int64]map[int64]map[string]CurrClassify)
  73. for _, v := range classifyList {
  74. currParentClassifyMap, ok := CurrLevelParentClassifyMap[v.Level]
  75. if !ok {
  76. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  77. }
  78. currClassifyMap, ok := currParentClassifyMap[v.ParentID]
  79. if !ok {
  80. currClassifyMap = make(map[string]CurrClassify)
  81. }
  82. classifyName := strings.TrimSpace(v.ClassifyName)
  83. currClassifyMap[classifyName] = CurrClassify{
  84. ClassifyId: v.ClassifyID,
  85. ParentId: v.ParentID,
  86. ClassifyName: classifyName,
  87. }
  88. currParentClassifyMap[v.ParentID] = currClassifyMap
  89. CurrLevelParentClassifyMap[v.Level] = currParentClassifyMap
  90. }
  91. }
  92. // initCurrEdbInfoMap
  93. // @Description: 初始化当前指标map
  94. func initCurrEdbInfoMap() {
  95. // 获取指标列表
  96. edbInfoList, err := data_manage.GetAllBaseEdbInfo()
  97. if err != nil {
  98. utils.FileLog.Error("获取指标列表数据失败:" + err.Error())
  99. return
  100. }
  101. // 清空缓存
  102. CurrEdbInfoMap = make(map[string]*data_manage.EdbInfo)
  103. for _, v := range edbInfoList {
  104. CurrEdbInfoMap[v.OriginalEdbCode] = v
  105. }
  106. }
  107. // EtaBridgeGnIndexListResp
  108. // @Description: 指标列表返回数据
  109. type EtaBridgeGnIndexListResp struct {
  110. Code int `json:"code" description:"状态码"`
  111. Msg string `json:"msg" description:"提示信息"`
  112. Data IndexListResp `json:"data" description:"返回数据"`
  113. }
  114. // IndexListResp
  115. // @Description: 指标列表数据
  116. type IndexListResp struct {
  117. Paging paging.PagingItem `description:"分页数据"`
  118. List []IndexInfo
  119. }
  120. // IndexInfo
  121. // @Description: 指标信息
  122. type IndexInfo struct {
  123. ClassifyNameOne string `description:"一级目录"`
  124. ClassifyNameTwo string `description:"二级目录"`
  125. ClassifyNameThree string `description:"三级目录"`
  126. DataIndexCode string `description:"数据节点指标编码"`
  127. SourceEdbCode string `description:"数据源指标原始编码"`
  128. EdbName string `description:"指标名称"`
  129. Frequency string `description:"频度"`
  130. Unit string `description:"单位"`
  131. SourceName string `description:"来源"`
  132. }
  133. // BridgeGnIndexParams
  134. // @Description: 桥接服务-获取国能指标数据入参
  135. type BridgeGnIndexParams struct {
  136. LastModifyTime string `json:"last_modify_time" description:"最近一次更新时间"`
  137. PageIndex int `json:"page_index" description:"当前页码"`
  138. PageSize int `json:"page_size" description:"每页数量"`
  139. }
  140. // syncCrmIndex
  141. // @Description: 开始同步CRM指标信息
  142. // @author: Roc
  143. // @datetime 2024-05-17 15:55:11
  144. // @param assetPkgCd string
  145. // @param currIndex int
  146. // @param pageSize int
  147. // @param lastUpdateTimeStr string
  148. // @return err error
  149. // @return errMsgList []string
  150. func syncGnIndex(currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) {
  151. fmt.Println("开始第", currIndex, "页的更新")
  152. errMsgList = make([]string, 0)
  153. lastUpdateTimeStr := baseLastUpdateTimeStr
  154. if lastUpdateTimeStr != `` {
  155. lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr)
  156. }
  157. params := BridgeGnIndexParams{
  158. LastModifyTime: lastUpdateTimeStr,
  159. PageIndex: currIndex,
  160. PageSize: pageSize,
  161. }
  162. bResult, err, _ := HttpEtaBridgePost(GnEdbListUri, params)
  163. if err != nil {
  164. return
  165. }
  166. var result EtaBridgeGnIndexListResp
  167. err = json.Unmarshal(bResult, &result)
  168. if err != nil {
  169. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
  170. utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
  171. return
  172. }
  173. //totalPage := result.Data.Paging.Pages
  174. // 处理指标信息
  175. for _, v := range result.Data.List {
  176. tmpErr := handleIndex(v)
  177. if tmpErr != nil {
  178. errMsgList = append(errMsgList, tmpErr.Error())
  179. }
  180. }
  181. // 如果还有下一页,那么就继续请求下一页
  182. if currIndex < result.Data.Paging.Pages {
  183. _, tmpErrMsgList := syncGnIndex(currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr)
  184. errMsgList = append(errMsgList, tmpErrMsgList...)
  185. }
  186. return
  187. }
  188. // handleIndex
  189. // @Description: 指标处理
  190. // @param index
  191. // @return err
  192. func handleIndex(index IndexInfo) (err error) {
  193. // 处理分类(如果不存在就创建)
  194. _, _, thirdClassifyId, err := handleClassify(index)
  195. if err != nil {
  196. return
  197. }
  198. // 处理指标(如果不存在就创建)
  199. err = handleEdbInfo(index, thirdClassifyId)
  200. return
  201. }
  202. // handleClassify
  203. // @Description: 分类处理
  204. // @param index
  205. // @return firstClassifyId
  206. // @return secondClassifyId
  207. // @return thirdClassifyId
  208. // @return err
  209. func handleClassify(index IndexInfo) (firstClassifyId, secondClassifyId, thirdClassifyId int64, err error) {
  210. firstClassifyName := strings.TrimSpace(index.ClassifyNameOne)
  211. secondClassifyName := strings.TrimSpace(index.ClassifyNameTwo)
  212. thirdClassifyName := strings.TrimSpace(index.ClassifyNameThree)
  213. var oneLevel, twoLevel, threeLevel int64
  214. oneLevel = 1
  215. twoLevel = 2
  216. threeLevel = 3
  217. // 一级分类
  218. {
  219. var parentId int64
  220. parentId = 0
  221. classifyName := firstClassifyName
  222. level := oneLevel
  223. // 获取层级下的父级分类map
  224. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  225. if !ok {
  226. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  227. }
  228. // 获取父级id下的分类列表
  229. currClassifyListMap, ok := currParentClassifyMap[parentId]
  230. if !ok {
  231. currClassifyListMap = make(map[string]CurrClassify)
  232. }
  233. // 根据分类名称获取分类
  234. currClassify, ok := currClassifyListMap[classifyName]
  235. if !ok {
  236. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  237. classifyInfo := &data_manage.EdbClassify{
  238. //ClassifyId: 0,
  239. ClassifyType: 0,
  240. ClassifyName: classifyName,
  241. ClassifyNameEn: classifyName,
  242. ParentID: parentId,
  243. RootID: 0,
  244. HasData: 0,
  245. CreateTime: time.Now(),
  246. ModifyTime: time.Now(),
  247. SysUserID: 0,
  248. SysUserRealName: "",
  249. Level: level,
  250. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  251. Sort: 0,
  252. }
  253. err = data_manage.AddEdbClassify(classifyInfo)
  254. if err != nil {
  255. return
  256. }
  257. classifyInfo.RootID = classifyInfo.ClassifyID
  258. err = classifyInfo.Update([]string{"root_id"})
  259. if err != nil {
  260. return
  261. }
  262. currClassify = CurrClassify{
  263. ClassifyId: classifyInfo.ClassifyID,
  264. ParentId: classifyInfo.ParentID,
  265. ClassifyName: classifyInfo.ClassifyName,
  266. }
  267. currClassifyListMap[classifyName] = currClassify
  268. currParentClassifyMap[parentId] = currClassifyListMap
  269. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  270. }
  271. firstClassifyId = currClassify.ClassifyId
  272. }
  273. // 二级分类
  274. {
  275. parentId := firstClassifyId
  276. classifyName := secondClassifyName
  277. level := twoLevel
  278. // 获取层级下的父级分类map
  279. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  280. if !ok {
  281. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  282. }
  283. // 获取父级id下的分类列表
  284. currClassifyListMap, ok := currParentClassifyMap[parentId]
  285. if !ok {
  286. currClassifyListMap = make(map[string]CurrClassify)
  287. }
  288. // 根据分类名称获取分类
  289. currClassify, ok := currClassifyListMap[classifyName]
  290. if !ok {
  291. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  292. classifyInfo := &data_manage.EdbClassify{
  293. //ClassifyId: 0,
  294. ClassifyType: 0,
  295. ClassifyName: classifyName,
  296. ClassifyNameEn: classifyName,
  297. ParentID: parentId,
  298. RootID: firstClassifyId,
  299. HasData: 0,
  300. CreateTime: time.Now(),
  301. ModifyTime: time.Now(),
  302. SysUserID: 0,
  303. SysUserRealName: "",
  304. Level: level,
  305. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  306. Sort: 0,
  307. }
  308. err = data_manage.AddEdbClassify(classifyInfo)
  309. if err != nil {
  310. return
  311. }
  312. currClassify = CurrClassify{
  313. ClassifyId: classifyInfo.ClassifyID,
  314. ParentId: classifyInfo.ParentID,
  315. ClassifyName: classifyInfo.ClassifyName,
  316. }
  317. currClassifyListMap[classifyName] = currClassify
  318. currParentClassifyMap[parentId] = currClassifyListMap
  319. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  320. }
  321. secondClassifyId = currClassify.ClassifyId
  322. }
  323. // 三级分类
  324. {
  325. parentId := secondClassifyId
  326. classifyName := thirdClassifyName
  327. level := threeLevel
  328. // 获取层级下的父级分类map
  329. currParentClassifyMap, ok := CurrLevelParentClassifyMap[level]
  330. if !ok {
  331. currParentClassifyMap = make(map[int64]map[string]CurrClassify)
  332. }
  333. // 获取父级id下的分类列表
  334. currClassifyListMap, ok := currParentClassifyMap[parentId]
  335. if !ok {
  336. currClassifyListMap = make(map[string]CurrClassify)
  337. }
  338. // 根据分类名称获取分类
  339. currClassify, ok := currClassifyListMap[classifyName]
  340. if !ok {
  341. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  342. classifyInfo := &data_manage.EdbClassify{
  343. //ClassifyId: 0,
  344. ClassifyType: 0,
  345. ClassifyName: classifyName,
  346. ClassifyNameEn: classifyName,
  347. ParentID: parentId,
  348. RootID: firstClassifyId,
  349. HasData: 1,
  350. CreateTime: time.Now(),
  351. ModifyTime: time.Now(),
  352. SysUserID: 0,
  353. SysUserRealName: "",
  354. Level: level,
  355. UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)),
  356. Sort: 0,
  357. }
  358. err = data_manage.AddEdbClassify(classifyInfo)
  359. if err != nil {
  360. return
  361. }
  362. currClassify = CurrClassify{
  363. ClassifyId: classifyInfo.ClassifyID,
  364. ParentId: classifyInfo.ParentID,
  365. ClassifyName: classifyInfo.ClassifyName,
  366. }
  367. currClassifyListMap[classifyName] = currClassify
  368. currParentClassifyMap[parentId] = currClassifyListMap
  369. CurrLevelParentClassifyMap[level] = currParentClassifyMap
  370. }
  371. thirdClassifyId = currClassify.ClassifyId
  372. }
  373. return
  374. }
  375. // handleEdbInfo
  376. // @Description: 处理指标
  377. // @param index
  378. // @param thirdClassifyId
  379. // @return err
  380. func handleEdbInfo(index IndexInfo, thirdClassifyId int64) (err error) {
  381. edbInfo, ok := CurrEdbInfoMap[index.DataIndexCode]
  382. frequency := Frequency(strings.TrimSpace(index.Frequency))
  383. unit := strings.TrimSpace(index.Unit)
  384. sourceName, sourceId, err := GetSource(strings.TrimSpace(index.SourceName))
  385. if err != nil {
  386. return
  387. }
  388. if !ok {
  389. endDate := time.Date(1900, 1, 1, 0, 0, 0, 0, time.Local)
  390. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  391. edbInfo = &data_manage.EdbInfo{
  392. EdbInfoId: 0,
  393. EdbInfoType: utils.EDB_INFO_TYPE,
  394. SourceName: sourceName,
  395. Source: sourceId,
  396. EdbCode: index.SourceEdbCode,
  397. EdbName: index.EdbName,
  398. EdbNameEn: index.EdbName,
  399. EdbNameSource: index.EdbName,
  400. Frequency: frequency,
  401. Unit: unit,
  402. UnitEn: unit,
  403. StartDate: endDate,
  404. EndDate: endDate,
  405. ClassifyId: int(thirdClassifyId),
  406. SysUserId: 0,
  407. SysUserRealName: "",
  408. UniqueCode: utils.MD5(fmt.Sprint(index.SourceEdbCode, "_", utils.DATA_PREFIX+"_"+timestamp)),
  409. CreateTime: time.Now(),
  410. ModifyTime: time.Now(),
  411. BaseModifyTime: time.Now(),
  412. MinValue: 0,
  413. MaxValue: 0,
  414. CalculateFormula: "",
  415. EdbType: utils.EdbTypeBase,
  416. Sort: 0,
  417. LatestDate: "",
  418. LatestValue: 0,
  419. EndValue: 0,
  420. MoveType: 0,
  421. MoveFrequency: "",
  422. NoUpdate: 0,
  423. ServerUrl: "",
  424. ChartImage: "", // 缩略图
  425. Calendar: "",
  426. DataDateType: "",
  427. ManualSave: 0,
  428. EmptyType: 0,
  429. MaxEmptyType: 0,
  430. TerminalCode: "",
  431. DataUpdateTime: "",
  432. ErDataUpdateDate: "",
  433. SourceIndexName: index.EdbName,
  434. SubSource: 0,
  435. SubSourceName: "",
  436. IndicatorCode: "",
  437. StockCode: "",
  438. Extra: "",
  439. IsJoinPermission: 0,
  440. OriginalEdbCode: index.DataIndexCode,
  441. }
  442. err = data_manage.AddEdbInfo(edbInfo)
  443. if err != nil {
  444. return
  445. }
  446. CurrEdbInfoMap[index.DataIndexCode] = edbInfo
  447. // TODO 刷新指标明细数据
  448. fmt.Println(data.RefreshEdbData(edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EndDate.Format(utils.FormatDate)))
  449. return
  450. }
  451. updateCols := make([]string, 0)
  452. if edbInfo.EdbNameEn == edbInfo.EdbName && edbInfo.EdbName != index.EdbName {
  453. edbInfo.EdbNameEn = index.EdbName
  454. updateCols = append(updateCols, "edb_name_en")
  455. }
  456. if edbInfo.EdbName != index.EdbName {
  457. edbInfo.EdbName = index.EdbName
  458. updateCols = append(updateCols, "edb_name")
  459. }
  460. if edbInfo.Frequency != index.Frequency {
  461. edbInfo.Frequency = index.Frequency
  462. updateCols = append(updateCols, "frequency")
  463. }
  464. if edbInfo.UnitEn == edbInfo.Unit && edbInfo.Unit != unit {
  465. edbInfo.UnitEn = unit
  466. updateCols = append(updateCols, "unit_en")
  467. }
  468. if edbInfo.Unit != unit {
  469. edbInfo.Unit = unit
  470. updateCols = append(updateCols, "unit")
  471. }
  472. if edbInfo.ClassifyId != int(thirdClassifyId) {
  473. edbInfo.ClassifyId = int(thirdClassifyId)
  474. updateCols = append(updateCols, "classify_id")
  475. }
  476. if len(updateCols) > 0 {
  477. err = edbInfo.Update(updateCols)
  478. }
  479. return
  480. }
  481. // Frequency
  482. // @Description: 获取频度
  483. // @param unit
  484. // @return string
  485. func Frequency(unit string) string {
  486. switch unit {
  487. case "半月度":
  488. unit = `周度`
  489. case "不定期":
  490. unit = `日度`
  491. case `日度`, `周度`, `旬度`, `月度`, `季度`, `半年度`, `年度`:
  492. default:
  493. unit = ``
  494. }
  495. return unit
  496. }
  497. // GetSource
  498. // @Description: 获取来源
  499. // @param sourceName
  500. // @return gnSourceName
  501. // @return source
  502. // @return err
  503. func GetSource(sourceName string) (gnSourceName string, source int, err error) {
  504. gnSourceName = sourceName
  505. var tableNameSuffix, indexNamePrefix string
  506. tableNamePrefix := "edb_data_gn_"
  507. switch sourceName {
  508. case "CCTD":
  509. tableNameSuffix = "cctd"
  510. case "mysteel":
  511. tableNameSuffix = "mysteel"
  512. case "wind":
  513. tableNameSuffix = "wind"
  514. case "卓创":
  515. tableNameSuffix = "sci"
  516. case "CCI":
  517. tableNameSuffix = "cci"
  518. //return
  519. default:
  520. if strings.Contains(sourceName, "国能购销辅助决策系统") {
  521. gnSourceName = `国能购销辅助决策系统`
  522. } else if strings.Contains(sourceName, "国能市场分析平台") {
  523. gnSourceName = `国能市场分析平台`
  524. }
  525. }
  526. sourceItem := data_manage.GetEdbSourceBySourceName(gnSourceName)
  527. // 如果找不到,说明是
  528. if sourceItem == nil {
  529. indexNamePrefix = strings.ToUpper(tableNameSuffix)
  530. sourceItem = &data_manage.EdbSource{
  531. EdbSourceId: 0,
  532. SourceName: gnSourceName,
  533. TableName: tableNamePrefix + tableNameSuffix,
  534. EdbAddMethod: "gn_index/add",
  535. EdbRefreshMethod: "gn_index/refresh",
  536. IsBase: 1,
  537. FromBridge: 1,
  538. BridgeFlag: "bridge_gn",
  539. SourceExtend: gnSourceName,
  540. EdbCodeRequired: 1,
  541. IndexTableName: "",
  542. SourceNameEn: gnSourceName,
  543. }
  544. err = data_manage.AddEdbSource(sourceItem, indexNamePrefix)
  545. if err != nil {
  546. return
  547. }
  548. }
  549. source = sourceItem.EdbSourceId
  550. return
  551. }