knowledge.go 6.9 KB


  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models"
  6. "eta_gn/eta_task/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var lockSyncDataNode sync.Mutex
  14. func SyncDataNode(cont context.Context) (err error) {
  15. lockSyncDataNode.Lock()
  16. defer lockSyncDataNode.Unlock()
  17. syncDataNodeExecute()
  18. return
  19. }
  20. func syncDataNodeExecute() (err error) {
  21. fmt.Println("准备同步数据节点")
  22. utils.FileLog.Info("准备同步数据节点")
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. fmt.Println("同步数据节点结束")
  26. if err != nil {
  27. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
  28. utils.FileLog.Info(tips)
  29. }
  30. if len(errMsgList) > 0 {
  31. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  32. utils.FileLog.Info(tips)
  33. }
  34. }()
  35. obj := new(models.KnowledgeResource)
  36. condition := " AND out_id > ? AND resource_type = ?"
  37. pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
  38. knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
  39. if err != nil && !utils.IsErrNoRow(err) {
  40. errMsgList = append(errMsgList, err.Error())
  41. return
  42. }
  43. param := make(map[string]interface{})
  44. param["pageSize"] = 1000
  45. if !utils.IsErrNoRow(err) {
  46. // UPDATE_TIME格式:2024-02-21 12:05:23.910000,2025-02-21 12:05:23.910000, 结束时间始终取time.Now
  47. timeFormat := "2006-01-02 15:04:05.000000"
  48. strEndTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 23, 59, 59, 0, time.Local).Format(timeFormat)
  49. now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
  50. if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
  51. //param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
  52. param["UPDATE_TIME"] = fmt.Sprintf("%s,%s", knowledge.ModifyTime.Format(timeFormat), strEndTime)
  53. } else {
  54. //param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
  55. strBeginTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Format(timeFormat)
  56. param["UPDATE_TIME"] = fmt.Sprintf("%s,%s", strBeginTime, strEndTime)
  57. }
  58. }
  59. bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
  60. if err != nil {
  61. errMsgList = append(errMsgList, errMsg)
  62. errMsgList = append(errMsgList, err.Error())
  63. return
  64. }
  65. var result models.EtaBridgeKnowledgeResourceResp
  66. if er := json.Unmarshal(bBody, &result); er != nil {
  67. errMsgList = append(errMsgList, er.Error())
  68. err = er
  69. return
  70. }
  71. if result.Code != 200 {
  72. fmt.Println("eta_bridge同步数据节点失败")
  73. errMsgList = append(errMsgList, result.Msg)
  74. return
  75. }
  76. if result.Data.Code != "000000" {
  77. fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
  78. errMsgList = append(errMsgList, result.Data.Mesg)
  79. return
  80. }
  81. err = ViewPointBatchSave(result.Data.Data.Records)
  82. if err != nil {
  83. errMsgList = append(errMsgList, err.Error())
  84. }
  85. return
  86. }
  87. func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
  88. obj := new(models.KnowledgeClassify)
  89. obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
  90. if err != nil {
  91. if !utils.IsErrNoRow(err) {
  92. return
  93. }
  94. // 创建新的分类
  95. err = nil
  96. obj.ClassifyName = classifyName
  97. obj.ResourceType = resourceType
  98. obj.CreateTime = time.Now()
  99. obj.ModifyTime = time.Now()
  100. obj.Enabled = 1
  101. obj.Level = 1
  102. obj.Sort = 1
  103. err = obj.Create()
  104. if err != nil {
  105. return
  106. }
  107. classifyId = obj.ClassifyId
  108. } else {
  109. classifyId = obj.ClassifyId
  110. }
  111. return
  112. }
  113. func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
  114. if len(records) == 0 {
  115. return
  116. }
  117. var outIds []int
  118. var outMap = make(map[int]models.MarketOverviewRecord)
  119. for _, record := range records {
  120. outIds = append(outIds, record.Id)
  121. outMap[record.Id] = record
  122. }
  123. obj := new(models.KnowledgeResource)
  124. resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
  125. if err != nil {
  126. return
  127. }
  128. for _, resource := range resourceList {
  129. record, ok := outMap[resource.OutId]
  130. if !ok {
  131. continue
  132. }
  133. var updateCols []string
  134. if resource.Title != record.Title {
  135. resource.Title = record.Title
  136. updateCols = append(updateCols, "title")
  137. }
  138. if resource.Content != record.Content {
  139. resource.Content = record.Content
  140. updateCols = append(updateCols, "content")
  141. }
  142. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  143. if er != nil {
  144. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  145. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  146. err = er
  147. continue
  148. }
  149. if !resource.StartTime.Equal(recordTime) {
  150. resource.StartTime = &recordTime
  151. updateCols = append(updateCols, "start_time")
  152. }
  153. if resource.SourceFrom != record.DataSource {
  154. resource.SourceFrom = record.DataSource
  155. updateCols = append(updateCols, "source_from")
  156. }
  157. var isDelete int
  158. if record.IsValidData == 0 {
  159. isDelete = 1
  160. }
  161. if resource.IsDelete != isDelete {
  162. resource.IsDelete = isDelete
  163. updateCols = append(updateCols, "is_delete")
  164. }
  165. if len(updateCols) > 0 {
  166. resource.ModifyTime = time.Now()
  167. updateCols = append(updateCols, "modify_time")
  168. err = resource.Update(updateCols)
  169. if err != nil {
  170. return
  171. }
  172. go func() {
  173. err = EsAddOrEditKnowledgeResource(resource)
  174. if err != nil {
  175. utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
  176. }
  177. }()
  178. }
  179. delete(outMap, resource.OutId)
  180. }
  181. var addList []*models.KnowledgeResource
  182. classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
  183. if er != nil {
  184. err = er
  185. return
  186. }
  187. for _, record := range outMap {
  188. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  189. if er != nil {
  190. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  191. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  192. continue
  193. }
  194. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  195. var isDelete int
  196. if record.IsValidData == 0 {
  197. isDelete = 1
  198. }
  199. addList = append(addList, &models.KnowledgeResource{
  200. OutId: record.Id,
  201. Title: record.Title,
  202. Content: record.Content,
  203. State: models.KnowledgeResourceStateApproved,
  204. AdminRealName: "无",
  205. StartTime: &recordTime,
  206. SourceFrom: record.DataSource,
  207. ClassifyId: classifyId,
  208. IsDelete: isDelete,
  209. ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
  210. ResourceType: models.KnowledgeResourceTypeOpinion,
  211. CreateTime: time.Now(),
  212. ModifyTime: time.Now(),
  213. })
  214. }
  215. err = obj.BatchCreate(addList)
  216. if err != nil {
  217. return
  218. }
  219. go func() {
  220. e := EsBatchAddOrEditKnowledgeResource(addList)
  221. if e != nil {
  222. utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
  223. }
  224. }()
  225. return
  226. }