|
@@ -10,6 +10,7 @@ import (
|
|
|
"sort"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
func NewClient() (client *elastic.Client, err error) {
|
|
@@ -34,6 +35,256 @@ func NewClient() (client *elastic.Client, err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+//创建文章阅读记录的Es索引
|
|
|
+func CreateIndexNameArticleHistory() {
|
|
|
+ indexName := utils.IndexNameArticleHistory
|
|
|
+ mappingJson := `{
|
|
|
+ "mappings": {
|
|
|
+ "dynamic": true,
|
|
|
+ "properties": {
|
|
|
+ "ArticleId": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "Id": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "ArticleType": {
|
|
|
+ "type": "short"
|
|
|
+ },
|
|
|
+ "CompanyArticleHistoryNum": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "CompanyName": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "CompanyId": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "CreateTime": {
|
|
|
+ "type": "date",
|
|
|
+ "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
|
|
|
+ },
|
|
|
+ "Email": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "Mobile": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "PublishDate": {
|
|
|
+ "type": "date",
|
|
|
+ "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
|
|
|
+ },
|
|
|
+ "RealName": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "SellerName": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "SellerId": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "StopTime": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "Title": {
|
|
|
+ "type": "keyword"
|
|
|
+ },
|
|
|
+ "UserId": {
|
|
|
+ "type": "integer"
|
|
|
+ },
|
|
|
+ "UserArticleHistoryNum": {
|
|
|
+ "type": "integer"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}`
|
|
|
+ EsCreateIndex(indexName, mappingJson)
|
|
|
+}
|
|
|
+
|
|
|
+//func UpdateWxUserLabel(cont context.Context) (err error) {
|
|
|
+func AddAllArticleHistory(cont context.Context) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ go utils.SendAlarmMsg("同步阅读记录到es失败;Err:"+err.Error(), 2)
|
|
|
+ go utils.SendEmail("同步阅读记录到es失败"+"【"+utils.APPNAME+"】"+time.Now().Format(utils.FormatDateTime), ";Err:"+err.Error(), utils.EmailSendToUsers)
|
|
|
+ utils.FileLog.Info("同步阅读记录到es失败,Err:%s", err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ var updateUserIds string //更改过的用户ID
|
|
|
+ userIdMap := make(map[int]int)
|
|
|
+ condition := ` AND r.create_time < '` + time.Now().Format(utils.FormatDate) + `' AND r.company_id IN (
|
|
|
+ SELECT a.company_id
|
|
|
+ FROM company AS a INNER JOIN company_product AS b ON a.company_id = b.company_id
|
|
|
+ WHERE a.enabled = 1 AND b.STATUS IN ( '正式', '试用', '冻结' )) `
|
|
|
+ totalAll, err := models.GetCygxArticleHistoryCountByCompany(condition)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetCygxArticleHistoryCountByCompany Err:totalAll", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ //更改阅读总数
|
|
|
+ err = models.UpdateConfigByCode(strconv.Itoa(totalAll), "company_article_history_num")
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("UpdateConfigByCode Err:totalAll", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ //处理前一天新增的数据
|
|
|
+ condition += ` AND r.create_time >='` + time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + `'`
|
|
|
+ total, err := models.GetCygxArticleHistoryCountByCompany(condition)
|
|
|
+ //fmt.Println(total)
|
|
|
+ //return
|
|
|
+ fmt.Println(total)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetCygxArticleHistoryCountByCompany Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ for i := 0; i <= total/1000; i++ {
|
|
|
+ allList, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 1000*i, 1000)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetCygxArticleHistoryRecordByCompanyList Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ for k, v := range allList {
|
|
|
+ fmt.Println(v.Id, "___", k)
|
|
|
+ err := EsAddArticleHistoryData(v)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("EsAddOrEditData Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if _, ok := userIdMap[v.UserId]; !ok {
|
|
|
+ updateUserIds += strconv.Itoa(v.UserId) + ","
|
|
|
+ userIdMap[v.UserId] = v.UserId
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //处理前一天新增的数据 end
|
|
|
+
|
|
|
+ //处理前一天被移动的用户
|
|
|
+
|
|
|
+ startDate := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate)
|
|
|
+ endDate := time.Now().Format(utils.FormatDate)
|
|
|
+ var mobiles string
|
|
|
+ listUpdateUser, err := models.GetWxUserOpLogList(startDate, endDate)
|
|
|
+ if err != nil && err.Error() != utils.ErrNoRow() {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if len(listUpdateUser) > 0 {
|
|
|
+ for _, v := range listUpdateUser {
|
|
|
+ mobiles += "'" + v.Mobile + "',"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mobiles = strings.TrimRight(mobiles, ",")
|
|
|
+ if mobiles != "" {
|
|
|
+ condition = ` AND r.mobile IN (` + mobiles + `)`
|
|
|
+ //修改用户的阅读记录(es 自动判断,如果有他会修改数据)
|
|
|
+ listUpdatePv, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 0, 0)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetArticleHistoryRecordAllByMobileList ,Err" + err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range listUpdatePv {
|
|
|
+ err := EsAddArticleHistoryData(v)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("EsAddOrEditData Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //处理前一天被移动的用户 end
|
|
|
+
|
|
|
+ //处理前一天被删除的用户
|
|
|
+ {
|
|
|
+ listDeleteUser, err := models.GetWxUserOpLogDeleteList(startDate, endDate)
|
|
|
+ if err != nil && err.Error() != utils.ErrNoRow() {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ mobiles = ""
|
|
|
+ if len(listUpdateUser) > 0 {
|
|
|
+ for _, v := range listDeleteUser {
|
|
|
+ mobiles += "'" + v.Mobile + "',"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mobiles = strings.TrimRight(mobiles, ",")
|
|
|
+ if mobiles != "" {
|
|
|
+ condition = ` AND r.mobile IN (` + mobiles + `)`
|
|
|
+ listDeletePv, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 0, 0)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetArticleHistoryRecordAllByMobileList ,Err" + err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ //fmt.Println("Es 删除")
|
|
|
+ for _, v := range listDeletePv {
|
|
|
+ err := EsDeleteData(utils.IndexNameArticleHistory, strconv.Itoa(v.Id))
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("EsAddOrEditData Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //处理前一天被删除的用户 end
|
|
|
+
|
|
|
+ //处理新增的阅读记录的用户阅读数量、机构阅读数量(暂未找到批量修改的方法,后期优化处理 2022.7.11)
|
|
|
+ updateUserIds = strings.TrimRight(updateUserIds, ",")
|
|
|
+ if updateUserIds != "" {
|
|
|
+ condition = ` AND r.create_time >='` + time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + `' AND r.user_id IN (` + updateUserIds + `)`
|
|
|
+ total, err := models.GetCygxArticleHistoryCountByCompany(condition)
|
|
|
+ //fmt.Println(total)
|
|
|
+ //return
|
|
|
+ fmt.Println(total)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetCygxArticleHistoryCountByCompany Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ for i := 0; i <= total/1000; i++ {
|
|
|
+ allList, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 1000*i, 1000)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("GetCygxArticleHistoryRecordByCompanyList Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ for k, v := range allList {
|
|
|
+ fmt.Println(v.Id, "___", k)
|
|
|
+ err := EsAddArticleHistoryData(v)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("EsAddOrEditData Err:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if _, ok := userIdMap[v.UserId]; !ok {
|
|
|
+ updateUserIds += strconv.Itoa(v.UserId) + ","
|
|
|
+ userIdMap[v.UserId] = v.UserId
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+//新增数据
|
|
|
+func EsAddArticleHistoryData(item *models.EsUserInteraction) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("EsAddOrEditData Err:", err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ client := utils.Client
|
|
|
+
|
|
|
+ resp, err := client.Index().Index(utils.IndexNameArticleHistory).Id(strconv.Itoa(item.Id)).BodyJson(item).Do(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("新增失败:", err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if resp.Status == 0 && resp.Result == "created" {
|
|
|
+ //fmt.Println("新增成功")
|
|
|
+ err = nil
|
|
|
+ return err
|
|
|
+ } else {
|
|
|
+ fmt.Println("AddData", resp.Status, resp.Result)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
//indexName:索引名称
|
|
|
//mappingJson:表结构
|
|
|
func EsCreateIndex(indexName, mappingJson string) (err error) {
|