123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package task
- import (
- "context"
- "fmt"
- "eta/eta_menu_sync/global"
- "eta/eta_menu_sync/models"
- "eta/eta_menu_sync/services"
- "eta/eta_menu_sync/services/alarm_msg"
- "eta/eta_menu_sync/utils"
- "strings"
- "time"
- )
- func SendMsg() {
- //fmt.Println("start task")
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- fmt.Println("定时发送消息失败:")
- for _, errMsg := range errMsgList {
- fmt.Println(errMsg)
- }
- }
- }()
- wechatHelperObj := models.WechatHelper{}
- list, err := wechatHelperObj.GetAllHelper()
- if err != nil {
- fmt.Println("err:", err)
- return
- }
- wechatMsgPushRecordObj := models.WechatMsgPushRecord{}
- for _, helper := range list {
- //fmt.Println(helper)
- recordList, tmpErr := wechatMsgPushRecordObj.GetNoPushListByHelperIdV2(helper.WechatHelperId, helper.Num)
- if tmpErr != nil {
- errMsgList = append(errMsgList, fmt.Sprint(helper.WechatHelperName, "发送微信消息失败,获取未发送记录失败,Err:", tmpErr.Error()))
- continue
- }
- // 如果没有待推送数据,那么就退出当前循环,进入下一循环
- if len(recordList) <= 0 {
- continue
- }
- {
- recordIdList := make([]int, 0)
- for _, record := range recordList {
- recordIdList = append(recordIdList, record.WechatMsgId)
- }
- tmpErr = wechatMsgPushRecordObj.MultiUpdateStatus(recordIdList)
- if tmpErr != nil {
- errMsgList = append(errMsgList, fmt.Sprint(helper.WechatHelperName, "发送微信消息失败,变更消息发送中状态失败,Err:", tmpErr.Error()))
- continue
- }
- }
- go PushWxMsgList(recordList, helper)
- }
- }
- // PushWxMsgList 处理发送消息列表
- func PushWxMsgList(list []*models.WechatMsgPushRecordItem, helper *models.WechatHelper) {
- num := len(list)
- // 平均延迟时间
- avgDelayTime := 60 / num
- // 最小延迟时间
- minDelayTime := 5
- if avgDelayTime < 2 {
- minDelayTime = 0
- } else if avgDelayTime <= 6 {
- minDelayTime = 3
- }
- //fmt.Println("minDelayTime:", minDelayTime)
- //fmt.Println("avgDelayTime:", avgDelayTime)
- for _, record := range list {
- // 发送消息
- go PushWxMsg(record, helper)
- // 本次延迟时间
- currDelayTime := utils.GetRandInt(minDelayTime, avgDelayTime)
- fmt.Println("下一条消息时间是 ", currDelayTime, " 秒后")
- time.Sleep(time.Duration(currDelayTime) * time.Second)
- }
- }
- // PushWxMsg 真正开始处理推送消息业务
- func PushWxMsg(record *models.WechatMsgPushRecordItem, wxHelper *models.WechatHelper) {
- fmt.Println(time.Now().Format(utils.FormatDateTime), " :", record.WechatMsgId, "开始推送了")
- var errMsg, resultStr string
- defer func() {
- record.ModifyTime = time.Now()
- record.Status = 2
- record.Remark = errMsg
- record.ServerResult = resultStr
- record.WechatHelperId = wxHelper.WechatHelperId
- if errMsg != `` { // 发送失败了
- record.Status = -1
- // TODO 发送邮件提醒?
- } else {
- record.Remark = `发送成功`
- }
- obj := models.WechatMsgPushRecord{}
- err := obj.UpdateV2(record.WechatMsgId, record.WechatHelperId, record.Status, record.Remark, record.ServerResult)
- if err != nil {
- fmt.Println("err:", err)
- }
- //record.Update([]string{"Status", "Remark", "ServerResult", "ModifyTime"})
- }()
- var resp services.WxPushResp
- var err error
- switch record.MsgType { // 消息类型,1:h5链接;2:小程序,3:文字;4:图片
- case 1: // h5链接
- if record.Content == `` {
- errMsg = `发送数据内容为空`
- return
- }
- resp, resultStr, err = services.SendH5(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Title, record.Pic, record.Content, record.JumpUrl)
- case 2: // 小程序
- //GetByJumpPath
- wechatListenMsgRecordObj := models.WechatListenMsgRecord{}
- listenMsg, tmpErr := wechatListenMsgRecordObj.GetByJumpPath(models.ListenWxAppType, record.JumpUrl)
- if tmpErr != nil {
- if tmpErr != utils.ErrNoRow {
- err = tmpErr
- }
- break
- }
- // 替换消息内容,将原来内容里面的 发送人微信标识 替换为 小助手微信标识
- xml := strings.Replace(listenMsg.Msg, listenMsg.FinalFromWechatId, wxHelper.WechatId, -1)
- resp, resultStr, err = services.SendXml(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, xml)
- case 3: // 文字
- if record.Content == `` {
- errMsg = `发送数据内容为空`
- return
- }
- resp, resultStr, err = services.SendText(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Content)
- case 4: // 图片
- resp, resultStr, err = services.SendPic(wxHelper.ServerUrl, wxHelper.WechatId, record.WechatGroupWechatId, record.Pic)
- default:
- errMsg = `错误的推送类型`
- return
- }
- if err != nil {
- errMsg = "发送消息给微信失败,ERR:" + err.Error()
- return
- }
- if resp.Code != 200 {
- errMsg = "发送消息给微信,服务返回失败信息,ERR:" + resp.Msg
- return
- }
- return
- }
- func ListenWechatOnline() {
- offlineHelperList := make([]*models.WechatHelper, 0)
- baseKey := `listen:wechat_helper:`
- defer func() {
- if len(offlineHelperList) > 0 {
- errMsgList := make([]string, 0)
- for _, helper := range offlineHelperList {
- result := global.Redis.SetNX(context.TODO(), fmt.Sprint(baseKey, helper.WechatId), 1, 30*time.Minute)
- if result.Val() {
- errMsgList = append(errMsgList, fmt.Sprint("机器人:", helper.WechatHelperName, ";所属服务器:", helper.ServerUrl))
- }
- }
- if len(errMsgList) > 0 {
- email := `984198890@qq.com;317699326@qq.com;pdzhao@hzinsights.com`
- go alarm_msg.SendAlarmMsg(time.Now().Format(utils.FormatDateTime)+":微信小助手掉线检测:<br/>"+strings.Join(errMsgList, "<br/>"), 3, email)
- }
- }
- }()
- wechatHelperObj := models.WechatHelper{}
- list, err := wechatHelperObj.GetAllHelper()
- if err != nil {
- fmt.Println("err:", err)
- return
- }
- for _, helper := range list {
- resp, _, tmpErr := services.Listen(helper.ServerUrl, helper.WechatId)
- if tmpErr != nil {
- offlineHelperList = append(offlineHelperList, helper)
- continue
- }
- if resp.Code != 200 {
- offlineHelperList = append(offlineHelperList, helper)
- continue
- }
- }
- }
|