edb_monitor_message.go 7.3 KB


  1. package edbmonitor
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "eta/eta_api/global"
  6. "eta/eta_api/models"
  7. edbmonitor "eta/eta_api/models/edb_monitor"
  8. "eta/eta_api/models/edb_monitor/response"
  9. "eta/eta_api/utils"
  10. "fmt"
  11. "time"
  12. "github.com/rdlucklib/rdluck_tools/paging"
  13. )
  14. func ReadEdbMonitorMessage(messageId, adminId int) (msg string, err error) {
  15. message, err := edbmonitor.GetEdbMonitorMessageById(messageId)
  16. if err != nil {
  17. if utils.IsErrNoRow(err) {
  18. msg = "消息不存在"
  19. return
  20. }
  21. msg = "获取消息失败"
  22. return
  23. }
  24. if message.AdminId != adminId {
  25. msg = "您没有权限查看该消息"
  26. err = errors.New("no permission")
  27. return
  28. }
  29. message.IsRead = 1
  30. err = message.Update([]string{"IsRead"})
  31. if err != nil {
  32. msg = "已读失败"
  33. return
  34. }
  35. return
  36. }
  37. func ReadEdbMonitorMessageList(messageId []int, adminId int) (msg string, err error) {
  38. err = edbmonitor.BatchModifyEdbMonitorMessageIsRead(messageId, adminId)
  39. if err != nil {
  40. msg = "已读失败"
  41. return
  42. }
  43. return
  44. }
  45. // func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
  46. // conn := MonitorMessageConn[adminId]
  47. // if conn == nil {
  48. // err = errors.New("no connection")
  49. // isClose = true
  50. // return
  51. // }
  52. // _, msg, err := conn.ReadMessage()
  53. // if err != nil {
  54. // isClose = true
  55. // return
  56. // }
  57. // if string(msg) == "ping" {
  58. // healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
  59. // err = utils.Rc.Put(healthKey, "1", time.Minute*1)
  60. // if err != nil {
  61. // return
  62. // }
  63. // }
  64. // return
  65. // }
  66. func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, edbInfoType, adminId, isRead, classifyId int) (err error) {
  67. message := &edbmonitor.EdbMonitorMessage{
  68. EdbInfoId: edbInfoId,
  69. EdbInfoType: edbInfoType,
  70. EdbUniqueCode: uniqueCode,
  71. EdbClassifyId: classifyId,
  72. AdminId: adminId,
  73. IsRead: isRead,
  74. Message: content + "触发预警",
  75. MonitorTriggerTime: triggerTime,
  76. CreateTime: time.Now(),
  77. }
  78. _, err = message.Insert()
  79. return err
  80. }
  81. func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUniqueCode, message string, triggerTime string) (err error) {
  82. conn := global.AdminWebSocketConnMap[adminId]
  83. if conn == nil {
  84. err = errors.New("no connection")
  85. return
  86. }
  87. resp := models.WebsocketMessageResponse{
  88. MessageType: 0,
  89. Data: response.EdbMonitorMessageResp{
  90. EdbInfoId: edbInfoId,
  91. EdbInfoType: edbInfoType,
  92. EdbUniqueCode: edbUniqueCode,
  93. EdbClassifyId: classifyId,
  94. Message: message,
  95. TriggerTime: triggerTime,
  96. },
  97. }
  98. jsonData, err := json.Marshal(resp)
  99. if err != nil {
  100. err = fmt.Errorf("json marshal failed, err:%s", err.Error())
  101. return
  102. }
  103. ok := conn.Send(jsonData)
  104. if !ok {
  105. err = fmt.Errorf("send message failed, err:%s", err.Error())
  106. return
  107. }
  108. return
  109. }
  110. func GetHistoryMessages(adminId int) (items []*response.EdbMonitorMessageResp, err error) {
  111. messageList, err := edbmonitor.GetEdbMonitorMessageByAdminId(adminId)
  112. if err != nil {
  113. return
  114. }
  115. items = toEdbMonitorMessageResp(messageList)
  116. return
  117. }
  118. func GetMessageList(adminid int, currentIndex, pageSize int) (resp response.EdbMonitorMessageListResp, err error) {
  119. startSize := utils.StartIndex(currentIndex, pageSize)
  120. total, err := edbmonitor.GetEdbMonitorMessageCountByAdminId(adminid)
  121. if err != nil {
  122. return
  123. }
  124. if total == 0 {
  125. resp.List = make([]*response.EdbMonitorMessageResp, 0)
  126. resp.Paging = paging.GetPaging(currentIndex, pageSize, total)
  127. return
  128. }
  129. messageList, err := edbmonitor.GetEdbMonitorMessagePageByAdminId(adminid, startSize, pageSize)
  130. if err != nil {
  131. return
  132. }
  133. resp.List = toEdbMonitorMessageResp(messageList)
  134. resp.Paging = paging.GetPaging(currentIndex, pageSize, total)
  135. return
  136. }
  137. func toEdbMonitorMessageResp(items []*edbmonitor.EdbMonitorMessage) (list []*response.EdbMonitorMessageResp) {
  138. list = make([]*response.EdbMonitorMessageResp, 0)
  139. for _, message := range items {
  140. item := response.EdbMonitorMessageResp{
  141. EdbMonitorMessageId: message.EdbMonitorMessageId,
  142. EdbInfoId: message.EdbInfoId,
  143. EdbInfoType: message.EdbInfoType,
  144. EdbUniqueCode: message.EdbUniqueCode,
  145. EdbClassifyId: message.EdbClassifyId,
  146. IsRead: message.IsRead,
  147. Message: message.Message,
  148. TriggerTime: utils.TimeTransferString(utils.FormatDateTime, message.MonitorTriggerTime),
  149. }
  150. list = append(list, &item)
  151. }
  152. return
  153. }
  154. func AutoCheckMonitorMessageList(admins []int) (err error) {
  155. if len(admins) == 0 {
  156. return nil
  157. }
  158. //utils.FileLog.Info("检查是否有预警信息,活跃用户数: %d", len(admins))
  159. // 设置缓存防止重复发送,使用较短的锁定时间
  160. cacheKey := fmt.Sprintf("%s", utils.CACHE_EDB_MONITOR_MESSAGE)
  161. if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
  162. utils.FileLog.Info("其他进程正在处理预警信息,跳过本次检查")
  163. return nil
  164. }
  165. defer func() {
  166. _ = utils.Rc.Delete(cacheKey)
  167. }()
  168. messageList, er := edbmonitor.GetEdbMonitorMessageUnreadListByAdminIds(admins)
  169. if er != nil {
  170. err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
  171. return
  172. }
  173. readList := make([]int, 0)
  174. adminMsgMap := make(map[int]int)
  175. for _, msg := range messageList {
  176. if _, ok := adminMsgMap[msg.AdminId]; !ok {
  177. adminMsgMap[msg.AdminId] = msg.EdbMonitorMessageId
  178. triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
  179. err := SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
  180. if err != nil {
  181. utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
  182. }
  183. }
  184. readList = append(readList, msg.EdbMonitorMessageId)
  185. }
  186. err = edbmonitor.SetEdbMonitorMessageReadByIds(readList)
  187. if err != nil {
  188. err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
  189. return
  190. }
  191. return
  192. }
  193. func AutoCheckMonitorMessageListByAdminId(adminId int) (err error){
  194. // 设置缓存防止重复发送
  195. cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_MONITOR_MESSAGE, adminId)
  196. if !utils.Rc.SetNX(cacheKey, 1, 10*time.Minute) {
  197. err = fmt.Errorf("系统处理中,请稍后重试!")
  198. utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
  199. return
  200. }
  201. defer func() {
  202. if err != nil {
  203. utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
  204. }
  205. _ = utils.Rc.Delete(cacheKey)
  206. }()
  207. utils.FileLog.Info("检查是否有预警信息")
  208. messageList, er := edbmonitor.GetEdbMonitorMessageUnreadByAdminId(adminId)
  209. if er != nil {
  210. err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
  211. return
  212. }
  213. if len(messageList) == 0 {
  214. return
  215. }
  216. readList := make([]int, 0)
  217. for k, msg := range messageList {
  218. if k == 0 {
  219. triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
  220. err = SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
  221. if err != nil {
  222. utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
  223. return
  224. }
  225. }
  226. readList = append(readList, msg.EdbMonitorMessageId)
  227. }
  228. err = edbmonitor.SetEdbMonitorMessageReadByAdminId(adminId, readList)
  229. if err != nil {
  230. err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
  231. return
  232. }
  233. return
  234. }