xingzai 1 år sedan
förälder
incheckning
26f6547469

+ 0 - 16
models/industrial_activity_group_management.go

@@ -13,22 +13,6 @@ type CygxIndustrialActivityGroupManagement struct {
 	CreateTime             time.Time `description:"创建时间"`
 }
 
-type CygxIndustrialActivityGroupSubject struct {
-	Id                  int       `orm:"column(id);pk" description:"主键ID"`
-	ActivityId          int       `description:"活动ID"`
-	IndustrialSubjectId int       `description:"cygx_industrial_subject表的文章ID"`
-	Source              int       `description:"来源,1 活动,2专项调研"`
-	CreateTime          time.Time `description:"创建时间"`
-}
-
-type CygxIndustrialSubject struct {
-	IndustrialSubjectId    int       `orm:"column(industrial_subject_id);pk" description:"标的id"`
-	IndustrialManagementId int       `description:"产业id"`
-	SubjectName            string    `description:"标的名称"`
-	CreateTime             time.Time `description:"创建时间"`
-	Source                 int       `description:"来源,1正常添加,2:通过文章添加,3通过活动添加(默认为1)"`
-}
-
 // 获取标的列表
 func GetCygxIndustrialSubjectList(subjectName string) (items []*CygxIndustrialSubject, err error) {
 	o := orm.NewOrm()

+ 34 - 0
models/industrial_activity_group_subject.go

@@ -0,0 +1,34 @@
+package models
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+type CygxIndustrialActivityGroupSubject struct {
+	Id                  int       `orm:"column(id);pk" description:"主键ID"`
+	ActivityId          int       `description:"活动ID"`
+	IndustrialSubjectId int       `description:"cygx_industrial_subject表的文章ID"`
+	Source              int       `description:"来源,1 活动,2专项调研"`
+	CreateTime          time.Time `description:"创建时间"`
+}
+
+type SubjectActivityGroupManagementRep struct {
+	IndustrialSubjectId int    `description:"产业id"`
+	SubjectName         string `description:"标的名称"`
+}
+
+// GetCygxIndustrialActivityGroupSubjectList 获取列表
+func GetCygxIndustrialActivityGroupSubjectList(condition string, pars []interface{}) (list []*CygxIndustrialActivityGroupSubject, err error) {
+	sql := `SELECT
+			*
+			FROM
+			cygx_industrial_activity_group_subject
+			WHERE
+				1 = 1 `
+	if condition != `` {
+		sql += condition
+	}
+	_, err = orm.NewOrm().Raw(sql, pars).QueryRows(&list)
+	return
+}

+ 12 - 0
models/industrial_management.go

@@ -21,6 +21,18 @@ type IndustrialManagementRep struct {
 	IndustryNewLabel       bool   `description:"是否产业新标签"`
 }
 
+// 列表
+func GetIndustrialManagementRepList(condition string, pars []interface{}, startSize, pageSize int) (items []*IndustrialManagementRep, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM cygx_industrial_management as art WHERE 1= 1 `
+	if condition != "" {
+		sql += condition
+	}
+	sql += ` LIMIT ?,?  `
+	_, err = o.Raw(sql, pars, startSize, pageSize).QueryRows(&items)
+	return
+}
+
 type IndustrialManagementCount struct {
 	IndustrialManagementId int `orm:"column(industrial_management_id);pk" description:"产业id"`
 }

+ 24 - 0
models/industrial_subject.go

@@ -0,0 +1,24 @@
+package models
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+type CygxIndustrialSubject struct {
+	IndustrialSubjectId    int       `orm:"column(industrial_subject_id);pk" description:"标的id"`
+	IndustrialManagementId int       `description:"产业id"`
+	SubjectName            string    `description:"标的名称"`
+	CreateTime             time.Time `description:"创建时间"`
+	Source                 int       `description:"来源,1正常添加,2:通过文章添加,3通过活动添加(默认为1)"`
+}
+
+// 获取标的列表
+func GetCygxIndustrialSubjectListCondition(condition string, pars []interface{}) (items []*CygxIndustrialSubject, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * 
+		FROM
+			cygx_industrial_subject WHERE 1 = 1 ` + condition
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}

+ 214 - 56
services/es_comprehensive.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/PuerkitoBio/goquery"
+	"github.com/olivere/elastic/v7"
 	"strconv"
 
 	//"go/doc"
@@ -44,16 +45,18 @@ type ElasticComprehensiveDetail struct {
 	SubjectNames string `description:"标的名称"`
 }
 
-//func init() {
-//AddComprehensiveIndustrialSource()
-//AddComprehensiveMeetingreviewchapt()
-//AddComprehensiveActivity()
-//
-//AddComprehensiveActivitySpecial()
-//AddComprehensiveActivityVideo()
-//AddComprehensiveActivityVoice()
-//AddComprehensiveRoadshow()
-//}
+func init() {
+	AddComprehensiveActivityVideo()
+	//AddComprehensiveActivity()
+	//AddComprehensiveIndustrialSource()
+	// AddComprehensiveMeetingreviewchapt()
+	// AddComprehensiveActivity()
+	//
+	// AddComprehensiveActivitySpecial()
+	// AddComprehensiveActivityVideo()
+	// AddComprehensiveActivityVoice()
+	// AddComprehensiveRoadshow()
+}
 
 // 添加文章:报告、纪要
 func AddComprehensiveArticle() {
@@ -123,8 +126,24 @@ func AddComprehensiveChart() {
 
 // 添加产业资源包
 func AddComprehensiveIndustrialSource() {
+	var err error
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+			go utils.SendAlarmMsg("删除数据综合页面数据Es失败"+err.Error(), 2)
+		}
+	}()
 	var condition string
-
+	var pars []interface{}
+	mapActivitySubject := make(map[int][]string, 0)
+	listsubject, e := models.GetCygxIndustrialSubjectListCondition(condition, pars)
+	if e != nil {
+		err = errors.New("GetIndustrialManagementRepList, Err: " + e.Error())
+		return
+	}
+	for _, v := range listsubject {
+		mapActivitySubject[v.IndustrialManagementId] = append(mapActivitySubject[v.IndustrialManagementId], v.SubjectName)
+	}
 	list, err := models.GetSearchResourceList(0, condition, 0, 0)
 	if err != nil {
 		fmt.Println(err)
@@ -132,23 +151,12 @@ func AddComprehensiveIndustrialSource() {
 	}
 	indexName := utils.IndexNameComprehensive
 	for _, v := range list {
-		//content := html.UnescapeString(v.Body)
-		//doc, err := goquery.NewDocumentFromReader(strings.NewReader(content))
-		//if err != nil {
-		//	fmt.Println("create doc err:", err.Error())
-		//	return
-		//}
-		//bodyText := doc.Text()
 		item := new(ElasticComprehensiveDetail)
 		item.SourceId = v.IndustrialManagementId
-		//item.IsSummary = v.IsSummary
 		item.Source = "industrialsource"
 		item.IndustryName = v.IndustryName
+		item.SubjectNames = strings.Join(mapActivitySubject[v.IndustrialManagementId], ",")
 		item.PublishDate = v.PublishDate
-		//item.BodyText = bodyText
-		//item.Annotation, _ = GetReportContentTextSubNew(v.Annotation)
-		//item.Abstract, _ = GetReportContentTextSubNew(v.Abstract)
-
 		EsAddOrEditComprehensiveData(indexName, item)
 		fmt.Println(item)
 	}
@@ -158,7 +166,7 @@ func AddComprehensiveIndustrialSource() {
 func AddComprehensiveMeetingreviewchapt() {
 	var condition string
 	var pars []interface{}
-
+	IndustrialManagementRespMap, _ := GetIndustrialManagementRespMap()
 	list, err := models.GetCygxMorningMeetingReviewChapterList(condition, pars)
 
 	if err != nil {
@@ -176,14 +184,10 @@ func AddComprehensiveMeetingreviewchapt() {
 		bodyText := doc.Text()
 		item := new(ElasticComprehensiveDetail)
 		item.SourceId = v.Id
-		//item.IsSummary = v.IsSummary
 		item.Source = "meetingreviewchapt"
-		item.IndustryName = v.IndustryName
+		item.IndustryName = IndustrialManagementRespMap[v.IndustryId]
 		item.PublishDate = v.PublishTime
 		item.BodyText = bodyText
-		//item.Annotation, _ = GetReportContentTextSubNew(v.Annotation)
-		//item.Abstract, _ = GetReportContentTextSubNew(v.Abstract)
-
 		EsAddOrEditComprehensiveData(indexName, item)
 		fmt.Println(item)
 	}
@@ -194,6 +198,7 @@ func AddComprehensiveActivity() {
 	var condition string
 	var pars []interface{}
 	condition = ` AND publish_status = 1 `
+	//condition = ` AND publish_status = 1  AND  activity_id = 2407 `
 	list, err := models.GetCygxActivityList(condition, pars, 0, 100000)
 
 	if err != nil {
@@ -201,19 +206,54 @@ func AddComprehensiveActivity() {
 		return
 	}
 	indexName := utils.IndexNameComprehensive
+	IndustrialManagementRespMap, _ := GetIndustrialManagementRespMap()
+	IndustrialSubjectMap, _ := GetCygxIndustrialSubjectMap()
+	var condition1 string
+	var pars1 []interface{}
+	mapActivityIndustrialManagement := make(map[int][]string)
+	industrialgroupList, e := models.GetCygxIndustrialActivityGroupManagementList(condition1+"  AND  source = 1 ", pars1)
+	if e != nil {
+		fmt.Println(e)
+		return
+	}
+	for _, v := range industrialgroupList {
+		if v.ActivityId == 0 {
+			continue
+		}
+		mapActivityIndustrialManagement[v.ActivityId] = append(mapActivityIndustrialManagement[v.ActivityId], IndustrialManagementRespMap[v.IndustrialManagementId])
+	}
+
+	mapActivitySubject := make(map[int][]string)
+	SubjectgroupList, e := models.GetCygxIndustrialActivityGroupSubjectList(condition1+"  AND  source = 1 ", pars1)
+	if e != nil {
+		fmt.Println(e)
+		return
+	}
+	for _, v := range SubjectgroupList {
+		if v.ActivityId == 0 {
+			continue
+		}
+		mapActivitySubject[v.ActivityId] = append(mapActivitySubject[v.ActivityId], IndustrialSubjectMap[v.IndustrialSubjectId])
+	}
+	fmt.Println(mapActivitySubject[141])
+	fmt.Println(mapActivityIndustrialManagement[141])
+	//return
 	for _, v := range list {
 		item := new(ElasticComprehensiveDetail)
 		item.SourceId = v.ActivityId
 		//item.IsSummary = v.IsSummary
 		item.Source = "activity"
-		//item.IndustryName = v.IndustryName
+		//item.SubjectNames = json.Sprint(mapActivitySubject[v.ActivityId])
+		item.SubjectNames = strings.Join(mapActivitySubject[v.ActivityId], ",")
+		item.IndustryName = strings.Join(mapActivityIndustrialManagement[v.ActivityId], ",")
 		item.Title = v.ActivityName
 		item.PublishDate = v.ActivityTime
 		//item.BodyText = bodyText
 		//item.Annotation, _ = GetReportContentTextSubNew(v.Annotation)
 		//item.Abstract, _ = GetReportContentTextSubNew(v.Abstract)
 
-		EsAddOrEditComprehensiveData(indexName, item)
+		//EsAddOrEditComprehensiveData(indexName, item)
+		EsDeleteComprehensiveData(indexName, item)
 		fmt.Println(item)
 	}
 }
@@ -230,31 +270,76 @@ func AddComprehensiveActivitySpecial() {
 		return
 	}
 	indexName := utils.IndexNameComprehensive
+	IndustrialManagementRespMap, _ := GetIndustrialManagementRespMap() // 产业标签
+	IndustrialSubjectMap, _ := GetCygxIndustrialSubjectMap()           // 标的标签
+	var condition1 string
+	var pars1 []interface{}
+	mapActivityIndustrialManagement := make(map[int][]string)
+	industrialgroupList, e := models.GetCygxIndustrialActivityGroupManagementList(condition1+"  AND  source = 2 ", pars1)
+	if e != nil {
+		fmt.Println(e)
+		return
+	}
+	for _, v := range industrialgroupList {
+		if v.ActivityId == 0 {
+			continue
+		}
+		mapActivityIndustrialManagement[v.ActivityId] = append(mapActivityIndustrialManagement[v.ActivityId], IndustrialManagementRespMap[v.IndustrialManagementId])
+	}
+
+	mapActivitySubject := make(map[int][]string)
+	SubjectgroupList, e := models.GetCygxIndustrialActivityGroupSubjectList(condition1+"  AND  source = 2 ", pars1)
+	if e != nil {
+		fmt.Println(e)
+		return
+	}
+	for _, v := range SubjectgroupList {
+		if v.ActivityId == 0 {
+			continue
+		}
+		mapActivitySubject[v.ActivityId] = append(mapActivitySubject[v.ActivityId], IndustrialSubjectMap[v.IndustrialSubjectId])
+	}
 	for _, v := range list {
 		item := new(ElasticComprehensiveDetail)
 		item.SourceId = v.ActivityId
-		//item.IsSummary = v.IsSummary
 		item.Source = "activityspecial"
-		//item.IndustryName = v.IndustryName
 		item.Title = v.ResearchTheme
 		item.PublishDate = v.ActivityTime
-		//item.BodyText = bodyText
-		//item.Annotation, _ = GetReportContentTextSubNew(v.Annotation)
-		//item.Abstract, _ = GetReportContentTextSubNew(v.Abstract)
-
+		item.SubjectNames = strings.Join(mapActivitySubject[v.ActivityId], ",")
+		item.IndustryName = strings.Join(mapActivityIndustrialManagement[v.ActivityId], ",")
 		EsAddOrEditComprehensiveData(indexName, item)
-		fmt.Println(item)
 	}
 }
 
 // 添加活动视频
 func AddComprehensiveActivityVideo() {
+	var err error
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+			go utils.SendAlarmMsg("添加活动视频综合页面数据Es失败"+err.Error(), 2)
+		}
+	}()
 	var condition string
 	var pars []interface{}
 
-	list, err := models.GetActivityVideoListAll(condition, pars, 0, 100000)
-	if err != nil {
-		fmt.Println(err)
+	list, e := models.GetActivityVideoListAll(condition, pars, 0, 100000)
+	if e != nil {
+		err = errors.New("GetActivityVideoListAll, Err: " + e.Error())
+		return
+	}
+	if len(list) == 0 {
+		return
+	}
+	var activityIds []int
+	for _, v := range list {
+		activityIds = append(activityIds, v.ActivityId)
+	}
+
+	condition = ` AND publish_status = 1 `
+	listAct, e := models.GetCygxActivityList(condition, pars, 0, 100000)
+	if e != nil {
+		err = errors.New("GetCygxActivityList, Err: " + e.Error())
 		return
 	}
 	indexName := utils.IndexNameComprehensive
@@ -334,26 +419,99 @@ func EsAddOrEditComprehensiveData(indexName string, item *ElasticComprehensiveDe
 	//return
 	defer func() {
 		if err != nil {
-			fmt.Println("EsAddOrEditData Err:", err.Error())
+			go utils.SendAlarmMsg("更新综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
 		}
 	}()
 	client := utils.Client
-	//searchById, err := client.Get().Index(indexName).Do(context.Background())
-	//if err != nil && !strings.Contains(err.Error(), "404") {
-	//	fmt.Println("Get Err" + err.Error())
-	//	return
-	//}
-
-	resp, err := client.Index().Index(indexName).BodyJson(item).Do(context.Background())
-	if err != nil {
-		fmt.Println("新增失败:", err.Error())
-		return err
+	mustMap := make([]interface{}, 0)
+	mustMap = append(mustMap, map[string]interface{}{
+		"term": map[string]interface{}{
+			"SourceId": item.SourceId,
+		},
+	})
+	mustMap = append(mustMap, map[string]interface{}{
+		"term": map[string]interface{}{
+			"Source": item.Source,
+		},
+	})
+	fmt.Println(item.SourceId)
+	queryMap := map[string]interface{}{
+		"query": map[string]interface{}{
+			"bool": map[string]interface{}{
+				"must": mustMap,
+			},
+		},
+	}
+	requestTotalHits := client.Count(indexName).BodyJson(queryMap)
+	total, e := requestTotalHits.Do(context.Background())
+	if e != nil {
+		err = errors.New("requestTotalHits.Do(context.Background()), Err: " + e.Error())
+		return
 	}
-	if resp.Status == 0 && resp.Result == "created" {
-		fmt.Println("新增成功")
-		err = nil
+	//return
+	//根据来源以及ID ,判断内容是否存在,如果存在就新增,如果不存在就修改
+	if total == 0 {
+		resp, e := client.Index().Index(indexName).BodyJson(item).Do(context.Background())
+		if e != nil {
+			err = errors.New("client.Index().Index(indexName).BodyJson(item).Do(context.Background()), Err: " + e.Error())
+			return
+		}
+		if resp.Status == 0 && resp.Result == "created" {
+			//fmt.Println("新增成功")
+			//err = nil
+			return
+		} else {
+			err = errors.New(fmt.Sprint(resp))
+			return
+		}
 	} else {
-		fmt.Println("AddData", resp.Status, resp.Result)
+		//拼接需要改动的前置条件
+		bool_query := elastic.NewBoolQuery()
+		bool_query.Must(elastic.NewTermQuery("SourceId", item.SourceId))
+		bool_query.Must(elastic.NewTermQuery("Source", item.Source))
+		//设置需要改动的内容
+		var script string
+		script += fmt.Sprint("ctx._source['SubjectNames'] = '", item.SubjectNames, "';")
+		script += fmt.Sprint("ctx._source['PublishDate'] = '", item.PublishDate, "';")
+		script += fmt.Sprint("ctx._source['IsSummary'] = '", item.IsSummary, "';")
+		script += fmt.Sprint("ctx._source['Abstract'] = '", item.Abstract, "';")
+		script += fmt.Sprint("ctx._source['Title'] = '", item.Title, "';")
+		script += fmt.Sprint("ctx._source['BodyText'] = '", item.BodyText, "';")
+		script += fmt.Sprint("ctx._source['Annotation'] = '", item.Annotation, "';")
+		script += fmt.Sprint("ctx._source['IndustryName'] = '", item.IndustryName, "'")
+
+		_, e := client.UpdateByQuery(indexName).
+			Query(bool_query).
+			Script(elastic.NewScriptInline(script)).
+			Refresh("true").
+			Do(context.Background())
+		if e != nil {
+			err = errors.New(" client.UpdateByQuery(indexName), Err: " + e.Error())
+			return
+		}
+	}
+	return
+}
+
+// 删除数据
+func EsDeleteComprehensiveData(indexName string, item *ElasticComprehensiveDetail) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+			go utils.SendAlarmMsg("删除数据综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
+		}
+	}()
+	client := utils.Client
+	//拼接需要删除的前置条件
+	bool_query := elastic.NewBoolQuery()
+	bool_query.Must(elastic.NewTermQuery("SourceId", item.SourceId))
+	bool_query.Must(elastic.NewTermQuery("Source", item.Source))
+	_, e := client.DeleteByQuery(indexName).
+		Query(bool_query).
+		Do(context.Background())
+	if e != nil {
+		err = errors.New(" client.DeleteByQuery(indexName), Err: " + e.Error())
+		return
 	}
 	return
 }

+ 47 - 1
services/industrial_management.go

@@ -41,7 +41,7 @@ func UpdateIndustrialManagementLabel(cont context.Context) (err error) {
 	return
 }
 
-// 修改活动状态
+// 修改状态
 func UpdateIndustrialManagementSubjectNnames() (err error) {
 	defer func() {
 		if err != nil {
@@ -750,3 +750,49 @@ func GetLyjhArticleMap() (mapResp map[int]bool, err error) {
 	}
 	return
 }
+
+// 获取所有的产业id与名称的map关联
+func GetIndustrialManagementRespMap() (itemMap map[int]string, err error) {
+	defer func() {
+		if err != nil {
+			go utils.SendAlarmMsg("GetIndustrialManagementRespMap ErrMsg:"+err.Error(), 2)
+		}
+	}()
+	var condition string
+	var pars []interface{}
+	itemMap = make(map[int]string, 0)
+	industrialList, e := models.GetIndustrialManagementRepList(condition, pars, 0, 999999)
+	if e != nil {
+		err = errors.New("GetIndustrialManagementRepList, Err: " + e.Error())
+		return
+	}
+	if len(industrialList) > 0 {
+		for _, v := range industrialList {
+			itemMap[v.IndustrialManagementId] = v.IndustryName
+		}
+	}
+	return
+}
+
+// 获取所有的标的id与名称的map关联
+func GetCygxIndustrialSubjectMap() (itemMap map[int]string, err error) {
+	defer func() {
+		if err != nil {
+			go utils.SendAlarmMsg("GetCygxIndustrialSubjectMap ErrMsg:"+err.Error(), 2)
+		}
+	}()
+	var condition string
+	var pars []interface{}
+	itemMap = make(map[int]string, 0)
+	list, e := models.GetCygxIndustrialSubjectListCondition(condition, pars)
+	if e != nil {
+		err = errors.New("GetIndustrialManagementRepList, Err: " + e.Error())
+		return
+	}
+	if len(list) > 0 {
+		for _, v := range list {
+			itemMap[v.IndustrialSubjectId] = v.SubjectName
+		}
+	}
+	return
+}

+ 14 - 1
utils/elastic.go

@@ -2,6 +2,8 @@ package utils
 
 import (
 	"github.com/olivere/elastic/v7"
+	"log"
+	"os"
 	"time"
 )
 
@@ -14,10 +16,21 @@ const (
 )
 
 func init() {
+	errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
+	file := ""
+	if RunMode == "release" {
+		//file = `/data/rdlucklog/hongze_cygx/eslog.log`
+		file = `./rdlucklog/eslog.log`
+	} else {
+		file = `./rdlucklog/eslog.log`
+	}
+	logFile, _ := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
 	client, err := elastic.NewClient(
 		elastic.SetURL(ES_URL),
 		elastic.SetBasicAuth(ES_USERNAME, ES_PASSWORD),
-		elastic.SetSniff(false))
+		elastic.SetTraceLog(log.New(logFile, "ES-TRACE: ", 0)),
+		elastic.SetSniff(false), elastic.SetErrorLog(errorlog))
+	//elastic.SetSniff(false))
 	Client = client
 	if err != nil {
 		go SendAlarmMsg("ElasticSearch连接失败", 2)