table_info.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package data
  2. import (
  3. "context"
  4. "fmt"
  5. "hongze/hongze_task/models/data_manage"
  6. "hongze/hongze_task/utils"
  7. "strings"
  8. )
  9. // AppendDataToEdbTable 追加数据到ETA表格
  10. func AppendDataToEdbTable(cont context.Context) (err error) {
  11. errMsgList := make([]string, 0)
  12. defer func() {
  13. if len(errMsgList) > 0 {
  14. go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", "追加数据到ETA表格; ErrMsg:"+strings.Join(errMsgList, "\n"), utils.EmailSendToUsers)
  15. }
  16. }()
  17. tableInfoList, err := data_manage.GetAllTableInfoList()
  18. if err != nil {
  19. errMsgList = append(errMsgList, fmt.Sprint("获取ETA表格列表失败,Err:", err.Error()))
  20. return
  21. }
  22. for _, tableInfo := range tableInfoList {
  23. edbMappingList, tmpErr := data_manage.GetAllTableEdbMappingItemListByTableInfoId(tableInfo.TableInfoId)
  24. if tmpErr != nil {
  25. errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "获取关联关系失败,Err:", tmpErr.Error()))
  26. continue
  27. }
  28. dataMap := make(map[string]map[string]float64)
  29. dateList := make([]string, 0)
  30. mappingMap := make(map[int]string) //指标map的最后一次更新时间
  31. for _, v := range edbMappingList {
  32. //如果最近更新eta表格数据的时间早于最新数据时间,那么需要查询出该数据并插入到eta表格中
  33. if v.EdbDataEndDate.After(v.EndDate) {
  34. dataList, tmpErr := data_manage.GetEdbDataListAll(" and edb_info_id=? and data_time > ?", []interface{}{v.EdbInfoId, v.EndDate.Format(utils.FormatDate)}, v.Source, 1)
  35. if tmpErr != nil {
  36. errMsg := fmt.Sprint(v.TableInfoId, "获取指标数据失败,Err:", tmpErr.Error())
  37. errMsgList = append(errMsgList, errMsg)
  38. continue
  39. }
  40. if len(dataList) <= 0 {
  41. continue
  42. }
  43. for _, data := range dataList {
  44. //指标map的最后一次更新时间
  45. mappingMap[v.TableEdbMappingId] = data.DataTime
  46. tmpDataMap, ok := dataMap[data.DataTime]
  47. if !ok {
  48. tmpDataMap = make(map[string]float64)
  49. dateList = append(dateList, data.DataTime)
  50. }
  51. key := fmt.Sprint("data_col_", v.EdbDataColIndex)
  52. tmpDataMap[key] = data.Value
  53. dataMap[data.DataTime] = tmpDataMap
  54. }
  55. }
  56. }
  57. fmt.Println(dataMap)
  58. fmt.Println(dateList)
  59. for _, date := range dateList {
  60. tmpErr = insertOrUpdateTableData(tableInfo.TableInfoId, date, dataMap[date])
  61. if tmpErr != nil {
  62. errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "插入更新", date, "数据失败,Err:", tmpErr.Error()))
  63. continue
  64. }
  65. }
  66. for tableEdbMappingId, date := range mappingMap {
  67. tmpErr = updateTableEdbMappingEndDate(tableEdbMappingId, date)
  68. if tmpErr != nil {
  69. errMsgList = append(errMsgList, fmt.Sprint(tableEdbMappingId, "更新最后一次数据更新日期数据失败,Err:", tmpErr.Error()))
  70. continue
  71. }
  72. }
  73. }
  74. return
  75. }
  76. // insertOrUpdateTableData 插入/更新ETA表格数据
  77. func insertOrUpdateTableData(tableInfoId int, date string, dataMap map[string]float64) (err error) {
  78. item, err := data_manage.GetBetweenTableDataByTableInfoIdAndDate(tableInfoId, date)
  79. if err != nil && err.Error() != utils.ErrNoRow() {
  80. return
  81. }
  82. err = nil
  83. sql := ``
  84. //如果找不到数据,那么是插入数据
  85. if item == nil {
  86. sort, tmpErr := data_manage.GetMaxSortByTableInfoId(tableInfoId)
  87. if tmpErr != nil {
  88. err = tmpErr
  89. return
  90. }
  91. columnStr := ``
  92. valueStr := ``
  93. for columnValue, value := range dataMap {
  94. columnStr += columnValue + ","
  95. valueStr += fmt.Sprint(value, ",")
  96. }
  97. sort = sort + 1
  98. sql = fmt.Sprintf("INSERT INTO `table_data`( `table_info_id`, `date`, %s `data_type`, `sort`, `modify_time`, `create_time`) VALUES ( %d, '%s', %s 1, %d, now(), now());", columnStr, tableInfoId, date, valueStr, sort)
  99. } else {
  100. updateStr := ``
  101. for columnValue, value := range dataMap {
  102. updateStr += fmt.Sprint(columnValue, "=", value, ",")
  103. }
  104. sql = fmt.Sprintf("UPDATE `table_data` SET %s `modify_time` = now() WHERE `table_data_id` = %d;", updateStr, item.TableDataId)
  105. }
  106. fmt.Println(sql)
  107. err = data_manage.ExecTableDataSql(sql)
  108. return
  109. }
  110. func updateTableEdbMappingEndDate(tableEdbMappingId int, endDate string) (err error) {
  111. sql := fmt.Sprintf("UPDATE `table_edb_mapping` SET `end_date` = '%s', `modify_time` = now() WHERE `table_edb_mapping_id` = %d;", endDate, tableEdbMappingId)
  112. fmt.Println(sql)
  113. err = data_manage.ExecTableDataSql(sql)
  114. return
  115. }