send_wechat_msg.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package task
  2. import (
  3. "context"
  4. "fmt"
  5. "eta/eta_menu_sync/global"
  6. "eta/eta_menu_sync/models"
  7. "eta/eta_menu_sync/services"
  8. "eta/eta_menu_sync/services/alarm_msg"
  9. "eta/eta_menu_sync/utils"
  10. "strings"
  11. "time"
  12. )
  13. func SendMsg() {
  14. //fmt.Println("start task")
  15. errMsgList := make([]string, 0)
  16. defer func() {
  17. if len(errMsgList) > 0 {
  18. fmt.Println("定时发送消息失败:")
  19. for _, errMsg := range errMsgList {
  20. fmt.Println(errMsg)
  21. }
  22. }
  23. }()
  24. wechatHelperObj := models.WechatHelper{}
  25. list, err := wechatHelperObj.GetAllHelper()
  26. if err != nil {
  27. fmt.Println("err:", err)
  28. return
  29. }
  30. wechatMsgPushRecordObj := models.WechatMsgPushRecord{}
  31. for _, helper := range list {
  32. //fmt.Println(helper)
  33. recordList, tmpErr := wechatMsgPushRecordObj.GetNoPushListByHelperIdV2(helper.WechatHelperId, helper.Num)
  34. if tmpErr != nil {
  35. errMsgList = append(errMsgList, fmt.Sprint(helper.WechatHelperName, "发送微信消息失败,获取未发送记录失败,Err:", tmpErr.Error()))
  36. continue
  37. }
  38. // 如果没有待推送数据,那么就退出当前循环,进入下一循环
  39. if len(recordList) <= 0 {
  40. continue
  41. }
  42. {
  43. recordIdList := make([]int, 0)
  44. for _, record := range recordList {
  45. recordIdList = append(recordIdList, record.WechatMsgId)
  46. }
  47. tmpErr = wechatMsgPushRecordObj.MultiUpdateStatus(recordIdList)
  48. if tmpErr != nil {
  49. errMsgList = append(errMsgList, fmt.Sprint(helper.WechatHelperName, "发送微信消息失败,变更消息发送中状态失败,Err:", tmpErr.Error()))
  50. continue
  51. }
  52. }
  53. go PushWxMsgList(recordList, helper)
  54. }
  55. }
  56. // PushWxMsgList 处理发送消息列表
  57. func PushWxMsgList(list []*models.WechatMsgPushRecordItem, helper *models.WechatHelper) {
  58. num := len(list)
  59. // 平均延迟时间
  60. avgDelayTime := 60 / num
  61. // 最小延迟时间
  62. minDelayTime := 5
  63. if avgDelayTime < 2 {
  64. minDelayTime = 0
  65. } else if avgDelayTime <= 6 {
  66. minDelayTime = 3
  67. }
  68. //fmt.Println("minDelayTime:", minDelayTime)
  69. //fmt.Println("avgDelayTime:", avgDelayTime)
  70. for _, record := range list {
  71. // 发送消息
  72. go PushWxMsg(record, helper)
  73. // 本次延迟时间
  74. currDelayTime := utils.GetRandInt(minDelayTime, avgDelayTime)
  75. fmt.Println("下一条消息时间是 ", currDelayTime, " 秒后")
  76. time.Sleep(time.Duration(currDelayTime) * time.Second)
  77. }
  78. }
  79. // PushWxMsg 真正开始处理推送消息业务
  80. func PushWxMsg(record *models.WechatMsgPushRecordItem, wxHelper *models.WechatHelper) {
  81. fmt.Println(time.Now().Format(utils.FormatDateTime), " :", record.WechatMsgId, "开始推送了")
  82. var errMsg, resultStr string
  83. defer func() {
  84. record.ModifyTime = time.Now()
  85. record.Status = 2
  86. record.Remark = errMsg
  87. record.ServerResult = resultStr
  88. record.WechatHelperId = wxHelper.WechatHelperId
  89. if errMsg != `` { // 发送失败了
  90. record.Status = -1
  91. // TODO 发送邮件提醒?
  92. } else {
  93. record.Remark = `发送成功`
  94. }
  95. obj := models.WechatMsgPushRecord{}
  96. err := obj.UpdateV2(record.WechatMsgId, record.WechatHelperId, record.Status, record.Remark, record.ServerResult)
  97. if err != nil {
  98. fmt.Println("err:", err)
  99. }
  100. //record.Update([]string{"Status", "Remark", "ServerResult", "ModifyTime"})
  101. }()
  102. var resp services.WxPushResp
  103. var err error
  104. switch record.MsgType { // 消息类型,1:h5链接;2:小程序,3:文字;4:图片
  105. case 1: // h5链接
  106. if record.Content == `` {
  107. errMsg = `发送数据内容为空`
  108. return
  109. }
  110. resp, resultStr, err = services.SendH5(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Title, record.Pic, record.Content, record.JumpUrl)
  111. case 2: // 小程序
  112. //GetByJumpPath
  113. wechatListenMsgRecordObj := models.WechatListenMsgRecord{}
  114. listenMsg, tmpErr := wechatListenMsgRecordObj.GetByJumpPath(models.ListenWxAppType, record.JumpUrl)
  115. if tmpErr != nil {
  116. if tmpErr != utils.ErrNoRow {
  117. err = tmpErr
  118. }
  119. break
  120. }
  121. // 替换消息内容,将原来内容里面的 发送人微信标识 替换为 小助手微信标识
  122. xml := strings.Replace(listenMsg.Msg, listenMsg.FinalFromWechatId, wxHelper.WechatId, -1)
  123. resp, resultStr, err = services.SendXml(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, xml)
  124. case 3: // 文字
  125. if record.Content == `` {
  126. errMsg = `发送数据内容为空`
  127. return
  128. }
  129. resp, resultStr, err = services.SendText(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Content)
  130. case 4: // 图片
  131. resp, resultStr, err = services.SendPic(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Pic)
  132. default:
  133. errMsg = `错误的推送类型`
  134. return
  135. }
  136. if err != nil {
  137. errMsg = "发送消息给微信失败,ERR:" + err.Error()
  138. return
  139. }
  140. if resp.Code != 200 {
  141. errMsg = "发送消息给微信,服务返回失败信息,ERR:" + resp.Msg
  142. return
  143. }
  144. return
  145. }
  146. func ListenWechatOnline() {
  147. offlineHelperList := make([]*models.WechatHelper, 0)
  148. baseKey := `listen:wechat_helper:`
  149. defer func() {
  150. if len(offlineHelperList) > 0 {
  151. errMsgList := make([]string, 0)
  152. for _, helper := range offlineHelperList {
  153. result := global.Redis.SetNX(context.TODO(), fmt.Sprint(baseKey, helper.WechatId), 1, 30*time.Minute)
  154. if result.Val() {
  155. errMsgList = append(errMsgList, fmt.Sprint("机器人:", helper.WechatHelperName, ";所属服务器:", helper.ServerUrl))
  156. }
  157. }
  158. if len(errMsgList) > 0 {
  159. email := `984198890@qq.com;317699326@qq.com;pdzhao@hzinsights.com`
  160. go alarm_msg.SendAlarmMsg(time.Now().Format(utils.FormatDateTime)+":微信小助手掉线检测:<br/>"+strings.Join(errMsgList, "<br/>"), 3, email)
  161. }
  162. }
  163. }()
  164. wechatHelperObj := models.WechatHelper{}
  165. list, err := wechatHelperObj.GetAllHelper()
  166. if err != nil {
  167. fmt.Println("err:", err)
  168. return
  169. }
  170. for _, helper := range list {
  171. resp, _, tmpErr := services.Listen(helper.ServerUrl, helper.WechatId)
  172. if tmpErr != nil {
  173. offlineHelperList = append(offlineHelperList, helper)
  174. continue
  175. }
  176. if resp.Code != 200 {
  177. offlineHelperList = append(offlineHelperList, helper)
  178. continue
  179. }
  180. }
  181. }