|
@@ -0,0 +1,227 @@
|
|
|
+package eta_bridge
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "encoding/json"
|
|
|
+ "eta_gn/eta_task/models"
|
|
|
+ "eta_gn/eta_task/utils"
|
|
|
+ "fmt"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+var lockSyncDataNode sync.Mutex
|
|
|
+
|
|
|
+func SyncDataNode(cont context.Context) (err error) {
|
|
|
+ lockSyncDataNode.Lock()
|
|
|
+ defer lockSyncDataNode.Unlock()
|
|
|
+ syncDataNodeExecute()
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func syncDataNodeExecute() (err error) {
|
|
|
+ fmt.Println("准备同步数据节点")
|
|
|
+ utils.FileLog.Info("准备同步数据节点")
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ fmt.Println("同步数据节点结束")
|
|
|
+ if err != nil {
|
|
|
+ tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
|
|
|
+ utils.FileLog.Info(tips)
|
|
|
+ }
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
|
|
|
+ utils.FileLog.Info(tips)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ obj := new(models.KnowledgeResource)
|
|
|
+ condition := " AND out_id > ? AND resource_type = ?"
|
|
|
+ pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
|
|
|
+ knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
|
|
|
+ if err != nil && !utils.IsErrNoRow(err) {
|
|
|
+ errMsgList = append(errMsgList, err.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ param := make(map[string]interface{})
|
|
|
+ param["pageSize"] = 1000
|
|
|
+ if !utils.IsErrNoRow(err) {
|
|
|
+ now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
|
|
|
+ param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
|
|
|
+ } else {
|
|
|
+ param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
|
|
|
+ if err != nil {
|
|
|
+ errMsgList = append(errMsgList, errMsg)
|
|
|
+ errMsgList = append(errMsgList, err.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var result models.EtaBridgeKnowledgeResourceResp
|
|
|
+ if er := json.Unmarshal(bBody, &result); er != nil {
|
|
|
+ errMsgList = append(errMsgList, er.Error())
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if result.Code != 200 {
|
|
|
+ fmt.Println("eta_bridge同步数据节点失败")
|
|
|
+ errMsgList = append(errMsgList, result.Msg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if result.Data.Code != "000000" {
|
|
|
+ fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
|
|
|
+ errMsgList = append(errMsgList, result.Data.Mesg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err = ViewPointBatchSave(result.Data.Data.Records)
|
|
|
+ if err != nil {
|
|
|
+ errMsgList = append(errMsgList, err.Error())
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
|
|
|
+ obj := new(models.KnowledgeClassify)
|
|
|
+ obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
|
|
|
+ if err != nil {
|
|
|
+ if !utils.IsErrNoRow(err) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 创建新的分类
|
|
|
+ err = nil
|
|
|
+ obj.ClassifyName = classifyName
|
|
|
+ obj.ResourceType = resourceType
|
|
|
+ obj.CreateTime = time.Now()
|
|
|
+ obj.ModifyTime = time.Now()
|
|
|
+ obj.Enabled = 1
|
|
|
+ obj.Level = 1
|
|
|
+ obj.Sort = 1
|
|
|
+ err = obj.Create()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ classifyId = obj.ClassifyId
|
|
|
+ } else {
|
|
|
+ classifyId = obj.ClassifyId
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
|
|
|
+ if len(records) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var outIds []int
|
|
|
+ var outMap = make(map[int]models.MarketOverviewRecord)
|
|
|
+ for _, record := range records {
|
|
|
+ outIds = append(outIds, record.Id)
|
|
|
+ outMap[record.Id] = record
|
|
|
+ }
|
|
|
+ obj := new(models.KnowledgeResource)
|
|
|
+ resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, resource := range resourceList {
|
|
|
+ record, ok := outMap[resource.OutId]
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ var updateCols []string
|
|
|
+ if resource.Title != record.Title {
|
|
|
+ resource.Title = record.Title
|
|
|
+ updateCols = append(updateCols, "title")
|
|
|
+ }
|
|
|
+ if resource.Content != record.Content {
|
|
|
+ resource.Content = record.Content
|
|
|
+ updateCols = append(updateCols, "content")
|
|
|
+ }
|
|
|
+ recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
|
|
|
+ if er != nil {
|
|
|
+ fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
|
|
|
+ utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
|
|
|
+ err = er
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if !resource.StartTime.Equal(recordTime) {
|
|
|
+ resource.StartTime = &recordTime
|
|
|
+ updateCols = append(updateCols, "start_time")
|
|
|
+ }
|
|
|
+ if resource.SourceFrom != record.DataSource {
|
|
|
+ resource.SourceFrom = record.DataSource
|
|
|
+ updateCols = append(updateCols, "source_from")
|
|
|
+ }
|
|
|
+ var isDelete int
|
|
|
+ if record.IsValidData == 0 {
|
|
|
+ isDelete = 1
|
|
|
+ }
|
|
|
+ if resource.IsDelete != isDelete {
|
|
|
+ resource.IsDelete = isDelete
|
|
|
+ updateCols = append(updateCols, "is_delete")
|
|
|
+ }
|
|
|
+ if len(updateCols) > 0 {
|
|
|
+ resource.ModifyTime = time.Now()
|
|
|
+ updateCols = append(updateCols, "modify_time")
|
|
|
+ err = resource.Update(updateCols)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ go func() {
|
|
|
+ err = EsAddOrEditKnowledgeResource(resource)
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ delete(outMap, resource.OutId)
|
|
|
+ }
|
|
|
+ var addList []*models.KnowledgeResource
|
|
|
+ classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
|
|
|
+ if er != nil {
|
|
|
+ err = er
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, record := range outMap {
|
|
|
+ recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
|
|
|
+ if er != nil {
|
|
|
+ fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
|
|
|
+ utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
|
|
|
+ var isDelete int
|
|
|
+ if record.IsValidData == 0 {
|
|
|
+ isDelete = 1
|
|
|
+ }
|
|
|
+ addList = append(addList, &models.KnowledgeResource{
|
|
|
+ OutId: record.Id,
|
|
|
+ Title: record.Title,
|
|
|
+ Content: record.Content,
|
|
|
+ State: models.KnowledgeResourceStateApproved,
|
|
|
+ AdminRealName: "无",
|
|
|
+ StartTime: &recordTime,
|
|
|
+ SourceFrom: record.DataSource,
|
|
|
+ ClassifyId: classifyId,
|
|
|
+ IsDelete: isDelete,
|
|
|
+ ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
|
|
|
+ ResourceType: models.KnowledgeResourceTypeOpinion,
|
|
|
+ CreateTime: time.Now(),
|
|
|
+ ModifyTime: time.Now(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ err = obj.BatchCreate(addList)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ go func() {
|
|
|
+ e := EsBatchAddOrEditKnowledgeResource(addList)
|
|
|
+ if e != nil {
|
|
|
+ utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ return
|
|
|
+}
|