data_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package data
  2. import (
  3. "context"
  4. "eta/eta_task/models"
  5. aiPredictModel "eta/eta_task/models/ai_predict_model"
  6. dataSourceModel "eta/eta_task/models/data_source"
  7. "eta/eta_task/services/alarm_msg"
  8. "eta/eta_task/services/elastic"
  9. "eta/eta_task/utils"
  10. "fmt"
  11. )
  12. // SyncDataSourceEs 同步数据源ES
  13. func SyncDataSourceEs(cont context.Context) (err error) {
  14. utils.FileLog.Info("SyncDataSourceEs-start")
  15. defer func() {
  16. if err != nil {
  17. tips := fmt.Sprintf("SyncDataSourceEs-同步数据源ES失败, %v", err)
  18. utils.FileLog.Info(tips)
  19. go alarm_msg.SendAlarmMsg(tips, 3)
  20. }
  21. utils.FileLog.Info("SyncDataSourceEs-end")
  22. }()
  23. confOb := new(models.BusinessConf)
  24. conf, e := confOb.GetItemByConfKey(models.BusinessConfSyncDataEsDaily)
  25. if e != nil {
  26. if e.Error() == utils.ErrNoRow() {
  27. utils.FileLog.Info("SyncDataSourceEs-无刷新配置,不同步")
  28. return
  29. }
  30. err = fmt.Errorf("获取刷新配置失败, %v", e)
  31. return
  32. }
  33. if conf.ConfVal != "true" {
  34. return
  35. }
  36. var cond string
  37. var pars []interface{}
  38. // 睿咨得
  39. rzdOb := new(dataSourceModel.BaseFromRzdIndex)
  40. {
  41. list, e := rzdOb.GetItemsByCondition(cond, pars, []string{}, "")
  42. if e != nil {
  43. err = fmt.Errorf("获取睿咨得失败, %v", e)
  44. return
  45. }
  46. for _, v := range list {
  47. indexItem := v.Format2SearchDataSource(v)
  48. fmt.Printf("写入中:睿咨得-%d\n", indexItem.PrimaryId)
  49. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  50. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  51. err = fmt.Errorf("睿咨得-写入es失败, %v", e)
  52. return
  53. }
  54. }
  55. }
  56. // 泛糖科技
  57. hisugarOb := new(dataSourceModel.BaseFromHisugarIndex)
  58. {
  59. list, e := hisugarOb.GetItemsByCondition(cond, pars, []string{}, "")
  60. if e != nil {
  61. err = fmt.Errorf("获取泛糖科技失败, %v", e)
  62. return
  63. }
  64. for _, v := range list {
  65. indexItem := v.Format2SearchDataSource(v)
  66. fmt.Printf("写入中:泛糖科技-%d\n", indexItem.PrimaryId)
  67. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  68. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  69. err = fmt.Errorf("泛糖科技-写入es失败, %v", e)
  70. return
  71. }
  72. }
  73. }
  74. // 粮油商务网
  75. lyOb := new(dataSourceModel.BaseFromLyIndex)
  76. {
  77. list, e := lyOb.GetItemsByCondition(cond, pars, []string{}, "")
  78. if e != nil {
  79. err = fmt.Errorf("获取粮油商务网失败, %v", e)
  80. return
  81. }
  82. for _, v := range list {
  83. indexItem := v.Format2SearchDataSource(v)
  84. fmt.Printf("写入中:粮油商务网-%d\n", indexItem.PrimaryId)
  85. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  86. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  87. err = fmt.Errorf("粮油商务网-写入es失败, %v", e)
  88. return
  89. }
  90. }
  91. }
  92. // 卓创红期
  93. sciHqOb := new(dataSourceModel.BaseFromSciHqIndex)
  94. {
  95. list, e := sciHqOb.GetItemsByCondition(cond, pars, []string{}, "")
  96. if e != nil {
  97. err = fmt.Errorf("获取卓创红期失败, %v", e)
  98. return
  99. }
  100. for _, v := range list {
  101. indexItem := v.Format2SearchDataSource(v)
  102. fmt.Printf("写入中:卓创红期-%d\n", indexItem.PrimaryId)
  103. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  104. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  105. err = fmt.Errorf("卓创红期-写入es失败, %v", e)
  106. return
  107. }
  108. }
  109. }
  110. // 同花顺高频
  111. thsHfOb := new(dataSourceModel.BaseFromThsHfIndex)
  112. {
  113. list, e := thsHfOb.GetItemsByCondition(cond, pars, []string{}, "")
  114. if e != nil {
  115. err = fmt.Errorf("获取同花顺高频失败, %v", e)
  116. return
  117. }
  118. for _, v := range list {
  119. indexItem := v.Format2SearchDataSource(v)
  120. fmt.Printf("写入中:同花顺高频-%d\n", indexItem.PrimaryId)
  121. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  122. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  123. err = fmt.Errorf("同花顺高频-写入es失败, %v", e)
  124. return
  125. }
  126. }
  127. }
  128. // 隆众资讯
  129. oilchemOb := new(dataSourceModel.BaseFromOilchemIndex)
  130. {
  131. list, e := oilchemOb.GetItemsByCondition(cond, pars, []string{}, "")
  132. if e != nil {
  133. err = fmt.Errorf("获取隆众资讯失败, %v", e)
  134. return
  135. }
  136. for _, v := range list {
  137. indexItem := v.Format2SearchDataSource(v)
  138. fmt.Printf("写入中:隆众资讯-%d\n", indexItem.PrimaryId)
  139. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  140. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  141. err = fmt.Errorf("隆众资讯-写入es失败, %v", e)
  142. return
  143. }
  144. }
  145. }
  146. // CCF化纤信息
  147. ccfOb := new(dataSourceModel.BaseFromCcfIndex)
  148. {
  149. list, e := ccfOb.GetItemsByCondition(cond, pars, []string{}, "")
  150. if e != nil {
  151. err = fmt.Errorf("获取CCF化纤信息失败, %v", e)
  152. return
  153. }
  154. for _, v := range list {
  155. indexItem := v.Format2SearchDataSource(v)
  156. fmt.Printf("写入中:CCF化纤信息-%d\n", indexItem.PrimaryId)
  157. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  158. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  159. err = fmt.Errorf("CCF化纤信息-写入es失败, %v", e)
  160. return
  161. }
  162. }
  163. }
  164. // 上海钢联
  165. mysteelOb := new(dataSourceModel.BaseFromMysteelChemicalIndex)
  166. {
  167. list, e := mysteelOb.GetItemsByCondition(cond, pars, []string{}, "")
  168. if e != nil {
  169. err = fmt.Errorf("获取上海钢联失败, %v", e)
  170. return
  171. }
  172. for _, v := range list {
  173. indexItem := v.Format2SearchDataSource(v)
  174. fmt.Printf("写入中:上海钢联-%d\n", indexItem.PrimaryId)
  175. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  176. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  177. err = fmt.Errorf("上海钢联-写入es失败, %v", e)
  178. return
  179. }
  180. }
  181. }
  182. // SMM、有色原始数据库
  183. smmOb := new(dataSourceModel.BaseFromSmmIndex)
  184. {
  185. list, e := smmOb.GetItemsByCondition(cond, pars, []string{}, "")
  186. if e != nil {
  187. err = fmt.Errorf("获取有色原始数据库失败, %v", e)
  188. return
  189. }
  190. for _, v := range list {
  191. indexItem := v.Format2SearchDataSource(v)
  192. fmt.Printf("写入中:有色原始数据库-%d\n", indexItem.PrimaryId)
  193. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  194. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  195. err = fmt.Errorf("有色原始数据库-写入es失败, %v", e)
  196. return
  197. }
  198. }
  199. }
  200. // 百川盈孚
  201. baiinfoOb := new(dataSourceModel.BaseFromBaiinfoIndex)
  202. {
  203. list, e := baiinfoOb.GetItemsByCondition(cond, pars, []string{}, "")
  204. if e != nil {
  205. err = fmt.Errorf("获取百川盈孚失败, %v", e)
  206. return
  207. }
  208. for _, v := range list {
  209. indexItem := v.Format2SearchDataSource(v)
  210. fmt.Printf("写入中:百川盈孚-%d\n", indexItem.PrimaryId)
  211. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  212. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  213. err = fmt.Errorf("百川盈孚-写入es失败, %v", e)
  214. return
  215. }
  216. }
  217. }
  218. // 红桃3
  219. sciOb := new(dataSourceModel.BaseFromSciIndex)
  220. {
  221. list, e := sciOb.GetItemsByCondition(cond, pars, []string{}, "")
  222. if e != nil {
  223. err = fmt.Errorf("获取红桃3失败, %v", e)
  224. return
  225. }
  226. for _, v := range list {
  227. indexItem := v.Format2SearchDataSource(v)
  228. fmt.Printf("写入中:红桃3-%d\n", indexItem.PrimaryId)
  229. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  230. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  231. err = fmt.Errorf("红桃3-写入es失败, %v", e)
  232. return
  233. }
  234. }
  235. }
  236. // 中国煤炭市场网
  237. coalmineOb := new(dataSourceModel.BaseFromCoalmineMapping)
  238. {
  239. list, e := coalmineOb.GetItemsByCondition(cond, pars, []string{}, "")
  240. if e != nil {
  241. err = fmt.Errorf("获取中国煤炭市场网失败, %v", e)
  242. return
  243. }
  244. for _, v := range list {
  245. indexItem := v.Format2SearchDataSource(v)
  246. fmt.Printf("写入中:中国煤炭市场网-%d\n", indexItem.PrimaryId)
  247. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  248. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  249. err = fmt.Errorf("中国煤炭市场网-写入es失败, %v", e)
  250. return
  251. }
  252. }
  253. }
  254. // EIA STEO报告
  255. eiaOb := new(dataSourceModel.BaseFromEiaSteoIndex)
  256. {
  257. list, e := eiaOb.GetItemsByCondition(cond, pars, []string{}, "")
  258. if e != nil {
  259. err = fmt.Errorf("获取EIA STEO报告失败, %v", e)
  260. return
  261. }
  262. for _, v := range list {
  263. indexItem := v.Format2SearchDataSource(v)
  264. fmt.Printf("写入中:EIA STEO报告-%d\n", indexItem.PrimaryId)
  265. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  266. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  267. err = fmt.Errorf("EIA STEO报告-写入es失败, %v", e)
  268. return
  269. }
  270. }
  271. }
  272. // ICPI消费价格指数
  273. icpiOb := new(dataSourceModel.BaseFromIcpiIndex)
  274. {
  275. list, e := icpiOb.GetItemsByCondition(cond, pars, []string{}, "")
  276. if e != nil {
  277. err = fmt.Errorf("获取ICPI消费价格指数失败, %v", e)
  278. return
  279. }
  280. for _, v := range list {
  281. indexItem := v.Format2SearchDataSource(v)
  282. fmt.Printf("写入中:ICPI消费价格指数-%d\n", indexItem.PrimaryId)
  283. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  284. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  285. err = fmt.Errorf("ICPI消费价格指数-写入es失败, %v", e)
  286. return
  287. }
  288. }
  289. }
  290. // 涌益咨询
  291. yongyiOb := new(dataSourceModel.BaseFromYongyiIndex)
  292. {
  293. list, e := yongyiOb.GetItemsByCondition(cond, pars, []string{}, "")
  294. if e != nil {
  295. err = fmt.Errorf("获取涌益咨询失败, %v", e)
  296. return
  297. }
  298. for _, v := range list {
  299. indexItem := v.Format2SearchDataSource(v)
  300. fmt.Printf("写入中:涌益咨询-%d\n", indexItem.PrimaryId)
  301. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  302. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  303. err = fmt.Errorf("涌益咨询-写入es失败, %v", e)
  304. return
  305. }
  306. }
  307. }
  308. // 汾渭数据
  309. fenweiOb := new(dataSourceModel.BaseFromFenweiIndex)
  310. {
  311. list, e := fenweiOb.GetItemsByCondition(cond, pars, []string{}, "")
  312. if e != nil {
  313. err = fmt.Errorf("获取汾渭数据失败, %v", e)
  314. return
  315. }
  316. for _, v := range list {
  317. indexItem := v.Format2SearchDataSource(v)
  318. fmt.Printf("写入中:汾渭数据-%d\n", indexItem.PrimaryId)
  319. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  320. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  321. err = fmt.Errorf("汾渭数据-写入es失败, %v", e)
  322. return
  323. }
  324. }
  325. }
  326. // 卓创数据
  327. sci99Ob := new(dataSourceModel.BaseFromSci99Index)
  328. {
  329. list, e := sci99Ob.GetItemsByCondition(cond, pars, []string{}, "")
  330. if e != nil {
  331. err = fmt.Errorf("获取卓创数据失败, %v", e)
  332. return
  333. }
  334. for _, v := range list {
  335. indexItem := v.Format2SearchDataSource(v)
  336. fmt.Printf("写入中:卓创数据-%d\n", indexItem.PrimaryId)
  337. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  338. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  339. err = fmt.Errorf("卓创数据-写入es失败, %v", e)
  340. return
  341. }
  342. }
  343. }
  344. // 钢联原始指标库
  345. glOb := new(dataSourceModel.BaseFromGlIndex)
  346. {
  347. list, e := glOb.GetItemsByCondition(cond, pars, []string{}, "")
  348. if e != nil {
  349. err = fmt.Errorf("获取钢联原始指标库失败, %v", e)
  350. return
  351. }
  352. for _, v := range list {
  353. indexItem := v.Format2SearchDataSource(v)
  354. fmt.Printf("写入中:钢联原始指标库-%d\n", indexItem.PrimaryId)
  355. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  356. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  357. err = fmt.Errorf("钢联原始指标库-写入es失败, %v", e)
  358. return
  359. }
  360. }
  361. }
  362. // 手工指标录入
  363. manualOb := new(dataSourceModel.BaseFromManualEdb)
  364. {
  365. list, e := manualOb.GetItemsByCondition(cond, pars, []string{}, "")
  366. if e != nil {
  367. err = fmt.Errorf("获取手工指标失败, %v", e)
  368. return
  369. }
  370. for _, v := range list {
  371. indexItem := v.Format2SearchDataSource(v)
  372. fmt.Printf("写入中:手工指标-%s\n", indexItem.IndexCode)
  373. docId := fmt.Sprintf("%d-%s", indexItem.Source, indexItem.IndexCode)
  374. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  375. err = fmt.Errorf("手工指标-写入es失败, %v", e)
  376. return
  377. }
  378. }
  379. }
  380. // Bloomberg
  381. bloombergOb := new(dataSourceModel.BaseFromBloombergIndex)
  382. {
  383. list, e := bloombergOb.GetItemsByCondition(cond, pars, []string{}, "")
  384. if e != nil {
  385. err = fmt.Errorf("获取Bloomberg失败, %v", e)
  386. return
  387. }
  388. for _, v := range list {
  389. indexItem := v.Format2SearchDataSource(v)
  390. fmt.Printf("写入中:Bloomberg-%d\n", indexItem.PrimaryId)
  391. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  392. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  393. err = fmt.Errorf("Bloomberg-写入es失败, %v", e)
  394. return
  395. }
  396. }
  397. }
  398. // 煤炭江湖
  399. mtjhOb := new(dataSourceModel.BaseFromMtjhMapping)
  400. {
  401. list, e := mtjhOb.GetItemsByCondition(cond, pars, []string{}, "")
  402. if e != nil {
  403. err = fmt.Errorf("获取煤炭江湖失败, %v", e)
  404. return
  405. }
  406. for _, v := range list {
  407. indexItem := v.Format2SearchDataSource(v)
  408. fmt.Printf("写入中:煤炭江湖-%d\n", indexItem.PrimaryId)
  409. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  410. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  411. err = fmt.Errorf("煤炭江湖-写入es失败, %v", e)
  412. return
  413. }
  414. }
  415. }
  416. // AI预测模型
  417. aiPredictOb := new(aiPredictModel.AiPredictModelIndex)
  418. {
  419. list, e := aiPredictOb.GetItemsByCondition(cond, pars, []string{}, "")
  420. if e != nil {
  421. err = fmt.Errorf("获取AI预测模型失败, %v", e)
  422. return
  423. }
  424. for _, v := range list {
  425. indexItem := new(dataSourceModel.SearchDataSource)
  426. indexItem.PrimaryId = v.AiPredictModelIndexId
  427. indexItem.IndexName = v.IndexName
  428. indexItem.IndexCode = v.IndexCode
  429. indexItem.ClassifyId = v.ClassifyId
  430. indexItem.Source = utils.DATA_SOURCE_AI_PREDICT_MODEL
  431. indexItem.SourceName = "AI预测模型"
  432. indexItem.CreateTime = utils.TimeTransferString(utils.FormatDateTime, v.CreateTime)
  433. indexItem.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime)
  434. fmt.Printf("写入中:AI预测模型-%d\n", indexItem.PrimaryId)
  435. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  436. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  437. err = fmt.Errorf("AI预测模型-写入es失败, %v", e)
  438. return
  439. }
  440. }
  441. }
  442. // 美国农业部
  443. usdaOb := new(dataSourceModel.BaseFromUsdaFasIndex)
  444. {
  445. list, e := usdaOb.GetItemsByCondition(cond, pars, []string{}, "")
  446. if e != nil {
  447. err = fmt.Errorf("获取美国农业部失败, %v", e)
  448. return
  449. }
  450. for _, v := range list {
  451. indexItem := v.Format2SearchDataSource(v)
  452. fmt.Printf("写入中:美国农业部-%d\n", indexItem.PrimaryId)
  453. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  454. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  455. err = fmt.Errorf("美国农业部-写入es失败, %v", e)
  456. return
  457. }
  458. }
  459. }
  460. // 自有数据
  461. businessOb := new(dataSourceModel.BaseFromBusinessIndex)
  462. {
  463. list, e := businessOb.GetItemsByCondition(cond, pars, []string{}, "")
  464. if e != nil {
  465. err = fmt.Errorf("获取自有数据失败, %v", e)
  466. return
  467. }
  468. for _, v := range list {
  469. indexItem := v.Format2SearchDataSource(v)
  470. fmt.Printf("写入中:自有数据-%d\n", indexItem.PrimaryId)
  471. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  472. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  473. err = fmt.Errorf("自有数据-写入es失败, %v", e)
  474. return
  475. }
  476. }
  477. }
  478. return
  479. }