data_source.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package data_source
  2. import (
  3. "eta/eta_api/models"
  4. "eta/eta_api/models/data_manage"
  5. dataSourceModel "eta/eta_api/models/data_source"
  6. "eta/eta_api/services/elastic"
  7. "eta/eta_api/utils"
  8. "fmt"
  9. "github.com/rdlucklib/rdluck_tools/paging"
  10. )
  11. // SearchByEs
  12. // @Title ES搜索
  13. // @Description ES搜索
  14. // @Param PageSize query int true "每页数据条数"
  15. // @Param CurrentIndex query int true "当前页页码,从1开始"
  16. // @Param Keyword query string true "搜索关键词"
  17. // @Param Source query int true "数据源"
  18. // @Success 200 {object} response.ExcelListResp
  19. // @router /common/search_by_es [get]
  20. func (c *DataSourceController) SearchByEs() {
  21. br := new(models.BaseResponse).Init()
  22. defer func() {
  23. if br.ErrMsg == "" {
  24. br.IsSendEmail = false
  25. }
  26. c.Data["json"] = br
  27. c.ServeJSON()
  28. }()
  29. sysUser := c.SysUser
  30. if sysUser == nil {
  31. br.Msg = "请登录"
  32. br.ErrMsg = "请登录,SysUser Is Empty"
  33. br.Ret = 408
  34. return
  35. }
  36. pageSize, _ := c.GetInt("PageSize")
  37. currentIndex, _ := c.GetInt("CurrentIndex")
  38. keyword := c.GetString("KeyWord")
  39. if keyword == `` {
  40. keyword = c.GetString("Keyword")
  41. }
  42. source, _ := c.GetInt("Source")
  43. if source <= 0 {
  44. br.Msg = "来源有误"
  45. br.ErrMsg = fmt.Sprintf("数据来源有误, Source: %d", source)
  46. return
  47. }
  48. subSource, _ := c.GetInt("SubSource")
  49. // 以下为兼容各旧接口的额外传参,不为空时修改Resp对应的Key
  50. primaryIdKey := c.GetString("PrimaryIdKey")
  51. indexNameKey := c.GetString("IndexNameKey")
  52. classifyIdKey := c.GetString("ClassifyIdKey")
  53. var total, startSize int
  54. if pageSize <= 0 {
  55. pageSize = utils.PageSize15
  56. }
  57. if currentIndex <= 0 {
  58. currentIndex = 1
  59. }
  60. startSize = paging.StartIndex(currentIndex, pageSize)
  61. // es搜索
  62. t, list, e := elastic.SearchDataSourceIndex(utils.EsDataSourceIndexName, keyword, source, subSource, []int{}, []int{}, []string{}, startSize, pageSize)
  63. if e != nil {
  64. br.Msg = "获取失败"
  65. br.ErrMsg = fmt.Sprintf("ES-搜索数据源列表失败, %v", e)
  66. return
  67. }
  68. total = int(t)
  69. // (短期方案)中国煤炭市场网/煤炭江湖额外补充基础字段
  70. indexModifyTime := make(map[string]string)
  71. indexBaseInfo := make(map[string]*dataSourceModel.BaseFromCoalmineIndexBase)
  72. {
  73. if source == utils.DATA_SOURCE_MTJH {
  74. var updateCodes []string
  75. for _, v := range list {
  76. if v.ModifyTime == "" {
  77. updateCodes = append(updateCodes, v.IndexCode)
  78. }
  79. }
  80. if len(updateCodes) > 0 {
  81. baseIndexes, e := dataSourceModel.GetMtjhBaseInfoFromDataTable(updateCodes)
  82. if e != nil {
  83. br.Msg = "获取失败"
  84. br.ErrMsg = fmt.Sprintf("获取煤炭江湖指标基础信息失败, %v", e)
  85. return
  86. }
  87. for _, v := range baseIndexes {
  88. indexModifyTime[v.IndexCode] = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime)
  89. }
  90. }
  91. }
  92. if source == utils.DATA_SOURCE_COAL {
  93. var updateCodes []string
  94. for _, v := range list {
  95. if v.Unit == "" || v.Frequency == "" || v.ModifyTime == "" {
  96. updateCodes = append(updateCodes, v.IndexCode)
  97. }
  98. }
  99. if len(updateCodes) > 0 {
  100. baseIndexes, e := dataSourceModel.GetCoalmineBaseInfoFromDataTable(updateCodes)
  101. if e != nil {
  102. br.Msg = "获取失败"
  103. br.ErrMsg = fmt.Sprintf("获取中国煤炭市场网指标基础信息失败, %v", e)
  104. return
  105. }
  106. for _, v := range baseIndexes {
  107. indexBaseInfo[v.IndexCode] = v
  108. }
  109. }
  110. }
  111. }
  112. listMap := make([]map[string]interface{}, 0)
  113. for _, v := range list {
  114. // (短期方案)由于start_date、end_date和latest_value字段不全的历史遗留问题,这里查出来并补充进ES里面去
  115. var updateEs bool
  116. if v.StartDate == "" || v.EndDate == "" || v.LatestValue == "" || v.LatestValue == "0" {
  117. minMax, e := dataSourceModel.GetBaseIndexDataMinMax(v.Source, v.SubSource, v.IndexCode)
  118. if e != nil && e.Error() != utils.ErrNoRow() {
  119. br.Msg = "获取失败"
  120. br.ErrMsg = fmt.Sprintf("获取指标开始结束时间失败, %v", e)
  121. return
  122. }
  123. if minMax != nil {
  124. v.StartDate = minMax.MinDate
  125. v.EndDate = minMax.MaxDate
  126. v.LatestValue = minMax.LatestValue
  127. updateEs = true
  128. }
  129. }
  130. // 煤炭江湖缺ModifyTime
  131. if source == utils.DATA_SOURCE_MTJH && v.ModifyTime == "" && indexModifyTime[v.IndexCode] != "" {
  132. v.ModifyTime = indexModifyTime[v.IndexCode]
  133. updateEs = true
  134. }
  135. // 中国煤炭市场网缺Unit, Frequency, ModifyTime
  136. if source == utils.DATA_SOURCE_COAL && (v.Unit == "" || v.Frequency == "" || v.ModifyTime == "") {
  137. if indexBaseInfo[v.IndexCode] != nil {
  138. v.Unit = indexBaseInfo[v.IndexCode].Unit
  139. v.Frequency = indexBaseInfo[v.IndexCode].Frequency
  140. v.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, indexBaseInfo[v.IndexCode].ModifyTime)
  141. updateEs = true
  142. }
  143. }
  144. // 写入ES更新队列
  145. if updateEs {
  146. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, v); e != nil {
  147. br.Msg = "获取失败"
  148. br.ErrMsg = fmt.Sprintf("写入ES更新队列失败, Source: %d, IndexCode: %s, err: %v", v.Source, v.IndexCode, e)
  149. return
  150. }
  151. }
  152. // 转换成map返回
  153. listMap = append(listMap, v.ToMap(primaryIdKey, indexNameKey, classifyIdKey))
  154. }
  155. // 美国农业部(多一个ParentClassifyId字段)
  156. if source == utils.DATA_SOURCE_USDA_FAS {
  157. // 父级分类ID
  158. classifyIds := make([]int, 0)
  159. for _, v := range list {
  160. classifyIds = append(classifyIds, v.ClassifyId)
  161. }
  162. classifyList, e := data_manage.GetBaseFromUsdaFasClassifyByIds(classifyIds)
  163. if e != nil {
  164. br.Msg = "获取失败"
  165. br.ErrMsg = fmt.Sprintf("获取美国农业部分类失败, %v", e)
  166. return
  167. }
  168. classifyMap := make(map[int]int)
  169. for _, v := range classifyList {
  170. classifyMap[v.ClassifyId] = v.ParentId
  171. }
  172. for _, v := range listMap {
  173. id, ok := v["ClassifyId"].(int)
  174. if !ok {
  175. v["ParentClassifyId"] = 0
  176. continue
  177. }
  178. v["ParentClassifyId"] = classifyMap[id]
  179. }
  180. }
  181. // 煤炭江湖(多一个Area字段)
  182. if source == utils.DATA_SOURCE_MTJH {
  183. cond := ``
  184. pars := make([]interface{}, 0)
  185. indexes, e := data_manage.GetMtjhItemsByCondition(cond, pars)
  186. if e != nil {
  187. br.Msg = "获取失败"
  188. br.ErrMsg = fmt.Sprintf("获取煤炭江湖指标失败, %v", e)
  189. return
  190. }
  191. indexArea := make(map[string]string)
  192. for _, v := range indexes {
  193. indexArea[v.IndexCode] = v.Area
  194. }
  195. for _, v := range listMap {
  196. code, ok := v["IndexCode"].(string)
  197. if !ok {
  198. v["Area"] = ""
  199. continue
  200. }
  201. v["Area"] = indexArea[code]
  202. }
  203. }
  204. page := paging.GetPaging(currentIndex, pageSize, total)
  205. resp := dataSourceModel.SearchDataSourceResp{
  206. Paging: page,
  207. List: listMap,
  208. }
  209. br.Data = resp
  210. br.Ret = 200
  211. br.Success = true
  212. br.Msg = "获取成功"
  213. }