package elastic

import (
	"context"
	"errors"
	"fmt"
	"html"
	"strconv"
	"strings"
	"time"

	"github.com/PuerkitoBio/goquery"
	"github.com/olivere/elastic/v7"

	"hongze/hz_crm_api/models/cygx"
	"hongze/hz_crm_api/services/alarm_msg"
	cygxService "hongze/hz_crm_api/services/cygx"
	"hongze/hz_crm_api/utils"
)

// ElasticComprehensiveDetail is the document shape stored in the
// comprehensive-search index. A document is identified by the pair
// (SourceId, Source).
type ElasticComprehensiveDetail struct {
	SourceId     int    `description:"资源ID"`
	IsSummary    int    `description:"是否是纪要"`
	Source       string `description:"资源类型 报告 :article 、图表 :newchart、微路演 :roadshow、活动 :activity、活动视频:activityvideo、活动音频:activityvoice、专项调研活动:activityspecial 、 本周研究汇总: researchsummary 、 上周纪要汇总 :minutessummary 、晨会精华 :meetingreviewchapt 、 产品内测:productinterior 、 产业资源包:industrialsource"`
	Title        string `description:"标题"`
	BodyText     string `description:"内容"`
	PublishDate  string `description:"发布时间"`
	Abstract     string `description:"摘要"`
	Annotation   string `description:"核心观点"`
	IndustryName string `description:"产业名称"`
	SubjectNames string `description:"标的名称"`
}

// EsAddOrEditComprehensiveData indexes item into the comprehensive-search
// index, or updates the already indexed document(s) that share its SourceId
// and Source.
//
// Any failure is returned to the caller and is also reported asynchronously
// through alarm_msg.SendAlarmMsg.
func EsAddOrEditComprehensiveData(item *ElasticComprehensiveDetail) (err error) {
	// The index name is pinned here so another project's index is never touched by mistake.
	indexName := utils.IndexNameComprehensive
	defer func() {
		if err != nil {
			go alarm_msg.SendAlarmMsg("更新综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
		}
	}()

	client, err := NewClient()
	if err != nil {
		fmt.Println(err, "err1")
		return
	}

	// Count the documents matching both SourceId and Source to choose
	// between creating a new document and updating the existing one(s).
	mustMap := []interface{}{
		map[string]interface{}{
			"term": map[string]interface{}{
				"SourceId": item.SourceId,
			},
		},
		map[string]interface{}{
			"term": map[string]interface{}{
				"Source": item.Source,
			},
		},
	}
	queryMap := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": mustMap,
			},
		},
	}
	total, e := client.Count(indexName).BodyJson(queryMap).Do(context.Background())
	if e != nil {
		err = errors.New("requestTotalHits.Do(context.Background()), Err: " + e.Error())
		return
	}

	// Nothing indexed yet for this (Source, SourceId): create the document.
	if total == 0 {
		resp, e := client.Index().Index(indexName).BodyJson(item).Do(context.Background())
		if e != nil {
			err = errors.New("client.Index().Index(indexName).BodyJson(item).Do(context.Background()), Err: " + e.Error())
			return
		}
		if resp.Status != 0 || resp.Result != "created" {
			err = errors.New(fmt.Sprint(resp))
		}
		return
	}

	// Otherwise update every matching document in place.
	boolQuery := elastic.NewBoolQuery()
	boolQuery.Must(elastic.NewTermQuery("SourceId", item.SourceId))
	boolQuery.Must(elastic.NewTermQuery("Source", item.Source))

	// The script text is constant and all values travel as script params.
	// Splicing the values into the script source breaks (or injects code)
	// as soon as a field contains a quote, a backslash or a newline, and it
	// forces a fresh script compilation for every distinct document.
	const updateSource = "ctx._source['SubjectNames'] = params.SubjectNames;" +
		"ctx._source['PublishDate'] = params.PublishDate;" +
		"ctx._source['IsSummary'] = params.IsSummary;" +
		"ctx._source['Abstract'] = params.Abstract;" +
		"ctx._source['Title'] = params.Title;" +
		"ctx._source['BodyText'] = params.BodyText;" +
		"ctx._source['Annotation'] = params.Annotation;" +
		"ctx._source['IndustryName'] = params.IndustryName"
	updateScript := elastic.NewScriptInline(updateSource).Params(map[string]interface{}{
		"SubjectNames": item.SubjectNames,
		"PublishDate":  item.PublishDate,
		"IsSummary":    item.IsSummary,
		"Abstract":     item.Abstract,
		"Title":        item.Title,
		"BodyText":     item.BodyText,
		"Annotation":   item.Annotation,
		"IndustryName": item.IndustryName,
	})

	_, e = client.UpdateByQuery(indexName).
		Query(boolQuery).
		Script(updateScript).
		Refresh("true").
		Do(context.Background())
	if e != nil {
		err = errors.New(" client.UpdateByQuery(indexName), Err: " + e.Error())
		return
	}
	return
}

// UpdateComprehensiveIndustrialResourceData mirrors an industry resource
// package into the cygx resource-data table: a row is inserted when none
// exists for (SourceId, Source) and updated otherwise. Failures are reported
// through alarm_msg.SendAlarmMsg.
func UpdateComprehensiveIndustrialResourceData(itemSource *ElasticComprehensiveDetail) {
	var err error
	defer func() {
		if err != nil {
			go alarm_msg.SendAlarmMsg("更新产业资源包到最新数据表失败,Err:"+err.Error()+"资源ID"+strconv.Itoa(itemSource.SourceId), 3)
		}
	}()

	sourceId := itemSource.SourceId
	source := itemSource.Source

	// Decide between insert and update by counting the existing rows.
	totalData, e := cygx.GetCygxResourceDataBySourceAndIdCount(sourceId, source)
	if e != nil {
		err = errors.New("GetCygxResourceDataBySourceAndIdCount, Err: " + e.Error())
		return
	}

	item := new(cygx.CygxResourceData)
	item.SourceId = sourceId
	item.Source = source
	item.PublishDate = itemSource.PublishDate
	item.CreateTime = time.Now()
	item.SearchTitle = itemSource.IndustryName + itemSource.SubjectNames
	item.SearchContent = ""
	item.SearchOrderTime = itemSource.PublishDate

	if totalData == 0 {
		if _, e = cygx.AddCygxResourceData(item); e != nil {
			err = errors.New("AddCygxResourceData, Err: " + e.Error())
		}
		return
	}
	if e = cygx.UpdateResourceDataByItem(item); e != nil {
		err = errors.New("UpdateResourceData, Err: " + e.Error())
	}
}

// EsDeleteComprehensiveData removes every document whose SourceId and Source
// match item from the comprehensive-search index.
//
// Any failure is returned to the caller and is also reported asynchronously
// through alarm_msg.SendAlarmMsg.
func EsDeleteComprehensiveData(item *ElasticComprehensiveDetail) (err error) {
	// The index name is pinned here so another project's index is never touched by mistake.
	indexName := utils.IndexNameComprehensive
	defer func() {
		if err != nil {
			fmt.Println(err)
			go alarm_msg.SendAlarmMsg("删除数据综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
		}
	}()
	fmt.Println("删除", item.SourceId)

	client, err := NewClient()
	if err != nil {
		// Without this check a failed NewClient would lead to a nil client being used below.
		return
	}

	// Match condition for the documents to delete.
	boolQuery := elastic.NewBoolQuery()
	boolQuery.Must(elastic.NewTermQuery("SourceId", item.SourceId))
	boolQuery.Must(elastic.NewTermQuery("Source", item.Source))

	_, e := client.DeleteByQuery(indexName).
		Query(boolQuery).
		Do(context.Background())
	if e != nil {
		err = errors.New(" client.DeleteByQuery(indexName), Err: " + e.Error())
		return
	}
	return
}

// AddComprehensiveArticle syncs an article (report or minutes) to the index:
// a published article is added or updated, an unpublished one is deleted.
// The work starts after a three second delay.
func AddComprehensiveArticle(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveArticle,Err:"+err.Error(), 3)
		}
	}()

	v, e := cygx.GetArticleInfoOtherByArticleId(sourceId)
	if e != nil {
		err = errors.New("GetArticleInfoOtherByArticleId" + e.Error())
		return
	}

	// Reduce the stored HTML body to plain text for indexing.
	content := html.UnescapeString(v.Body)
	doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
	if e != nil {
		err = errors.New("goquery.NewDocumentFromReader" + e.Error())
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = v.ArticleId
	item.IsSummary = v.IsSummary
	item.Source = utils.CYGX_OBJ_ARTICLE
	item.Title = v.Title
	item.PublishDate = v.PublishDate.Format(utils.FormatDateTime)
	item.BodyText = doc.Text()
	item.Annotation = html.UnescapeString(v.Annotation)
	item.Abstract = html.UnescapeString(v.Abstract)

	// The Es* helpers report their own failures, so their results are not handled here.
	if v.PublishStatus == 1 {
		EsAddOrEditComprehensiveData(item) // published: add or update
	} else {
		EsDeleteComprehensiveData(item) // not published: delete
	}
}

// AddComprehensiveActivity syncs an activity to the index: a published
// activity is added or updated together with its industry and subject names,
// an unpublished one is deleted. The work starts after a three second delay.
func AddComprehensiveActivity(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveActivity,Err:"+err.Error(), 3)
		}
	}()

	activityId := sourceId
	detail, e := cygx.GetAddActivityInfoById(sourceId)
	if e != nil {
		err = errors.New("GetAddActivityInfoById" + e.Error())
		return
	}

	mapActivityIndustrialManagement := make(map[int][]string)
	mapActivitySubject := make(map[int][]string)
	industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
	if e != nil && e.Error() != utils.ErrNoRow() {
		err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
		return
	}
	// Subjects are only looked up when the activity has at least one industry.
	if len(industrialList) > 0 {
		for _, v := range industrialList {
			mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
		}
		subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
		if e != nil && e.Error() != utils.ErrNoRow() {
			err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
			return
		}
		for _, v := range subjectList {
			mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
		}
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.ActivityId
	item.Source = utils.CYGX_OBJ_ACTIVITY
	item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
	item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
	item.Title = detail.ActivityName
	item.PublishDate = detail.ActivityTime

	// The Es* helpers report their own failures, so their results are not handled here.
	if detail.PublishStatus == 1 {
		EsAddOrEditComprehensiveData(item) // published: add or update
	} else {
		EsDeleteComprehensiveData(item) // not published: delete
	}
}

// AddComprehensiveActivitySpecial syncs a special-research activity to the
// index: a published one is added or updated, an unpublished one is deleted.
// The work starts after a three second delay.
func AddComprehensiveActivitySpecial(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveActivitySpecial,Err:"+err.Error(), 3)
		}
	}()

	activityId := sourceId
	detail, e := cygx.GetAddActivityInfoSpecialById(sourceId)
	if e != nil {
		err = errors.New("GetAddActivityInfoSpecialById" + e.Error())
		return
	}

	mapActivityIndustrialManagement := make(map[int][]string)
	mapActivitySubject := make(map[int][]string)
	industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 2)
	if e != nil && e.Error() != utils.ErrNoRow() {
		err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
		return
	}
	// Subjects are only looked up when the activity has at least one industry.
	if len(industrialList) > 0 {
		for _, v := range industrialList {
			mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
		}
		subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 2)
		if e != nil && e.Error() != utils.ErrNoRow() {
			err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
			return
		}
		for _, v := range subjectList {
			mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
		}
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.ActivityId
	item.Source = utils.CYGX_OBJ_ACTIVITYSPECIAL
	item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
	item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
	item.Title = detail.ResearchTheme
	// Fall back to the last update time when the activity time equals the empty date-time string.
	if detail.ActivityTime == utils.EmptyDateTimeStr {
		item.PublishDate = detail.LastUpdatedTime.Format(utils.FormatDateTime)
	} else {
		item.PublishDate = detail.ActivityTime
	}

	// The Es* helpers report their own failures, so their results are not handled here.
	if detail.PublishStatus == 1 {
		EsAddOrEditComprehensiveData(item) // published: add or update
	} else {
		EsDeleteComprehensiveData(item) // not published: delete
	}
}

// AddComprehensiveActivityVideo adds or updates the index document for the
// video attached to the given activity. The work starts after a three second
// delay.
func AddComprehensiveActivityVideo(activityId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveActivityVideo,Err:"+err.Error(), 3)
		}
	}()

	activity, e := cygx.GetAddActivityInfoById(activityId)
	if e != nil {
		err = errors.New("GetAddActivityInfoById" + e.Error())
		return
	}
	detail, e := cygx.GetCygxActivityVideoReqDetail(activityId)
	if e != nil {
		err = errors.New("GetCygxActivityVideoReqDetail" + e.Error())
		return
	}
	// Guard before the first dereference: with no video there is no VideoId
	// to index or delete by, so there is nothing to do.
	if detail == nil {
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.VideoId
	item.Source = utils.CYGX_OBJ_ACTIVITYVIDEO

	mapActivityIndustrialManagement := make(map[int][]string)
	mapActivitySubject := make(map[int][]string)
	industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
	if e != nil && e.Error() != utils.ErrNoRow() {
		err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
		return
	}
	// Subjects are only looked up when the activity has at least one industry.
	if len(industrialList) > 0 {
		for _, v := range industrialList {
			mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
		}
		subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
		if e != nil && e.Error() != utils.ErrNoRow() {
			err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
			return
		}
		for _, v := range subjectList {
			mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
		}
	}

	item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
	item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
	item.Title = detail.VideoName
	item.BodyText = activity.ActivityName
	item.PublishDate = activity.ActivityTime
	EsAddOrEditComprehensiveData(item) // add or update; the helper reports its own failures
}

// AddComprehensiveActivityVoice adds or updates the index document for the
// audio attached to the given activity. The work starts after a three second
// delay.
func AddComprehensiveActivityVoice(activityId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveActivityVoice,Err:"+err.Error(), 3)
		}
	}()

	activity, e := cygx.GetAddActivityInfoById(activityId)
	if e != nil {
		err = errors.New("GetAddActivityInfoById" + e.Error())
		return
	}
	detail, e := cygx.GetCygxActivityVoiceReqDetail(activityId)
	if e != nil {
		err = errors.New("GetCygxActivityVoiceReqDetail" + e.Error())
		return
	}
	// Guard before the first dereference: with no audio there is no
	// ActivityVoiceId to index or delete by, so there is nothing to do.
	if detail == nil {
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.ActivityVoiceId
	item.Source = utils.CYGX_OBJ_ACTIVITYVOICE

	mapActivityIndustrialManagement := make(map[int][]string)
	mapActivitySubject := make(map[int][]string)
	industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
	if e != nil && e.Error() != utils.ErrNoRow() {
		err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
		return
	}
	// Subjects are only looked up when the activity has at least one industry.
	if len(industrialList) > 0 {
		for _, v := range industrialList {
			mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
		}
		subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
		if e != nil && e.Error() != utils.ErrNoRow() {
			err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
			return
		}
		for _, v := range subjectList {
			mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
		}
	}

	item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
	item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
	item.Title = detail.VoiceName
	item.BodyText = activity.ActivityName
	item.PublishDate = activity.ActivityTime
	EsAddOrEditComprehensiveData(item) // add or update; the helper reports its own failures
}

// AddComprehensiveRoadshow syncs a micro-roadshow video to the index: a
// published video is added or updated, an unpublished one is deleted. The
// work starts after a three second delay.
func AddComprehensiveRoadshow(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveRoadshow,Err:"+err.Error(), 3)
		}
	}()

	v, e := cygx.GetMicroRoadshowVideoByVideoId(sourceId)
	if e != nil {
		err = errors.New("GetMicroRoadshowVideoByVideoId" + e.Error())
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = v.VideoId
	item.Source = utils.CYGX_OBJ_ROADSHOW
	item.IndustryName = v.IndustryName
	item.Title = v.VideoName
	item.PublishDate = v.PublishDate.Format(utils.FormatDateTime)

	// The Es* helpers report their own failures, so their results are not handled here.
	if v.PublishStatus == 1 {
		EsAddOrEditComprehensiveData(item) // published: add or update
	} else {
		EsDeleteComprehensiveData(item) // not published: delete
	}
}

// AddComprehensiveMeetingreviewchapt adds or updates the index document for
// a morning-meeting review chapter. The chapter's HTML content is reduced to
// plain text and stored in Abstract. The work starts after a three second
// delay.
func AddComprehensiveMeetingreviewchapt(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveMeetingreviewchapt,Err:"+err.Error(), 3)
		}
	}()

	v, e := cygx.GetCygxMorningMeetingReviewChapterDetail(sourceId)
	if e != nil {
		err = errors.New("GetCygxMorningMeetingReviewChapterDetail" + e.Error())
		return
	}
	content := html.UnescapeString(v.Content)
	doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
	if e != nil {
		err = errors.New("goquery.NewDocumentFromReader" + e.Error())
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = sourceId
	item.Source = utils.CYGX_OBJ_MEETINGREVIEWCHAPT
	item.IndustryName = v.IndustryName
	item.PublishDate = v.MeetingTime.Format(utils.FormatDateTime)
	item.Abstract = doc.Text()
	EsAddOrEditComprehensiveData(item) // add or update; the helper reports its own failures
}

// DeleteComprehensiveMeetingreviewchapt removes a morning-meeting review
// chapter from the index after a three second delay. Failures are reported
// by EsDeleteComprehensiveData itself.
func DeleteComprehensiveMeetingreviewchapt(sourceId int) {
	time.Sleep(3 * time.Second) // process after a three second delay

	item := new(ElasticComprehensiveDetail)
	item.SourceId = sourceId
	item.Source = utils.CYGX_OBJ_MEETINGREVIEWCHAPT
	EsDeleteComprehensiveData(item)
}

// AddComprehensiveIndustrialSource indexes industry resource packages and
// mirrors each of them into the resource-data table.
//
// sourceType "Hz" selects the packages with article_type_id = 0 and tags
// them "industrialsourceHz"; any other value selects article_type_id > 0 and
// tags them "industrialsourceYx". A positive articleId restricts the query
// to that article. The work starts after a three second delay.
func AddComprehensiveIndustrialSource(sourceType string, articleId int) {
	time.Sleep(3 * time.Second) // process after a three second delay
	var err error
	defer func() {
		if err != nil {
			fmt.Println(err)
			go alarm_msg.SendAlarmMsg("AddComprehensiveIndustrialSource"+err.Error(), 2)
		}
	}()

	var condition string
	var pars []interface{}

	// Group subject names by industry id; condition is still empty at this point.
	mapActivitySubject := make(map[int][]string)
	listsubject, e := cygx.GetCygxIndustrialSubjectListCondition(condition, pars)
	if e != nil {
		err = errors.New("GetCygxIndustrialSubjectListCondition, Err: " + e.Error())
		return
	}
	for _, v := range listsubject {
		mapActivitySubject[v.IndustrialManagementId] = append(mapActivitySubject[v.IndustrialManagementId], v.SubjectName)
	}

	var industrialsource string
	if sourceType == "Hz" {
		condition = " AND a.article_type_id = 0 " // Hz resource packages
		industrialsource = "industrialsourceHz"
	} else {
		condition = " AND a.article_type_id > 0 " // Yx resource packages
		industrialsource = "industrialsourceYx"
	}
	if articleId > 0 {
		condition += " AND a.article_id = " + strconv.Itoa(articleId)
	}

	list, e := cygx.GetSearchResourceList(0, condition, 0, 0)
	if e != nil {
		err = errors.New("GetSearchResourceList, Err: " + e.Error())
		return
	}
	for _, v := range list {
		item := new(ElasticComprehensiveDetail)
		item.SourceId = v.IndustrialManagementId
		item.Source = industrialsource
		item.IndustryName = v.IndustryName
		item.SubjectNames = strings.Join(mapActivitySubject[v.IndustrialManagementId], ",")
		item.PublishDate = v.PublishDate + " 00:00:00"
		EsAddOrEditComprehensiveData(item)
		UpdateComprehensiveIndustrialResourceData(item)
	}
}

// EsAddYanxuanSpecial syncs a Yanxuan special-column entry to the index: an
// entry whose Status is 3 is added or updated, any other status deletes it.
func EsAddYanxuanSpecial(sourceId int) {
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg(fmt.Sprint("更新研选专栏失败sourceId: ", sourceId, err.Error()), 2)
		}
	}()

	detail, e := cygx.GetYanxuanSpecialItemById(sourceId)
	if e != nil {
		err = errors.New("GetYanxuanSpecialItemById" + e.Error())
		return
	}
	content := html.UnescapeString(detail.Content)
	doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
	if e != nil {
		err = errors.New("goquery.NewDocumentFromReader" + e.Error())
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.Id
	item.Source = utils.CYGX_OBJ_YANXUANSPECIAL
	item.Title = detail.Title
	item.PublishDate = detail.PublishTime
	item.BodyText = doc.Text()

	// The Es* helpers report their own failures, so their results are not handled here.
	if detail.Status == 3 {
		EsAddOrEditComprehensiveData(item) // status 3: add or update
	} else {
		EsDeleteComprehensiveData(item) // any other status: delete
	}
}

//func init() {
//	EsAddAskserieVideo(74)
//}

// EsAddAskserieVideo syncs a Q&A-series video to the index: a published
// video is added or updated, an unpublished one is deleted. The video name
// is used as both the title and, stripped of markup, the body text.
func EsAddAskserieVideo(sourceId int) {
	var err error
	defer func() {
		if err != nil {
			fmt.Println("err:", err)
			go alarm_msg.SendAlarmMsg(fmt.Sprint("Es更新问答系列视频AddAskserieVideo失败sourceId: ", sourceId, err.Error()), 2)
		}
	}()

	detail, e := cygx.GetCygxAskserieVideoDetail(sourceId)
	if e != nil {
		err = errors.New("GetCygxAskserieVideoDetail" + e.Error())
		return
	}
	content := html.UnescapeString(detail.VideoName)
	doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
	if e != nil {
		err = errors.New("goquery.NewDocumentFromReader" + e.Error())
		return
	}

	item := new(ElasticComprehensiveDetail)
	item.SourceId = detail.AskserieVideoId
	item.Source = utils.CYGX_OBJ_ASKSERIEVIDEO
	item.Title = detail.VideoName
	item.PublishDate = detail.PublishDate
	item.BodyText = doc.Text()
	mapLabel := cygxService.GetCygxAskserieVideoLabelMap([]int{sourceId}) // labels keyed by video id
	item.IndustryName = mapLabel[sourceId]

	// The Es* helpers report their own failures, so their results are not handled here.
	if detail.PublishStatus == 1 {
		EsAddOrEditComprehensiveData(item) // published: add or update
	} else {
		EsDeleteComprehensiveData(item) // not published: delete
	}
}