data_source.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. package data
  2. import (
  3. "context"
  4. "eta/eta_task/models"
  5. aiPredictModel "eta/eta_task/models/ai_predict_model"
  6. dataSourceModel "eta/eta_task/models/data_source"
  7. "eta/eta_task/services/alarm_msg"
  8. "eta/eta_task/services/elastic"
  9. "eta/eta_task/utils"
  10. "fmt"
  11. )
  12. // SyncDataSourceEs 同步数据源ES
  13. func SyncDataSourceEs(cont context.Context) (err error) {
  14. utils.FileLog.Info("SyncDataSourceEs-start")
  15. defer func() {
  16. if err != nil {
  17. tips := fmt.Sprintf("SyncDataSourceEs-同步数据源ES失败, %v", err)
  18. utils.FileLog.Info(tips)
  19. go alarm_msg.SendAlarmMsg(tips, 3)
  20. }
  21. utils.FileLog.Info("SyncDataSourceEs-end")
  22. }()
  23. confOb := new(models.BusinessConf)
  24. conf, e := confOb.GetItemByConfKey(models.BusinessConfSyncDataEsDaily)
  25. if e != nil {
  26. if utils.IsErrNoRow(e) {
  27. utils.FileLog.Info("SyncDataSourceEs-无刷新配置,不同步")
  28. return
  29. }
  30. err = fmt.Errorf("获取刷新配置失败, %v", e)
  31. return
  32. }
  33. if conf != nil && conf.Id <= 0 {
  34. utils.FileLog.Info("SyncDataSourceEs-无刷新配置,不同步")
  35. return
  36. }
  37. if conf.ConfVal != "true" {
  38. return
  39. }
  40. var cond string
  41. var pars []interface{}
  42. // 睿咨得
  43. rzdOb := new(dataSourceModel.BaseFromRzdIndex)
  44. {
  45. list, e := rzdOb.GetItemsByCondition(cond, pars, []string{}, "")
  46. if e != nil {
  47. err = fmt.Errorf("获取睿咨得失败, %v", e)
  48. return
  49. }
  50. for _, v := range list {
  51. indexItem := v.Format2SearchDataSource(v)
  52. fmt.Printf("写入中:睿咨得-%d\n", indexItem.PrimaryId)
  53. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  54. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  55. err = fmt.Errorf("睿咨得-写入es失败, %v", e)
  56. return
  57. }
  58. }
  59. }
  60. // 泛糖科技
  61. hisugarOb := new(dataSourceModel.BaseFromHisugarIndex)
  62. {
  63. list, e := hisugarOb.GetItemsByCondition(cond, pars, []string{}, "")
  64. if e != nil {
  65. err = fmt.Errorf("获取泛糖科技失败, %v", e)
  66. return
  67. }
  68. for _, v := range list {
  69. indexItem := v.Format2SearchDataSource(v)
  70. fmt.Printf("写入中:泛糖科技-%d\n", indexItem.PrimaryId)
  71. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  72. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  73. err = fmt.Errorf("泛糖科技-写入es失败, %v", e)
  74. return
  75. }
  76. }
  77. }
  78. // 粮油商务网
  79. lyOb := new(dataSourceModel.BaseFromLyIndex)
  80. {
  81. list, e := lyOb.GetItemsByCondition(cond, pars, []string{}, "")
  82. if e != nil {
  83. err = fmt.Errorf("获取粮油商务网失败, %v", e)
  84. return
  85. }
  86. for _, v := range list {
  87. indexItem := v.Format2SearchDataSource(v)
  88. fmt.Printf("写入中:粮油商务网-%d\n", indexItem.PrimaryId)
  89. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  90. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  91. err = fmt.Errorf("粮油商务网-写入es失败, %v", e)
  92. return
  93. }
  94. }
  95. }
  96. // 卓创红期
  97. sciHqOb := new(dataSourceModel.BaseFromSciHqIndex)
  98. {
  99. list, e := sciHqOb.GetItemsByCondition(cond, pars, []string{}, "")
  100. if e != nil {
  101. err = fmt.Errorf("获取卓创红期失败, %v", e)
  102. return
  103. }
  104. for _, v := range list {
  105. indexItem := v.Format2SearchDataSource(v)
  106. fmt.Printf("写入中:卓创红期-%d\n", indexItem.PrimaryId)
  107. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  108. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  109. err = fmt.Errorf("卓创红期-写入es失败, %v", e)
  110. return
  111. }
  112. }
  113. }
  114. // 同花顺高频
  115. thsHfOb := new(dataSourceModel.BaseFromThsHfIndex)
  116. {
  117. list, e := thsHfOb.GetItemsByCondition(cond, pars, []string{}, "")
  118. if e != nil {
  119. err = fmt.Errorf("获取同花顺高频失败, %v", e)
  120. return
  121. }
  122. for _, v := range list {
  123. indexItem := v.Format2SearchDataSource(v)
  124. fmt.Printf("写入中:同花顺高频-%d\n", indexItem.PrimaryId)
  125. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  126. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  127. err = fmt.Errorf("同花顺高频-写入es失败, %v", e)
  128. return
  129. }
  130. }
  131. }
  132. // 隆众资讯
  133. oilchemOb := new(dataSourceModel.BaseFromOilchemIndex)
  134. {
  135. list, e := oilchemOb.GetItemsByCondition(cond, pars, []string{}, "")
  136. if e != nil {
  137. err = fmt.Errorf("获取隆众资讯失败, %v", e)
  138. return
  139. }
  140. for _, v := range list {
  141. indexItem := v.Format2SearchDataSource(v)
  142. fmt.Printf("写入中:隆众资讯-%d\n", indexItem.PrimaryId)
  143. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  144. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  145. err = fmt.Errorf("隆众资讯-写入es失败, %v", e)
  146. return
  147. }
  148. }
  149. }
  150. // CCF化纤信息
  151. ccfOb := new(dataSourceModel.BaseFromCcfIndex)
  152. {
  153. list, e := ccfOb.GetItemsByCondition(cond, pars, []string{}, "")
  154. if e != nil {
  155. err = fmt.Errorf("获取CCF化纤信息失败, %v", e)
  156. return
  157. }
  158. for _, v := range list {
  159. indexItem := v.Format2SearchDataSource(v)
  160. fmt.Printf("写入中:CCF化纤信息-%d\n", indexItem.PrimaryId)
  161. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  162. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  163. err = fmt.Errorf("CCF化纤信息-写入es失败, %v", e)
  164. return
  165. }
  166. }
  167. }
  168. // 上海钢联
  169. mysteelOb := new(dataSourceModel.BaseFromMysteelChemicalIndex)
  170. {
  171. list, e := mysteelOb.GetItemsByCondition(cond, pars, []string{}, "")
  172. if e != nil {
  173. err = fmt.Errorf("获取上海钢联失败, %v", e)
  174. return
  175. }
  176. for _, v := range list {
  177. indexItem := v.Format2SearchDataSource(v)
  178. fmt.Printf("写入中:上海钢联-%d\n", indexItem.PrimaryId)
  179. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  180. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  181. err = fmt.Errorf("上海钢联-写入es失败, %v", e)
  182. return
  183. }
  184. }
  185. }
  186. // SMM、有色原始数据库
  187. smmOb := new(dataSourceModel.BaseFromSmmIndex)
  188. {
  189. list, e := smmOb.GetItemsByCondition(cond, pars, []string{}, "")
  190. if e != nil {
  191. err = fmt.Errorf("获取有色原始数据库失败, %v", e)
  192. return
  193. }
  194. for _, v := range list {
  195. indexItem := v.Format2SearchDataSource(v)
  196. fmt.Printf("写入中:有色原始数据库-%d\n", indexItem.PrimaryId)
  197. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  198. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  199. err = fmt.Errorf("有色原始数据库-写入es失败, %v", e)
  200. return
  201. }
  202. }
  203. }
  204. // 百川盈孚
  205. baiinfoOb := new(dataSourceModel.BaseFromBaiinfoIndex)
  206. {
  207. list, e := baiinfoOb.GetItemsByCondition(cond, pars, []string{}, "")
  208. if e != nil {
  209. err = fmt.Errorf("获取百川盈孚失败, %v", e)
  210. return
  211. }
  212. for _, v := range list {
  213. indexItem := v.Format2SearchDataSource(v)
  214. fmt.Printf("写入中:百川盈孚-%d\n", indexItem.PrimaryId)
  215. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  216. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  217. err = fmt.Errorf("百川盈孚-写入es失败, %v", e)
  218. return
  219. }
  220. }
  221. }
  222. // 红桃3
  223. sciOb := new(dataSourceModel.BaseFromSciIndex)
  224. {
  225. list, e := sciOb.GetItemsByCondition(cond, pars, []string{}, "")
  226. if e != nil {
  227. err = fmt.Errorf("获取红桃3失败, %v", e)
  228. return
  229. }
  230. for _, v := range list {
  231. indexItem := v.Format2SearchDataSource(v)
  232. fmt.Printf("写入中:红桃3-%d\n", indexItem.PrimaryId)
  233. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  234. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  235. err = fmt.Errorf("红桃3-写入es失败, %v", e)
  236. return
  237. }
  238. }
  239. }
  240. // 中国煤炭市场网
  241. coalmineOb := new(dataSourceModel.BaseFromCoalmineMapping)
  242. {
  243. list, e := coalmineOb.GetItemsByCondition(cond, pars, []string{}, "")
  244. if e != nil {
  245. err = fmt.Errorf("获取中国煤炭市场网失败, %v", e)
  246. return
  247. }
  248. for _, v := range list {
  249. indexItem := v.Format2SearchDataSource(v)
  250. fmt.Printf("写入中:中国煤炭市场网-%d\n", indexItem.PrimaryId)
  251. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  252. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  253. err = fmt.Errorf("中国煤炭市场网-写入es失败, %v", e)
  254. return
  255. }
  256. }
  257. }
  258. // EIA STEO报告
  259. eiaOb := new(dataSourceModel.BaseFromEiaSteoIndex)
  260. {
  261. list, e := eiaOb.GetItemsByCondition(cond, pars, []string{}, "")
  262. if e != nil {
  263. err = fmt.Errorf("获取EIA STEO报告失败, %v", e)
  264. return
  265. }
  266. for _, v := range list {
  267. indexItem := v.Format2SearchDataSource(v)
  268. fmt.Printf("写入中:EIA STEO报告-%d\n", indexItem.PrimaryId)
  269. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  270. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  271. err = fmt.Errorf("EIA STEO报告-写入es失败, %v", e)
  272. return
  273. }
  274. }
  275. }
  276. // ICPI消费价格指数
  277. icpiOb := new(dataSourceModel.BaseFromIcpiIndex)
  278. {
  279. list, e := icpiOb.GetItemsByCondition(cond, pars, []string{}, "")
  280. if e != nil {
  281. err = fmt.Errorf("获取ICPI消费价格指数失败, %v", e)
  282. return
  283. }
  284. for _, v := range list {
  285. indexItem := v.Format2SearchDataSource(v)
  286. fmt.Printf("写入中:ICPI消费价格指数-%d\n", indexItem.PrimaryId)
  287. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  288. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  289. err = fmt.Errorf("ICPI消费价格指数-写入es失败, %v", e)
  290. return
  291. }
  292. }
  293. }
  294. // 涌益咨询
  295. yongyiOb := new(dataSourceModel.BaseFromYongyiIndex)
  296. {
  297. list, e := yongyiOb.GetItemsByCondition(cond, pars, []string{}, "")
  298. if e != nil {
  299. err = fmt.Errorf("获取涌益咨询失败, %v", e)
  300. return
  301. }
  302. for _, v := range list {
  303. indexItem := v.Format2SearchDataSource(v)
  304. fmt.Printf("写入中:涌益咨询-%d\n", indexItem.PrimaryId)
  305. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  306. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  307. err = fmt.Errorf("涌益咨询-写入es失败, %v", e)
  308. return
  309. }
  310. }
  311. }
  312. // 汾渭数据
  313. fenweiOb := new(dataSourceModel.BaseFromFenweiIndex)
  314. {
  315. list, e := fenweiOb.GetItemsByCondition(cond, pars, []string{}, "")
  316. if e != nil {
  317. err = fmt.Errorf("获取汾渭数据失败, %v", e)
  318. return
  319. }
  320. for _, v := range list {
  321. indexItem := v.Format2SearchDataSource(v)
  322. fmt.Printf("写入中:汾渭数据-%d\n", indexItem.PrimaryId)
  323. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  324. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  325. err = fmt.Errorf("汾渭数据-写入es失败, %v", e)
  326. return
  327. }
  328. }
  329. }
  330. // 卓创数据
  331. sci99Ob := new(dataSourceModel.BaseFromSci99Index)
  332. {
  333. list, e := sci99Ob.GetItemsByCondition(cond, pars, []string{}, "")
  334. if e != nil {
  335. err = fmt.Errorf("获取卓创数据失败, %v", e)
  336. return
  337. }
  338. for _, v := range list {
  339. indexItem := v.Format2SearchDataSource(v)
  340. fmt.Printf("写入中:卓创数据-%d\n", indexItem.PrimaryId)
  341. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  342. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  343. err = fmt.Errorf("卓创数据-写入es失败, %v", e)
  344. return
  345. }
  346. }
  347. }
  348. // 钢联原始指标库
  349. glOb := new(dataSourceModel.BaseFromGlIndex)
  350. {
  351. list, e := glOb.GetItemsByCondition(cond, pars, []string{}, "")
  352. if e != nil {
  353. err = fmt.Errorf("获取钢联原始指标库失败, %v", e)
  354. return
  355. }
  356. for _, v := range list {
  357. indexItem := v.Format2SearchDataSource(v)
  358. fmt.Printf("写入中:钢联原始指标库-%d\n", indexItem.PrimaryId)
  359. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  360. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  361. err = fmt.Errorf("钢联原始指标库-写入es失败, %v", e)
  362. return
  363. }
  364. }
  365. }
  366. // 手工指标录入
  367. manualOb := new(dataSourceModel.BaseFromManualEdb)
  368. {
  369. list, e := manualOb.GetItemsByCondition(cond, pars, []string{}, "")
  370. if e != nil {
  371. err = fmt.Errorf("获取手工指标失败, %v", e)
  372. return
  373. }
  374. for _, v := range list {
  375. indexItem := v.Format2SearchDataSource(v)
  376. fmt.Printf("写入中:手工指标-%s\n", indexItem.IndexCode)
  377. docId := fmt.Sprintf("%d-%s", indexItem.Source, indexItem.IndexCode)
  378. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  379. err = fmt.Errorf("手工指标-写入es失败, %v", e)
  380. return
  381. }
  382. }
  383. }
  384. // Bloomberg
  385. bloombergOb := new(dataSourceModel.BaseFromBloombergIndex)
  386. {
  387. list, e := bloombergOb.GetItemsByCondition(cond, pars, []string{}, "")
  388. if e != nil {
  389. err = fmt.Errorf("获取Bloomberg失败, %v", e)
  390. return
  391. }
  392. for _, v := range list {
  393. indexItem := v.Format2SearchDataSource(v)
  394. fmt.Printf("写入中:Bloomberg-%d\n", indexItem.PrimaryId)
  395. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  396. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  397. err = fmt.Errorf("Bloomberg-写入es失败, %v", e)
  398. return
  399. }
  400. }
  401. }
  402. // 煤炭江湖
  403. mtjhOb := new(dataSourceModel.BaseFromMtjhMapping)
  404. {
  405. list, e := mtjhOb.GetItemsByCondition(cond, pars, []string{}, "")
  406. if e != nil {
  407. err = fmt.Errorf("获取煤炭江湖失败, %v", e)
  408. return
  409. }
  410. for _, v := range list {
  411. indexItem := v.Format2SearchDataSource(v)
  412. fmt.Printf("写入中:煤炭江湖-%d\n", indexItem.PrimaryId)
  413. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  414. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  415. err = fmt.Errorf("煤炭江湖-写入es失败, %v", e)
  416. return
  417. }
  418. }
  419. }
  420. // AI预测模型
  421. aiPredictOb := new(aiPredictModel.AiPredictModelIndex)
  422. {
  423. list, e := aiPredictOb.GetItemsByCondition(cond, pars, []string{}, "")
  424. if e != nil {
  425. err = fmt.Errorf("获取AI预测模型失败, %v", e)
  426. return
  427. }
  428. for _, v := range list {
  429. indexItem := new(dataSourceModel.SearchDataSource)
  430. indexItem.PrimaryId = v.AiPredictModelIndexId
  431. indexItem.IndexName = v.IndexName
  432. indexItem.IndexCode = v.IndexCode
  433. indexItem.ClassifyId = v.ClassifyId
  434. indexItem.Source = utils.DATA_SOURCE_AI_PREDICT_MODEL
  435. indexItem.SourceName = "AI预测模型"
  436. indexItem.CreateTime = utils.TimeTransferString(utils.FormatDateTime, v.CreateTime)
  437. indexItem.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime)
  438. fmt.Printf("写入中:AI预测模型-%d\n", indexItem.PrimaryId)
  439. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  440. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  441. err = fmt.Errorf("AI预测模型-写入es失败, %v", e)
  442. return
  443. }
  444. }
  445. }
  446. // 美国农业部
  447. usdaOb := new(dataSourceModel.BaseFromUsdaFasIndex)
  448. {
  449. list, e := usdaOb.GetItemsByCondition(cond, pars, []string{}, "")
  450. if e != nil {
  451. err = fmt.Errorf("获取美国农业部失败, %v", e)
  452. return
  453. }
  454. for _, v := range list {
  455. indexItem := v.Format2SearchDataSource(v)
  456. fmt.Printf("写入中:美国农业部-%d\n", indexItem.PrimaryId)
  457. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  458. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  459. err = fmt.Errorf("美国农业部-写入es失败, %v", e)
  460. return
  461. }
  462. }
  463. }
  464. // 自有数据
  465. businessOb := new(dataSourceModel.BaseFromBusinessIndex)
  466. {
  467. list, e := businessOb.GetItemsByCondition(cond, pars, []string{}, "")
  468. if e != nil {
  469. err = fmt.Errorf("获取自有数据失败, %v", e)
  470. return
  471. }
  472. for _, v := range list {
  473. indexItem := v.Format2SearchDataSource(v)
  474. fmt.Printf("写入中:自有数据-%d\n", indexItem.PrimaryId)
  475. docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  476. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  477. err = fmt.Errorf("自有数据-写入es失败, %v", e)
  478. return
  479. }
  480. }
  481. }
  482. return
  483. }