浏览代码

Es存储用户阅读记录优化

xingzai 2 年之前
父节点
当前提交
5e81cc8db7
共有 7 个文件被更改,包括 356 次插入6 次删除
  1. 1 1
      controllers/article.go
  2. 83 0
      models/article_history_record_newpv.go
  3. 10 2
      models/send_company_user.go
  4. 251 0
      services/elastic.go
  5. 5 1
      services/task.go
  6. 3 2
      services/wx_user.go
  7. 3 0
      utils/config.go

+ 1 - 1
controllers/article.go

@@ -5,7 +5,6 @@ import (
 	"github.com/pdfcpu/pdfcpu/pkg/api"
 	"github.com/pdfcpu/pdfcpu/pkg/pdfcpu"
 	"io"
-
 	//"bufio"
 	"encoding/json"
 	"fmt"
@@ -578,6 +577,7 @@ func (this *ArticleCommonController) Detail() {
 	//		}
 	//	}
 	//}
+	detail.SellerAndMobile = "" //业务需要强制处理为空
 	resp.HasPermission = 1
 	detail.Abstract, _ = services.GetReportContentTextSub(detail.Abstract)
 	detail.SellerList = sellerList

+ 83 - 0
models/article_history_record_newpv.go

@@ -180,3 +180,86 @@ func UpdateCygxArticleHistoryRecordAll(wxUser *WxUserItem) (err error) {
 	}
 	return
 }
+
+type EsUserInteraction struct {
+	Id                       int    `description:"主键ID"`
+	ArticleId                int    `description:"文章id"`
+	ArticleType              int    `description:"文章类型 1:查研观向, 2:策略平台"`
+	Title                    string `description:"标题"`
+	PublishDate              string `description:"发布时间"`
+	CreateTime               string `description:"创建时间"`
+	StopTime                 string `description:"阅读停留时间"`
+	RealName                 string `description:"姓名"`
+	CompanyName              string `description:"公司名称"`
+	CompanyId                int    `description:"公司ID"`
+	SellerName               string `description:"所属销售"`
+	SellerId                 int    `description:"所属销售ID"`
+	Mobile                   string `description:"手机号"`
+	Email                    string `description:"邮箱"`
+	UserId                   int    `description:"用户ID"`
+	UserArticleHistoryNum    int    `description:"用户阅读数量"`
+	CompanyArticleHistoryNum int    `description:"机构阅读数量"`
+}
+
+//机构阅读记录列表
+func GetCygxArticleHistoryRecordByCompanyList(condition string, startSize, pageSize int) (items []*EsUserInteraction, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT
+			r.id,
+			art.title,
+			art.article_id,
+			art.article_id_md5,
+			art.publish_date,
+			art.category_name,
+			r.create_time,
+			r.mobile,
+			r.user_id,
+			r.company_name,
+			cp.seller_name,
+			cp.seller_id,
+			cp.company_id,
+			r.real_name,
+			r.stop_time,
+			ci.article_history_num AS company_article_history_num,
+			ui.article_history_num AS user_article_history_num 
+		FROM
+			cygx_article_history_record_all AS r
+			INNER JOIN cygx_article AS art ON art.article_id = r.article_id
+			INNER JOIN company_product AS cp ON cp.company_id = r.company_id 
+			AND cp.product_id = 2
+			INNER JOIN cygx_company_interaction_num AS ci ON ci.company_id = r.company_id
+			INNER JOIN cygx_user_interaction_num AS ui ON ui.user_id = r.user_id 
+		WHERE
+			1 = 1 
+			AND r.is_del = 0 ` + condition + ` GROUP BY r.id  `
+	if startSize > 0 || pageSize > 0 {
+		sql += ` LIMIT ` + strconv.Itoa(startSize) + "," + strconv.Itoa(pageSize)
+	}
+	_, err = o.Raw(sql).QueryRows(&items)
+	return
+}
+
+//获取阅读记录数量
+func GetCygxArticleHistoryCountByCompany(condition string) (count int, err error) {
+	o := orm.NewOrm()
+	sqlCount := `SELECT
+	COUNT( 1 ) AS count 
+FROM
+	(
+	SELECT
+		COUNT( 1 ) 
+	FROM
+		cygx_article_history_record_all AS r
+		INNER JOIN cygx_article AS art ON art.article_id = r.article_id
+		INNER JOIN company_product AS cp ON cp.company_id = r.company_id 
+		AND cp.product_id = 2
+		INNER JOIN cygx_company_interaction_num AS ci ON ci.company_id = r.company_id
+		INNER JOIN cygx_user_interaction_num AS ui ON ui.user_id = r.user_id 
+	WHERE
+		r.is_del = 0 ` + condition + `
+	GROUP BY
+	r.id 
+	) AS count `
+	err = o.Raw(sqlCount).QueryRow(&count)
+	return
+}

+ 10 - 2
models/send_company_user.go

@@ -231,9 +231,17 @@ func GetWxUserOpLog(createTime string) (items []*WxUserOpLogResp, err error) {
 }
 
 //获取指定时间内被移动的用户
-func GetWxUserOpLogList(createTime string) (items []*WxUserOpLogResp, err error) {
+func GetWxUserOpLogList(startDate, endDate string) (items []*WxUserOpLogResp, err error) {
 	o := orm.NewOrm()
-	sql := ` SELECT company_id,user_id,mobile FROM wx_user_op_log WHERE  log_type IN ('move','add') AND create_time >=  '` + createTime + `'   GROUP BY user_id `
+	sql := ` SELECT company_id,user_id,mobile FROM wx_user_op_log WHERE  log_type IN ('move','add') AND create_time >=  '` + startDate + `' AND create_time <=  '` + endDate + `'   GROUP BY user_id `
+	_, err = o.Raw(sql).QueryRows(&items)
+	return
+}
+
+//获取指定时间内被删除的用户
+func GetWxUserOpLogDeleteList(startDate, endDate string) (items []*WxUserOpLogResp, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT company_id,user_id,mobile FROM wx_user_op_log WHERE  log_type IN ('delete') AND create_time >=  '` + startDate + `' AND create_time <=  '` + endDate + `'   GROUP BY user_id `
 	_, err = o.Raw(sql).QueryRows(&items)
 	return
 }

+ 251 - 0
services/elastic.go

@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 )
 
 func NewClient() (client *elastic.Client, err error) {
@@ -34,6 +35,256 @@ func NewClient() (client *elastic.Client, err error) {
 	return
 }
 
+//创建文章阅读记录的Es索引
+func CreateIndexNameArticleHistory() {
+	indexName := utils.IndexNameArticleHistory
+	mappingJson := `{
+ "mappings": {
+   "dynamic": true,
+   "properties": {
+        "ArticleId": {
+          "type": "integer"
+        },
+ 		 "Id": {
+          "type": "integer"
+        },
+   		"ArticleType": {
+          "type": "short"
+        },
+        "CompanyArticleHistoryNum": {
+          "type": "integer"
+        },
+        "CompanyName": {
+          "type": "keyword"
+        },
+	   "CompanyId": {
+			  "type": "integer"
+			},
+        "CreateTime": {
+          "type": "date",
+          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+        },
+        "Email": {
+          "type": "keyword"
+        },
+        "Mobile": {
+          "type": "keyword"
+        },
+        "PublishDate": {
+          "type": "date",
+          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+        },
+        "RealName": {
+          "type": "keyword"
+        },
+        "SellerName": {
+          "type": "keyword"
+        },
+  		"SellerId": {
+          "type": "integer"
+        },
+        "StopTime": {
+          "type": "integer"
+        },
+        "Title": {
+          "type": "keyword"
+        },
+	   "UserId": {
+		   "type": "integer"
+		},
+        "UserArticleHistoryNum": {
+          "type": "integer"
+        }
+   }
+ }
+}`
+	EsCreateIndex(indexName, mappingJson)
+}
+
+//func UpdateWxUserLabel(cont context.Context) (err error) {
+func AddAllArticleHistory(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			go utils.SendAlarmMsg("同步阅读记录到es失败;Err:"+err.Error(), 2)
+			go utils.SendEmail("同步阅读记录到es失败"+"【"+utils.APPNAME+"】"+time.Now().Format(utils.FormatDateTime), ";Err:"+err.Error(), utils.EmailSendToUsers)
+			utils.FileLog.Info("同步阅读记录到es失败,Err:%s", err.Error())
+		}
+	}()
+
+	var updateUserIds string //更改过的用户ID
+	userIdMap := make(map[int]int)
+	condition := `  AND r.create_time < '` + time.Now().Format(utils.FormatDate) + `' 	AND r.company_id IN (
+					SELECT a.company_id 
+					FROM  company AS a INNER JOIN company_product AS b ON a.company_id = b.company_id 
+					WHERE a.enabled = 1 AND b.STATUS IN ( '正式', '试用', '冻结' )) `
+	totalAll, err := models.GetCygxArticleHistoryCountByCompany(condition)
+	if err != nil {
+		fmt.Println("GetCygxArticleHistoryCountByCompany Err:totalAll", err.Error())
+		return err
+	}
+	//更改阅读总数
+	err = models.UpdateConfigByCode(strconv.Itoa(totalAll), "company_article_history_num")
+	if err != nil {
+		fmt.Println("UpdateConfigByCode Err:totalAll", err.Error())
+		return err
+	}
+	//处理前一天新增的数据
+	condition += `  AND r.create_time >='` + time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + `'`
+	total, err := models.GetCygxArticleHistoryCountByCompany(condition)
+	//fmt.Println(total)
+	//return
+	fmt.Println(total)
+	if err != nil {
+		fmt.Println("GetCygxArticleHistoryCountByCompany Err:", err.Error())
+		return err
+	}
+	for i := 0; i <= total/1000; i++ {
+		allList, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 1000*i, 1000)
+		if err != nil {
+			fmt.Println("GetCygxArticleHistoryRecordByCompanyList Err:", err.Error())
+			return err
+		}
+		for k, v := range allList {
+			fmt.Println(v.Id, "___", k)
+			err := EsAddArticleHistoryData(v)
+			if err != nil {
+				fmt.Println("EsAddOrEditData Err:", err.Error())
+				return err
+			}
+			if _, ok := userIdMap[v.UserId]; !ok {
+				updateUserIds += strconv.Itoa(v.UserId) + ","
+				userIdMap[v.UserId] = v.UserId
+			}
+		}
+	}
+	//处理前一天新增的数据 end
+
+	//处理前一天被移动的用户
+
+	startDate := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate)
+	endDate := time.Now().Format(utils.FormatDate)
+	var mobiles string
+	listUpdateUser, err := models.GetWxUserOpLogList(startDate, endDate)
+	if err != nil && err.Error() != utils.ErrNoRow() {
+		return err
+	}
+	if len(listUpdateUser) > 0 {
+		for _, v := range listUpdateUser {
+			mobiles += "'" + v.Mobile + "',"
+		}
+	}
+	mobiles = strings.TrimRight(mobiles, ",")
+	if mobiles != "" {
+		condition = ` AND r.mobile IN (` + mobiles + `)`
+		//修改用户的阅读记录(es 自动判断,如果有他会修改数据)
+		listUpdatePv, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 0, 0)
+		if err != nil {
+			fmt.Println("GetArticleHistoryRecordAllByMobileList ,Err" + err.Error())
+			return err
+		}
+
+		for _, v := range listUpdatePv {
+			err := EsAddArticleHistoryData(v)
+			if err != nil {
+				fmt.Println("EsAddOrEditData Err:", err.Error())
+				return err
+			}
+		}
+	}
+
+	//处理前一天被移动的用户 end
+
+	//处理前一天被删除的用户
+	{
+		listDeleteUser, err := models.GetWxUserOpLogDeleteList(startDate, endDate)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			return err
+		}
+		mobiles = ""
+		if len(listUpdateUser) > 0 {
+			for _, v := range listDeleteUser {
+				mobiles += "'" + v.Mobile + "',"
+			}
+		}
+		mobiles = strings.TrimRight(mobiles, ",")
+		if mobiles != "" {
+			condition = ` AND r.mobile IN (` + mobiles + `)`
+			listDeletePv, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 0, 0)
+			if err != nil {
+				fmt.Println("GetArticleHistoryRecordAllByMobileList ,Err" + err.Error())
+				return err
+			}
+			//fmt.Println("Es 删除")
+			for _, v := range listDeletePv {
+				err := EsDeleteData(utils.IndexNameArticleHistory, strconv.Itoa(v.Id))
+				if err != nil {
+					fmt.Println("EsAddOrEditData Err:", err.Error())
+					return err
+				}
+			}
+		}
+	}
+	//处理前一天被删除的用户 end
+
+	//处理新增的阅读记录的用户阅读数量、机构阅读数量(暂未找到批量修改的方法,后期优化处理 2022.7.11)
+	updateUserIds = strings.TrimRight(updateUserIds, ",")
+	if updateUserIds != "" {
+		condition = `  AND r.create_time >='` + time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + `' AND r.user_id IN (` + updateUserIds + `)`
+		total, err := models.GetCygxArticleHistoryCountByCompany(condition)
+		//fmt.Println(total)
+		//return
+		fmt.Println(total)
+		if err != nil {
+			fmt.Println("GetCygxArticleHistoryCountByCompany Err:", err.Error())
+			return err
+		}
+		for i := 0; i <= total/1000; i++ {
+			allList, err := models.GetCygxArticleHistoryRecordByCompanyList(condition, 1000*i, 1000)
+			if err != nil {
+				fmt.Println("GetCygxArticleHistoryRecordByCompanyList Err:", err.Error())
+				return err
+			}
+			for k, v := range allList {
+				fmt.Println(v.Id, "___", k)
+				err := EsAddArticleHistoryData(v)
+				if err != nil {
+					fmt.Println("EsAddOrEditData Err:", err.Error())
+					return err
+				}
+				if _, ok := userIdMap[v.UserId]; !ok {
+					updateUserIds += strconv.Itoa(v.UserId) + ","
+					userIdMap[v.UserId] = v.UserId
+				}
+			}
+		}
+	}
+	return
+}
+
+//新增数据
+func EsAddArticleHistoryData(item *models.EsUserInteraction) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("EsAddOrEditData Err:", err.Error())
+		}
+	}()
+	client := utils.Client
+
+	resp, err := client.Index().Index(utils.IndexNameArticleHistory).Id(strconv.Itoa(item.Id)).BodyJson(item).Do(context.Background())
+	if err != nil {
+		fmt.Println("新增失败:", err.Error())
+		return err
+	}
+	if resp.Status == 0 && resp.Result == "created" {
+		//fmt.Println("新增成功")
+		err = nil
+		return err
+	} else {
+		fmt.Println("AddData", resp.Status, resp.Result)
+	}
+	return
+}
+
 //indexName:索引名称
 //mappingJson:表结构
 func EsCreateIndex(indexName, mappingJson string) (err error) {

+ 5 - 1
services/task.go

@@ -69,6 +69,9 @@ func Task() {
 		updateWxUserLabel := task.NewTask("updateWxUserLabel", "0 01 0 * * *", UpdateWxUserLabel) //更新用户的标签
 		task.AddTask("updateWxUserLabel", updateWxUserLabel)
 
+		addAllArticleHistory := task.NewTask("addAllArticleHistory", "0 30 2 * * *", AddAllArticleHistory) //把前一天的用户阅读记录同步到ES
+		task.AddTask("addAllArticleHistory", addAllArticleHistory)
+
 		getCeLueArticlePv := task.NewTask("getCeLueArticlePv", "0 */10 * * * *", GetCeLueArticlePv) //通过三方接口获取策略平台上的阅读记录
 		task.AddTask("getCeLueArticlePv", getCeLueArticlePv)
 	}
@@ -76,7 +79,8 @@ func Task() {
 		getArticleListByApi := task.NewTask("getArticleListByApi", "0 */60 * * * *", GetArticleListByApi) //通过三方接口获取策略平台上的文章
 		task.AddTask("getArticleListByApi", getArticleListByApi)
 	}
-
+	//CreateIndexNameArticleHistory()
+	//AddAllArticleHistory()
 	//DoCompany()
 	//ActivityAttendanceDetail()
 	//SynchronizationArthistory()//同步原有的阅读记录

+ 3 - 2
services/wx_user.go

@@ -104,8 +104,9 @@ func UpdateWxUserLabel(cont context.Context) (err error) {
 	//处理 前一天移动之后的用户的公司记录信息
 
 	{
-		updateTime := time.Now().Add(-time.Hour * 25).Format("2006-01-02 15:04:05")
-		listUpdateUser, err := models.GetWxUserOpLogList(updateTime)
+		startDate := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate)
+		endDate := time.Now().Format(utils.FormatDate)
+		listUpdateUser, err := models.GetWxUserOpLogList(startDate, endDate)
 		if err != nil && err.Error() != utils.ErrNoRow() {
 			return err
 		}

+ 3 - 0
utils/config.go

@@ -54,6 +54,7 @@ var (
 
 var (
 	IndexName                 string
+	IndexNameArticleHistory   string //文章阅读记录Es索引
 	OnlineTime                string
 	SummaryArticleId          int
 	YanxSummaryPermissionId   int    //研选纪要分类ID
@@ -134,6 +135,7 @@ func init() {
 		WxPublicAppSecret = "26c586e7ccb3c575433f0f37797b3eeb"
 		WxPublicId = "gh_b67e0049fb8c"
 		IndexName = "cygx_article_v0622"
+		IndexNameArticleHistory = "cygx_article_history_v07_08"
 
 		//接收附件邮箱
 		EmailTechnology = "mlluo@hzinsights.com;jxu@hzinsights.com;tshen@hzinsights.com;cxzhang@hzinsights.com;jhwang@hzinsights.com"    //科技行业专家邮箱
@@ -166,6 +168,7 @@ func init() {
 		WxPublicAppSecret = "f4d52e34021eee262dce9682b31f8861"
 		WxPublicId = "gh_5dc508325c6f"
 		IndexName = "cygx_article_v1"
+		IndexNameArticleHistory = "cygx_article_history_v1"
 
 		//接收附件邮箱
 		EmailTechnology = "jhwang@hzinsights.com;cxzhang@hzinsights.com"  //科技行业专家邮箱