knowledge.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models"
  6. "eta_gn/eta_task/utils"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var lockSyncDataNode sync.Mutex
  14. func SyncDataNode(cont context.Context) (err error) {
  15. ctx, cancel := context.WithTimeout(cont, 5*time.Minute)
  16. defer cancel()
  17. if err = lockWithTimeout(ctx); err != nil {
  18. fmt.Println("获取锁失败, err", err)
  19. utils.FileLog.Info("SyncDataNode-获取锁失败, err:%s", err.Error())
  20. return
  21. }
  22. defer lockSyncDataNode.Unlock()
  23. syncDataNodeExecute()
  24. return
  25. }
  26. func lockWithTimeout(ctx context.Context) error {
  27. done := make(chan struct{})
  28. go func() {
  29. lockSyncDataNode.Lock()
  30. close(done)
  31. }()
  32. select {
  33. case <-done:
  34. return nil
  35. case <-ctx.Done():
  36. return fmt.Errorf("lockWithTimeout: %v", ctx.Err())
  37. }
  38. }
  39. func syncDataNodeExecute() (err error) {
  40. fmt.Println("准备同步数据节点")
  41. utils.FileLog.Info("准备同步数据节点")
  42. errMsgList := make([]string, 0)
  43. defer func() {
  44. fmt.Println("同步数据节点结束")
  45. if err != nil {
  46. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + err.Error()
  47. utils.FileLog.Info(tips)
  48. }
  49. if len(errMsgList) > 0 {
  50. tips := "SyncDataNode-同步数据节点到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  51. utils.FileLog.Info(tips)
  52. }
  53. }()
  54. obj := new(models.KnowledgeResource)
  55. condition := " AND out_id > ? AND resource_type = ?"
  56. pars := []interface{}{0, models.KnowledgeResourceTypeOpinion}
  57. knowledge, err := obj.GetMaxTimeKnowledgeByCondition(condition, pars)
  58. if err != nil && !utils.IsErrNoRow(err) {
  59. errMsgList = append(errMsgList, err.Error())
  60. return
  61. }
  62. param := make(map[string]interface{})
  63. param["pageSize"] = 1000
  64. if !utils.IsErrNoRow(err) {
  65. now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
  66. if knowledge.ModifyTime.Before(now.AddDate(0, 0, -1)) {
  67. param["UPDATE_TIME"] = knowledge.ModifyTime.Format(utils.FormatDateTime)
  68. } else {
  69. param["UPDATE_TIME"] = now.AddDate(0, 0, -1).Format(utils.FormatDateTime)
  70. }
  71. }
  72. bBody, err, errMsg := HttpEtaBridgePost("/knowledge/viewpoint/acquire", param)
  73. if err != nil {
  74. errMsgList = append(errMsgList, errMsg)
  75. errMsgList = append(errMsgList, err.Error())
  76. return
  77. }
  78. var result models.EtaBridgeKnowledgeResourceResp
  79. if er := json.Unmarshal(bBody, &result); er != nil {
  80. errMsgList = append(errMsgList, er.Error())
  81. err = er
  82. return
  83. }
  84. if result.Code != 200 {
  85. fmt.Println("eta_bridge同步数据节点失败")
  86. errMsgList = append(errMsgList, result.Msg)
  87. return
  88. }
  89. if result.Data.Code != "000000" {
  90. fmt.Println("eta_bridge请求数据中心-市场简况接口同步数据节点失败")
  91. errMsgList = append(errMsgList, result.Data.Mesg)
  92. return
  93. }
  94. err = ViewPointBatchSave(result.Data.Data.Records)
  95. if err != nil {
  96. errMsgList = append(errMsgList, err.Error())
  97. }
  98. return
  99. }
  100. func ViewPointKnowledgeResourceClassifyCheckAndSave(classifyName string, resourceType int) (classifyId int, err error) {
  101. obj := new(models.KnowledgeClassify)
  102. obj, err = obj.GetClassifyByNameTypeAndParentId(classifyName, resourceType, 0)
  103. if err != nil {
  104. if !utils.IsErrNoRow(err) {
  105. return
  106. }
  107. // 创建新的分类
  108. err = nil
  109. obj.ClassifyName = classifyName
  110. obj.ResourceType = resourceType
  111. obj.CreateTime = time.Now()
  112. obj.ModifyTime = time.Now()
  113. obj.Enabled = 1
  114. obj.Level = 1
  115. obj.Sort = 1
  116. err = obj.Create()
  117. if err != nil {
  118. return
  119. }
  120. classifyId = obj.ClassifyId
  121. } else {
  122. classifyId = obj.ClassifyId
  123. }
  124. return
  125. }
  126. func ViewPointBatchSave(records []models.MarketOverviewRecord) (err error) {
  127. if len(records) == 0 {
  128. return
  129. }
  130. var outIds []int
  131. var outMap = make(map[int]models.MarketOverviewRecord)
  132. for _, record := range records {
  133. outIds = append(outIds, record.Id)
  134. outMap[record.Id] = record
  135. }
  136. obj := new(models.KnowledgeResource)
  137. resourceList, err := obj.GetBatchKnowledgeResourceByOutIds(outIds, models.KnowledgeResourceTypeOpinion)
  138. if err != nil {
  139. return
  140. }
  141. for _, resource := range resourceList {
  142. record, ok := outMap[resource.OutId]
  143. if !ok {
  144. continue
  145. }
  146. var updateCols []string
  147. if resource.Title != record.Title {
  148. resource.Title = record.Title
  149. updateCols = append(updateCols, "title")
  150. }
  151. if resource.Content != record.Content {
  152. resource.Content = record.Content
  153. updateCols = append(updateCols, "content")
  154. }
  155. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  156. if er != nil {
  157. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  158. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  159. err = er
  160. continue
  161. }
  162. if !resource.StartTime.Equal(recordTime) {
  163. resource.StartTime = &recordTime
  164. updateCols = append(updateCols, "start_time")
  165. }
  166. if resource.SourceFrom != record.DataSource {
  167. resource.SourceFrom = record.DataSource
  168. updateCols = append(updateCols, "source_from")
  169. }
  170. var isDelete int
  171. if record.IsValidData == 0 {
  172. isDelete = 1
  173. }
  174. if resource.IsDelete != isDelete {
  175. resource.IsDelete = isDelete
  176. updateCols = append(updateCols, "is_delete")
  177. }
  178. if len(updateCols) > 0 {
  179. resource.ModifyTime = time.Now()
  180. updateCols = append(updateCols, "modify_time")
  181. err = resource.Update(updateCols)
  182. if err != nil {
  183. return
  184. }
  185. go func() {
  186. err = EsAddOrEditKnowledgeResource(resource)
  187. if err != nil {
  188. utils.FileLog.Info("EsAddOrEditKnowledgeResource-同步es失败, err:%s", err.Error())
  189. }
  190. }()
  191. }
  192. delete(outMap, resource.OutId)
  193. }
  194. var addList []*models.KnowledgeResource
  195. classifyId, er := ViewPointKnowledgeResourceClassifyCheckAndSave(records[0].Category, models.KnowledgeResourceTypeOpinion)
  196. if er != nil {
  197. err = er
  198. return
  199. }
  200. for _, record := range outMap {
  201. recordTime, er := time.Parse(utils.FormatDateTime, record.DataDate)
  202. if er != nil {
  203. fmt.Println("ViewPointBatchSave-时间转换失败,err :" + er.Error())
  204. utils.FileLog.Info("ViewPointBatchSave-时间转换失败, record:%v ,err:%s", record, er.Error())
  205. continue
  206. }
  207. timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
  208. var isDelete int
  209. if record.IsValidData == 0 {
  210. isDelete = 1
  211. }
  212. addList = append(addList, &models.KnowledgeResource{
  213. OutId: record.Id,
  214. Title: record.Title,
  215. Content: record.Content,
  216. State: models.KnowledgeResourceStateApproved,
  217. AdminRealName: "无",
  218. StartTime: &recordTime,
  219. SourceFrom: record.DataSource,
  220. ClassifyId: classifyId,
  221. IsDelete: isDelete,
  222. ResourceCode: utils.MD5(utils.CHART_PREFIX + "_" + timestamp),
  223. ResourceType: models.KnowledgeResourceTypeOpinion,
  224. CreateTime: time.Now(),
  225. ModifyTime: time.Now(),
  226. })
  227. }
  228. err = obj.BatchCreate(addList)
  229. if err != nil {
  230. return
  231. }
  232. go func() {
  233. e := EsBatchAddOrEditKnowledgeResource(addList)
  234. if e != nil {
  235. utils.FileLog.Info("ViewPointBatchSave-同步es失败, err:%s", e.Error())
  236. }
  237. }()
  238. return
  239. }