data_source.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package data_source
  2. import (
  3. "eta/eta_api/models"
  4. "eta/eta_api/models/data_manage"
  5. dataSourceModel "eta/eta_api/models/data_source"
  6. "eta/eta_api/services/elastic"
  7. "eta/eta_api/utils"
  8. "fmt"
  9. "github.com/rdlucklib/rdluck_tools/paging"
  10. "strconv"
  11. )
  12. // SearchByEs
  13. // @Title ES搜索
  14. // @Description ES搜索
  15. // @Param PageSize query int true "每页数据条数"
  16. // @Param CurrentIndex query int true "当前页页码,从1开始"
  17. // @Param Keyword query string true "搜索关键词"
  18. // @Param Source query int true "数据源"
  19. // @Success 200 {object} response.ExcelListResp
  20. // @router /common/search_by_es [get]
  21. func (c *DataSourceController) SearchByEs() {
  22. br := new(models.BaseResponse).Init()
  23. defer func() {
  24. if br.ErrMsg == "" {
  25. br.IsSendEmail = false
  26. }
  27. c.Data["json"] = br
  28. c.ServeJSON()
  29. }()
  30. sysUser := c.SysUser
  31. if sysUser == nil {
  32. br.Msg = "请登录"
  33. br.ErrMsg = "请登录,SysUser Is Empty"
  34. br.Ret = 408
  35. return
  36. }
  37. pageSize, _ := c.GetInt("PageSize")
  38. currentIndex, _ := c.GetInt("CurrentIndex")
  39. keyword := c.GetString("KeyWord")
  40. if keyword == `` {
  41. keyword = c.GetString("Keyword")
  42. }
  43. source, _ := c.GetInt("Source")
  44. if source <= 0 {
  45. br.Msg = "来源有误"
  46. br.ErrMsg = fmt.Sprintf("数据来源有误, Source: %d", source)
  47. return
  48. }
  49. subSource, _ := c.GetInt("SubSource")
  50. // 以下为兼容各旧接口的额外传参,不为空时修改Resp对应的Key
  51. primaryIdKey := c.GetString("PrimaryIdKey")
  52. indexNameKey := c.GetString("IndexNameKey")
  53. classifyIdKey := c.GetString("ClassifyIdKey")
  54. var total, startSize int
  55. if pageSize <= 0 {
  56. pageSize = utils.PageSize15
  57. }
  58. if currentIndex <= 0 {
  59. currentIndex = 1
  60. }
  61. startSize = paging.StartIndex(currentIndex, pageSize)
  62. // es搜索
  63. t, list, e := elastic.SearchDataSourceIndex(utils.EsDataSourceIndexName, keyword, source, subSource, []int{}, []int{}, []string{}, startSize, pageSize)
  64. if e != nil {
  65. br.Msg = "获取失败"
  66. br.ErrMsg = fmt.Sprintf("ES-搜索数据源列表失败, %v", e)
  67. return
  68. }
  69. total = int(t)
  70. // (短期方案)中国煤炭市场网/煤炭江湖额外补充基础字段
  71. indexModifyTime := make(map[string]string)
  72. indexBaseInfo := make(map[string]*dataSourceModel.BaseFromCoalmineIndexBase)
  73. {
  74. if source == utils.DATA_SOURCE_MTJH {
  75. var updateCodes []string
  76. for _, v := range list {
  77. if v.ModifyTime == "" {
  78. updateCodes = append(updateCodes, v.IndexCode)
  79. }
  80. }
  81. if len(updateCodes) > 0 {
  82. baseIndexes, e := dataSourceModel.GetMtjhBaseInfoFromDataTable(updateCodes)
  83. if e != nil {
  84. br.Msg = "获取失败"
  85. br.ErrMsg = fmt.Sprintf("获取煤炭江湖指标基础信息失败, %v", e)
  86. return
  87. }
  88. for _, v := range baseIndexes {
  89. indexModifyTime[v.IndexCode] = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime)
  90. }
  91. }
  92. }
  93. if source == utils.DATA_SOURCE_COAL {
  94. var updateCodes []string
  95. for _, v := range list {
  96. if v.Unit == "" || v.Frequency == "" || v.ModifyTime == "" {
  97. updateCodes = append(updateCodes, v.IndexCode)
  98. }
  99. }
  100. if len(updateCodes) > 0 {
  101. baseIndexes, e := dataSourceModel.GetCoalmineBaseInfoFromDataTable(updateCodes)
  102. if e != nil {
  103. br.Msg = "获取失败"
  104. br.ErrMsg = fmt.Sprintf("获取中国煤炭市场网指标基础信息失败, %v", e)
  105. return
  106. }
  107. for _, v := range baseIndexes {
  108. indexBaseInfo[v.IndexCode] = v
  109. }
  110. }
  111. }
  112. }
  113. listMap := make([]map[string]interface{}, 0)
  114. for _, v := range list {
  115. // (短期方案)由于start_date、end_date和latest_value字段不全的历史遗留问题,这里查出来并补充进ES里面去
  116. var updateEs bool
  117. if v.StartDate == "" || v.EndDate == "" || v.LatestValue == "" || v.LatestValue == "0" {
  118. minMax, e := dataSourceModel.GetBaseIndexDataMinMax(v.Source, v.SubSource, v.IndexCode)
  119. if e != nil && e.Error() != utils.ErrNoRow() {
  120. br.Msg = "获取失败"
  121. br.ErrMsg = fmt.Sprintf("获取指标开始结束时间失败, %v", e)
  122. return
  123. }
  124. if minMax != nil {
  125. v.StartDate = minMax.MinDate
  126. v.EndDate = minMax.MaxDate
  127. v.LatestValue = minMax.LatestValue
  128. updateEs = true
  129. }
  130. }
  131. // 煤炭江湖缺ModifyTime
  132. if source == utils.DATA_SOURCE_MTJH && v.ModifyTime == "" && indexModifyTime[v.IndexCode] != "" {
  133. v.ModifyTime = indexModifyTime[v.IndexCode]
  134. updateEs = true
  135. }
  136. // 中国煤炭市场网缺Unit, Frequency, ModifyTime
  137. if source == utils.DATA_SOURCE_COAL && (v.Unit == "" || v.Frequency == "" || v.ModifyTime == "") {
  138. if indexBaseInfo[v.IndexCode] != nil {
  139. v.Unit = indexBaseInfo[v.IndexCode].Unit
  140. v.Frequency = indexBaseInfo[v.IndexCode].Frequency
  141. v.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, indexBaseInfo[v.IndexCode].ModifyTime)
  142. updateEs = true
  143. }
  144. }
  145. // 写入ES更新队列
  146. if updateEs {
  147. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, v); e != nil {
  148. br.Msg = "获取失败"
  149. br.ErrMsg = fmt.Sprintf("写入ES更新队列失败, Source: %d, IndexCode: %s, err: %v", v.Source, v.IndexCode, e)
  150. return
  151. }
  152. }
  153. // 转换成map返回
  154. listMap = append(listMap, v.ToMap(primaryIdKey, indexNameKey, classifyIdKey))
  155. }
  156. // 美国农业部(多一个ParentClassifyId字段)
  157. if source == utils.DATA_SOURCE_USDA_FAS {
  158. // 父级分类ID
  159. classifyIds := make([]int, 0)
  160. for _, v := range list {
  161. classifyIds = append(classifyIds, v.ClassifyId)
  162. }
  163. classifyList, e := data_manage.GetBaseFromUsdaFasClassifyByIds(classifyIds)
  164. if e != nil {
  165. br.Msg = "获取失败"
  166. br.ErrMsg = fmt.Sprintf("获取美国农业部分类失败, %v", e)
  167. return
  168. }
  169. classifyMap := make(map[int]int)
  170. for _, v := range classifyList {
  171. classifyMap[v.ClassifyId] = v.ParentId
  172. }
  173. for _, v := range listMap {
  174. id, ok := v["ClassifyId"].(int)
  175. if !ok {
  176. v["ParentClassifyId"] = 0
  177. continue
  178. }
  179. v["ParentClassifyId"] = classifyMap[id]
  180. }
  181. }
  182. // 煤炭江湖(多一个Area字段)
  183. if source == utils.DATA_SOURCE_MTJH {
  184. cond := ``
  185. pars := make([]interface{}, 0)
  186. indexes, e := data_manage.GetMtjhItemsByCondition(cond, pars)
  187. if e != nil {
  188. br.Msg = "获取失败"
  189. br.ErrMsg = fmt.Sprintf("获取煤炭江湖指标失败, %v", e)
  190. return
  191. }
  192. indexArea := make(map[string]string)
  193. for _, v := range indexes {
  194. indexArea[v.IndexCode] = v.Area
  195. }
  196. for _, v := range listMap {
  197. code, ok := v["IndexCode"].(string)
  198. if !ok {
  199. v["Area"] = ""
  200. continue
  201. }
  202. v["Area"] = indexArea[code]
  203. }
  204. }
  205. // 分类的唯一编码(前端定位用)
  206. if classifyIdKey == "" {
  207. classifyIdKey = "ClassifyId"
  208. }
  209. idUniqueCodeArr := []int{
  210. utils.DATA_SOURCE_SCI_HQ, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS, utils.DATA_SOURCE_EIA_STEO,
  211. }
  212. if utils.InArrayByInt(idUniqueCodeArr, source) {
  213. for _, v := range listMap {
  214. classifyId, ok := v[classifyIdKey].(int)
  215. if !ok {
  216. v["ClassifyUniqueCode"] = ""
  217. continue
  218. }
  219. v["ClassifyUniqueCode"] = strconv.Itoa(classifyId)
  220. }
  221. }
  222. //if source == utils.DATA_SOURCE_MANUAL {
  223. // // 手工指标
  224. // var wxUserId int64
  225. // wxUserId = int64(sysUser.AdminId)
  226. // if sysUser.RoleTypeCode == utils.ROLE_TYPE_CODE_ADMIN {
  227. // wxUserId = 0
  228. // }
  229. // classifies, err := models.GetEdbdataClassify(wxUserId)
  230. // if err != nil {
  231. // br.Msg = "获取失败"
  232. // br.ErrMsg = fmt.Sprintf("获取手工指标分类失败, %v", e)
  233. // return
  234. // }
  235. // unicodeMap := make(map[int]string)
  236. // for _, v := range classifies {
  237. // unicodeMap[v.ClassifyId] = v.UniqueCode
  238. // }
  239. // for _, v := range listMap {
  240. // classifyId, ok := v["ClassifyId"].(int)
  241. // if !ok {
  242. // continue
  243. // }
  244. // v["ClassifyUniqueCode"] = unicodeMap[classifyId]
  245. // }
  246. //}
  247. page := paging.GetPaging(currentIndex, pageSize, total)
  248. resp := dataSourceModel.SearchDataSourceResp{
  249. Paging: page,
  250. List: listMap,
  251. }
  252. br.Data = resp
  253. br.Ret = 200
  254. br.Success = true
  255. br.Msg = "获取成功"
  256. }