knowledge.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models"
  6. "eta_gn/eta_task/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var lockSyncDataNode sync.Mutex
  14. func SyncDataNode(cont context.Context) (err error) {
  15. fmt.Println("准备同步数据节点")
  16. lockSyncDataNode.Lock()
  17. utils.FileLog.Info("准备同步数据节点")
  18. errMsgList := make([]string, 0)
  19. defer func() {
  20. lockSyncDataNode.Unlock()
  21. fmt.Println("同步数据节点结束")
  22. if err != nil {
  23. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
  24. utils.FileLog.Info(tips)
  25. }
  26. if len(errMsgList) > 0 {
  27. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  28. utils.FileLog.Info(tips)
  29. }
  30. }()
  31. obj := new(models.KnowledgeResource)
  32. condition := " AND out_id > ? AND resource_type = ?"
  33. pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
  34. knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
  35. if err != nil && !utils.IsErrNoRow(err) {
  36. errMsgList = append(errMsgList, err.Error())
  37. return
  38. }
  39. param := make(map[string]interface{})
  40. param["pageSize"] = 1000
  41. if !utils.IsErrNoRow(err) {
  42. now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
  43. if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
  44. param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
  45. } else {
  46. param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
  47. }
  48. }
  49. bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/save", param)
  50. if err != nil {
  51. errMsgList = append(errMsgList, errMsg)
  52. errMsgList = append(errMsgList, err.Error())
  53. return
  54. }
  55. var result models.EtaBridgeKnowledgeResourceResp
  56. if err := json.Unmarshal(bBody, &result); err != nil {
  57. errMsgList = append(errMsgList, err.Error())
  58. }
  59. if result.Code != 200 {
  60. fmt.Println("eta_bridge同步数据节点失败")
  61. errMsgList = append(errMsgList, result.Msg)
  62. }
  63. if result.Data.Code != "000000" {
  64. fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
  65. errMsgList = append(errMsgList, result.Data.Msg)
  66. }
  67. err = ViewPointBatchSave(result.Data.Data.Records)
  68. if err != nil {
  69. errMsgList = append(errMsgList, err.Error())
  70. }
  71. return
  72. }
  73. func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
  74. obj := new(models.KnowledgeClassify)
  75. obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
  76. if err != nil {
  77. if !utils.IsErrNoRow(err) {
  78. return
  79. }
  80. // 创建新的分类
  81. err = nil
  82. obj.ClassifyName = classifyName
  83. obj.ResourceType = resourceType
  84. obj.CreateTime = time.Now()
  85. obj.ModifyTime = time.Now()
  86. obj.Enabled = 1
  87. obj.Level = 1
  88. obj.Sort = 1
  89. err = obj.Create()
  90. if err != nil {
  91. return
  92. }
  93. classifyId = obj.ClassifyId
  94. } else {
  95. classifyId = obj.ClassifyId
  96. }
  97. return
  98. }
  99. func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
  100. if len(records) == 0 {
  101. return
  102. }
  103. var outIds []int
  104. var outMap = make(map[int]models.MarketOverviewRecord)
  105. for _, record := range records {
  106. outIds = append(outIds, record.Id)
  107. outMap[record.Id] = record
  108. }
  109. obj := new(models.KnowledgeResource)
  110. resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
  111. if err != nil {
  112. return
  113. }
  114. for _, resource := range resourceList {
  115. record, ok := outMap[resource.OutId]
  116. if !ok {
  117. continue
  118. }
  119. var updateCols []string
  120. if resource.Title != record.Title {
  121. resource.Title = record.Title
  122. updateCols = append(updateCols, "title")
  123. }
  124. if resource.Content != record.Content {
  125. resource.Content = record.Content
  126. updateCols = append(updateCols, "content")
  127. }
  128. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  129. if er != nil {
  130. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  131. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  132. err = er
  133. continue
  134. }
  135. if !resource.StartTime.Equal(recordTime) {
  136. resource.StartTime = &recordTime
  137. updateCols = append(updateCols, "start_time")
  138. }
  139. if resource.SourceFrom != record.DataSource {
  140. resource.SourceFrom = record.DataSource
  141. updateCols = append(updateCols, "source_from")
  142. }
  143. var isDelete int
  144. if record.IsValidData == 0 {
  145. isDelete = 1
  146. }
  147. if resource.IsDelete != isDelete {
  148. resource.IsDelete = isDelete
  149. updateCols = append(updateCols, "is_delete")
  150. }
  151. if len(updateCols) > 0 {
  152. resource.ModifyTime = time.Now()
  153. updateCols = append(updateCols, "modify_time")
  154. err = resource.Update(updateCols)
  155. if err != nil {
  156. return
  157. }
  158. go func() {
  159. err = EsAddOrEditKnowledgeResource(resource)
  160. if err != nil {
  161. utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
  162. }
  163. }()
  164. }
  165. delete(outMap, resource.OutId)
  166. }
  167. var addList []*models.KnowledgeResource
  168. classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
  169. if er != nil {
  170. err = er
  171. return
  172. }
  173. for _, record := range outMap {
  174. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  175. if er != nil {
  176. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  177. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  178. continue
  179. }
  180. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  181. var isDelete int
  182. if record.IsValidData == 0 {
  183. isDelete = 1
  184. }
  185. addList = append(addList, &models.KnowledgeResource{
  186. OutId: record.Id,
  187. Title: record.Title,
  188. Content: record.Content,
  189. State: models.KnowledgeResourceStateApproved,
  190. AdminRealName: "无",
  191. StartTime: &recordTime,
  192. SourceFrom: record.DataSource,
  193. ClassifyId: classifyId,
  194. IsDelete: isDelete,
  195. ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
  196. ResourceType: models.KnowledgeResourceTypeOpinion,
  197. CreateTime: time.Now(),
  198. ModifyTime: time.Now(),
  199. })
  200. }
  201. err = obj.BatchCreate(addList)
  202. if err != nil {
  203. return
  204. }
  205. go func() {
  206. e := EsBatchAddOrEditKnowledgeResource(addList)
  207. if e != nil {
  208. utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
  209. }
  210. }()
  211. return
  212. }