123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- package eta_bridge
- import (
- "context"
- "encoding/json"
- "eta_gn/eta_task/models"
- "eta_gn/eta_task/utils"
- "fmt"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- var lockSyncDataNode sync.Mutex
- func SyncDataNode(cont context.Context) (err error) {
- lockSyncDataNode.Lock()
- defer lockSyncDataNode.Unlock()
- syncDataNodeExecute()
- return
- }
- func syncDataNodeExecute() (err error) {
- fmt.Println("准备同步数据节点")
- utils.FileLog.Info("准备同步数据节点")
- errMsgList := make([]string, 0)
- defer func() {
- fmt.Println("同步数据节点结束")
- if err != nil {
- tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
- utils.FileLog.Info(tips)
- }
- if len(errMsgList) > 0 {
- tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
- utils.FileLog.Info(tips)
- }
- }()
- obj := new(models.KnowledgeResource)
- condition := " AND out_id > ? AND resource_type = ?"
- pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
- knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
- if err != nil && !utils.IsErrNoRow(err) {
- errMsgList = append(errMsgList, err.Error())
- return
- }
- param := make(map[string]interface{})
- param["pageSize"] = 1000
- if !utils.IsErrNoRow(err) {
- now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
- if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
- param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
- } else {
- param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
- }
- }
- bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
- if err != nil {
- errMsgList = append(errMsgList, errMsg)
- errMsgList = append(errMsgList, err.Error())
- return
- }
- var result models.EtaBridgeKnowledgeResourceResp
- if er := json.Unmarshal(bBody, &result); er != nil {
- errMsgList = append(errMsgList, er.Error())
- err = er
- return
- }
- if result.Code != 200 {
- fmt.Println("eta_bridge同步数据节点失败")
- errMsgList = append(errMsgList, result.Msg)
- return
- }
- if result.Data.Code != "000000" {
- fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
- errMsgList = append(errMsgList, result.Data.Mesg)
- return
- }
- err = ViewPointBatchSave(result.Data.Data.Records)
- if err != nil {
- errMsgList = append(errMsgList, err.Error())
- }
- return
- }
- func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
- obj := new(models.KnowledgeClassify)
- obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- return
- }
- // 创建新的分类
- err = nil
- obj.ClassifyName = classifyName
- obj.ResourceType = resourceType
- obj.CreateTime = time.Now()
- obj.ModifyTime = time.Now()
- obj.Enabled = 1
- obj.Level = 1
- obj.Sort = 1
- err = obj.Create()
- if err != nil {
- return
- }
- classifyId = obj.ClassifyId
- } else {
- classifyId = obj.ClassifyId
- }
- return
- }
- func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
- if len(records) == 0 {
- return
- }
- var outIds []int
- var outMap = make(map[int]models.MarketOverviewRecord)
- for _, record := range records {
- outIds = append(outIds, record.Id)
- outMap[record.Id] = record
- }
- obj := new(models.KnowledgeResource)
- resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
- if err != nil {
- return
- }
- for _, resource := range resourceList {
- record, ok := outMap[resource.OutId]
- if !ok {
- continue
- }
- var updateCols []string
- if resource.Title != record.Title {
- resource.Title = record.Title
- updateCols = append(updateCols, "title")
- }
- if resource.Content != record.Content {
- resource.Content = record.Content
- updateCols = append(updateCols, "content")
- }
- recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
- if er != nil {
- fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
- utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
- err = er
- continue
- }
- if !resource.StartTime.Equal(recordTime) {
- resource.StartTime = &recordTime
- updateCols = append(updateCols, "start_time")
- }
- if resource.SourceFrom != record.DataSource {
- resource.SourceFrom = record.DataSource
- updateCols = append(updateCols, "source_from")
- }
- var isDelete int
- if record.IsValidData == 0 {
- isDelete = 1
- }
- if resource.IsDelete != isDelete {
- resource.IsDelete = isDelete
- updateCols = append(updateCols, "is_delete")
- }
- if len(updateCols) > 0 {
- resource.ModifyTime = time.Now()
- updateCols = append(updateCols, "modify_time")
- err = resource.Update(updateCols)
- if err != nil {
- return
- }
- go func() {
- err = EsAddOrEditKnowledgeResource(resource)
- if err != nil {
- utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
- }
- }()
- }
- delete(outMap, resource.OutId)
- }
- var addList []*models.KnowledgeResource
- classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
- if er != nil {
- err = er
- return
- }
- for _, record := range outMap {
- recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
- if er != nil {
- fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
- utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
- continue
- }
- timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
- var isDelete int
- if record.IsValidData == 0 {
- isDelete = 1
- }
- addList = append(addList, &models.KnowledgeResource{
- OutId: record.Id,
- Title: record.Title,
- Content: record.Content,
- State: models.KnowledgeResourceStateApproved,
- AdminRealName: "无",
- StartTime: &recordTime,
- SourceFrom: record.DataSource,
- ClassifyId: classifyId,
- IsDelete: isDelete,
- ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
- ResourceType: models.KnowledgeResourceTypeOpinion,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- })
- }
- err = obj.BatchCreate(addList)
- if err != nil {
- return
- }
- go func() {
- e := EsBatchAddOrEditKnowledgeResource(addList)
- if e != nil {
- utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
- }
- }()
- return
- }
|