knowledge.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models"
  6. "eta_gn/eta_task/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var lockSyncDataNode sync.Mutex
  14. func SyncDataNode(cont context.Context) (err error) {
  15. lockSyncDataNode.Lock()
  16. defer lockSyncDataNode.Unlock()
  17. syncDataNodeExecute()
  18. return
  19. }
  20. func syncDataNodeExecute() (err error) {
  21. fmt.Println("准备同步数据节点")
  22. utils.FileLog.Info("准备同步数据节点")
  23. errMsgList := make([]string, 0)
  24. defer func() {
  25. fmt.Println("同步数据节点结束")
  26. if err != nil {
  27. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
  28. utils.FileLog.Info(tips)
  29. }
  30. if len(errMsgList) > 0 {
  31. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  32. utils.FileLog.Info(tips)
  33. }
  34. }()
  35. obj := new(models.KnowledgeResource)
  36. condition := " AND out_id > ? AND resource_type = ?"
  37. pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
  38. knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
  39. if err != nil && !utils.IsErrNoRow(err) {
  40. errMsgList = append(errMsgList, err.Error())
  41. return
  42. }
  43. param := make(map[string]interface{})
  44. param["pageSize"] = 1000
  45. if !utils.IsErrNoRow(err) {
  46. now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
  47. if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
  48. param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
  49. } else {
  50. param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
  51. }
  52. }
  53. bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
  54. if err != nil {
  55. errMsgList = append(errMsgList, errMsg)
  56. errMsgList = append(errMsgList, err.Error())
  57. return
  58. }
  59. var result models.EtaBridgeKnowledgeResourceResp
  60. if er := json.Unmarshal(bBody, &result); er != nil {
  61. errMsgList = append(errMsgList, er.Error())
  62. err = er
  63. return
  64. }
  65. if result.Code != 200 {
  66. fmt.Println("eta_bridge同步数据节点失败")
  67. errMsgList = append(errMsgList, result.Msg)
  68. return
  69. }
  70. if result.Data.Code != "000000" {
  71. fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
  72. errMsgList = append(errMsgList, result.Data.Mesg)
  73. return
  74. }
  75. err = ViewPointBatchSave(result.Data.Data.Records)
  76. if err != nil {
  77. errMsgList = append(errMsgList, err.Error())
  78. }
  79. return
  80. }
  81. func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
  82. obj := new(models.KnowledgeClassify)
  83. obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
  84. if err != nil {
  85. if !utils.IsErrNoRow(err) {
  86. return
  87. }
  88. // 创建新的分类
  89. err = nil
  90. obj.ClassifyName = classifyName
  91. obj.ResourceType = resourceType
  92. obj.CreateTime = time.Now()
  93. obj.ModifyTime = time.Now()
  94. obj.Enabled = 1
  95. obj.Level = 1
  96. obj.Sort = 1
  97. err = obj.Create()
  98. if err != nil {
  99. return
  100. }
  101. classifyId = obj.ClassifyId
  102. } else {
  103. classifyId = obj.ClassifyId
  104. }
  105. return
  106. }
  107. func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
  108. if len(records) == 0 {
  109. return
  110. }
  111. var outIds []int
  112. var outMap = make(map[int]models.MarketOverviewRecord)
  113. for _, record := range records {
  114. outIds = append(outIds, record.Id)
  115. outMap[record.Id] = record
  116. }
  117. obj := new(models.KnowledgeResource)
  118. resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
  119. if err != nil {
  120. return
  121. }
  122. for _, resource := range resourceList {
  123. record, ok := outMap[resource.OutId]
  124. if !ok {
  125. continue
  126. }
  127. var updateCols []string
  128. if resource.Title != record.Title {
  129. resource.Title = record.Title
  130. updateCols = append(updateCols, "title")
  131. }
  132. if resource.Content != record.Content {
  133. resource.Content = record.Content
  134. updateCols = append(updateCols, "content")
  135. }
  136. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  137. if er != nil {
  138. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  139. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  140. err = er
  141. continue
  142. }
  143. if !resource.StartTime.Equal(recordTime) {
  144. resource.StartTime = &recordTime
  145. updateCols = append(updateCols, "start_time")
  146. }
  147. if resource.SourceFrom != record.DataSource {
  148. resource.SourceFrom = record.DataSource
  149. updateCols = append(updateCols, "source_from")
  150. }
  151. var isDelete int
  152. if record.IsValidData == 0 {
  153. isDelete = 1
  154. }
  155. if resource.IsDelete != isDelete {
  156. resource.IsDelete = isDelete
  157. updateCols = append(updateCols, "is_delete")
  158. }
  159. if len(updateCols) > 0 {
  160. resource.ModifyTime = time.Now()
  161. updateCols = append(updateCols, "modify_time")
  162. err = resource.Update(updateCols)
  163. if err != nil {
  164. return
  165. }
  166. go func() {
  167. err = EsAddOrEditKnowledgeResource(resource)
  168. if err != nil {
  169. utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
  170. }
  171. }()
  172. }
  173. delete(outMap, resource.OutId)
  174. }
  175. var addList []*models.KnowledgeResource
  176. classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
  177. if er != nil {
  178. err = er
  179. return
  180. }
  181. for _, record := range outMap {
  182. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  183. if er != nil {
  184. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  185. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  186. continue
  187. }
  188. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  189. var isDelete int
  190. if record.IsValidData == 0 {
  191. isDelete = 1
  192. }
  193. addList = append(addList, &models.KnowledgeResource{
  194. OutId: record.Id,
  195. Title: record.Title,
  196. Content: record.Content,
  197. State: models.KnowledgeResourceStateApproved,
  198. AdminRealName: "无",
  199. StartTime: &recordTime,
  200. SourceFrom: record.DataSource,
  201. ClassifyId: classifyId,
  202. IsDelete: isDelete,
  203. ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
  204. ResourceType: models.KnowledgeResourceTypeOpinion,
  205. CreateTime: time.Now(),
  206. ModifyTime: time.Now(),
  207. })
  208. }
  209. err = obj.BatchCreate(addList)
  210. if err != nil {
  211. return
  212. }
  213. go func() {
  214. e := EsBatchAddOrEditKnowledgeResource(addList)
  215. if e != nil {
  216. utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
  217. }
  218. }()
  219. return
  220. }