package eta_bridge import ( "context" "encoding/json" "eta_gn/eta_task/models" "eta_gn/eta_task/utils" "fmt" "strconv" "strings" "sync" "time" ) var lockSyncDataNode sync.Mutex func SyncDataNode(cont context.Context) (err error) { lockSyncDataNode.Lock() defer lockSyncDataNode.Unlock() syncDataNodeExecute() return } func syncDataNodeExecute() (err error) { fmt.Println("准备同步数据节点") utils.FileLog.Info("准备同步数据节点") errMsgList := make([]string, 0) defer func() { fmt.Println("同步数据节点结束") if err != nil { tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) } if len(errMsgList) > 0 { tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n") utils.FileLog.Info(tips) } }() obj := new(models.KnowledgeResource) condition := " AND out_id > ? AND resource_type = ?" pars := []interface{}{0, models.KnowledgeResourceTypeOpinion} knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars) if err != nil && !utils.IsErrNoRow(err) { errMsgList = append(errMsgList, err.Error()) return } param := make(map[string]interface{}) param["pageSize"] = 1000 if !utils.IsErrNoRow(err) { now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local) if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) { param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime) } else { param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime) } } bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param) if err != nil { errMsgList = append(errMsgList, errMsg) errMsgList = append(errMsgList, err.Error()) return } var result models.EtaBridgeKnowledgeResourceResp if er := json.Unmarshal(bBody, &result); er != nil { errMsgList = append(errMsgList, er.Error()) err = er return } if result.Code != 200 { fmt.Println("eta_bridge同步数据节点失败") errMsgList = append(errMsgList, result.Msg) return } if result.Data.Code != "000000" { fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败") errMsgList = append(errMsgList, result.Data.Mesg) return } err = ViewPointBatchSave(result.Data.Data.Records) if err != nil { errMsgList = append(errMsgList, err.Error()) } return } func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) { obj := new(models.KnowledgeClassify) obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0) if err != nil { if !utils.IsErrNoRow(err) { return } // 创建新的分类 err = nil obj.ClassifyName = classifyName obj.ResourceType = resourceType obj.CreateTime = time.Now() obj.ModifyTime = time.Now() obj.Enabled = 1 obj.Level = 1 obj.Sort = 1 err = obj.Create() if err != nil { return } classifyId = obj.ClassifyId } else { classifyId = obj.ClassifyId } return } func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) { if len(records) == 0 { return } var outIds []int var outMap = make(map[int]models.MarketOverviewRecord) for _, record := range records { outIds = append(outIds, record.Id) outMap[record.Id] = record } obj := new(models.KnowledgeResource) resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion) if err != nil { return } for _, resource := range resourceList { record, ok := outMap[resource.OutId] if !ok { continue } var updateCols []string if resource.Title != record.Title { resource.Title = record.Title updateCols = append(updateCols, "title") } if resource.Content != record.Content { resource.Content = record.Content updateCols = append(updateCols, "content") } recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate) if er != nil { fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error()) utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error()) err = er continue } if !resource.StartTime.Equal(recordTime) { resource.StartTime = &recordTime updateCols = append(updateCols, "start_time") } if resource.SourceFrom != record.DataSource { resource.SourceFrom = record.DataSource updateCols = append(updateCols, "source_from") } var isDelete int if record.IsValidData == 0 { isDelete = 1 } if resource.IsDelete != isDelete { resource.IsDelete = isDelete updateCols = append(updateCols, "is_delete") } if len(updateCols) > 0 { resource.ModifyTime = time.Now() updateCols = append(updateCols, "modify_time") err = resource.Update(updateCols) if err != nil { return } go func() { err = EsAddOrEditKnowledgeResource(resource) if err != nil { utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error()) } }() } delete(outMap, resource.OutId) } var addList []*models.KnowledgeResource classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion) if er != nil { err = er return } for _, record := range outMap { recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate) if er != nil { fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error()) utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error()) continue } timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) var isDelete int if record.IsValidData == 0 { isDelete = 1 } addList = append(addList, &models.KnowledgeResource{ OutId: record.Id, Title: record.Title, Content: record.Content, State: models.KnowledgeResourceStateApproved, AdminRealName: "无", StartTime: &recordTime, SourceFrom: record.DataSource, ClassifyId: classifyId, IsDelete: isDelete, ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp), ResourceType: models.KnowledgeResourceTypeOpinion, CreateTime: time.Now(), ModifyTime: time.Now(), }) } err = obj.BatchCreate(addList) if err != nil { return } go func() { e := EsBatchAddOrEditKnowledgeResource(addList) if e != nil { utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error()) } }() return }