package data import ( "context" "eta/eta_task/models" aiPredictModel "eta/eta_task/models/ai_predict_model" dataSourceModel "eta/eta_task/models/data_source" "eta/eta_task/services/alarm_msg" "eta/eta_task/services/elastic" "eta/eta_task/utils" "fmt" ) // SyncDataSourceEs 同步数据源ES func SyncDataSourceEs(cont context.Context) (err error) { utils.FileLog.Info("SyncDataSourceEs-start") defer func() { if err != nil { tips := fmt.Sprintf("SyncDataSourceEs-同步数据源ES失败, %v", err) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } utils.FileLog.Info("SyncDataSourceEs-end") }() confOb := new(models.BusinessConf) conf, e := confOb.GetItemByConfKey(models.BusinessConfSyncDataEsDaily) if e != nil { if e.Error() == utils.ErrNoRow() { utils.FileLog.Info("SyncDataSourceEs-无刷新配置,不同步") return } err = fmt.Errorf("获取刷新配置失败, %v", e) return } if conf.ConfVal != "true" { return } var cond string var pars []interface{} // 睿咨得 rzdOb := new(dataSourceModel.BaseFromRzdIndex) { list, e := rzdOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取睿咨得失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:睿咨得-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("睿咨得-写入es失败, %v", e) return } } } // 泛糖科技 hisugarOb := new(dataSourceModel.BaseFromHisugarIndex) { list, e := hisugarOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取泛糖科技失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:泛糖科技-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("泛糖科技-写入es失败, %v", e) return } } } // 粮油商务网 lyOb := new(dataSourceModel.BaseFromLyIndex) { list, e := lyOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取粮油商务网失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:粮油商务网-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("粮油商务网-写入es失败, %v", e) return } } } // 卓创红期 sciHqOb := new(dataSourceModel.BaseFromSciHqIndex) { list, e := sciHqOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取卓创红期失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:卓创红期-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("卓创红期-写入es失败, %v", e) return } } } // 同花顺高频 thsHfOb := new(dataSourceModel.BaseFromThsHfIndex) { list, e := thsHfOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取同花顺高频失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:同花顺高频-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("同花顺高频-写入es失败, %v", e) return } } } // 隆众资讯 oilchemOb := new(dataSourceModel.BaseFromOilchemIndex) { list, e := oilchemOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取隆众资讯失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:隆众资讯-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("隆众资讯-写入es失败, %v", e) return } } } // CCF化纤信息 ccfOb := new(dataSourceModel.BaseFromCcfIndex) { list, e := ccfOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取CCF化纤信息失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:CCF化纤信息-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("CCF化纤信息-写入es失败, %v", e) return } } } // 上海钢联 mysteelOb := new(dataSourceModel.BaseFromMysteelChemicalIndex) { list, e := mysteelOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取上海钢联失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:上海钢联-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("上海钢联-写入es失败, %v", e) return } } } // SMM、有色原始数据库 smmOb := new(dataSourceModel.BaseFromSmmIndex) { list, e := smmOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取有色原始数据库失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:有色原始数据库-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("有色原始数据库-写入es失败, %v", e) return } } } // 百川盈孚 baiinfoOb := new(dataSourceModel.BaseFromBaiinfoIndex) { list, e := baiinfoOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取百川盈孚失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:百川盈孚-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("百川盈孚-写入es失败, %v", e) return } } } // 红桃3 sciOb := new(dataSourceModel.BaseFromSciIndex) { list, e := sciOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取红桃3失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:红桃3-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("红桃3-写入es失败, %v", e) return } } } // 中国煤炭市场网 coalmineOb := new(dataSourceModel.BaseFromCoalmineMapping) { list, e := coalmineOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取中国煤炭市场网失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:中国煤炭市场网-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("中国煤炭市场网-写入es失败, %v", e) return } } } // EIA STEO报告 eiaOb := new(dataSourceModel.BaseFromEiaSteoIndex) { list, e := eiaOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取EIA STEO报告失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:EIA STEO报告-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("EIA STEO报告-写入es失败, %v", e) return } } } // ICPI消费价格指数 icpiOb := new(dataSourceModel.BaseFromIcpiIndex) { list, e := icpiOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取ICPI消费价格指数失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:ICPI消费价格指数-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("ICPI消费价格指数-写入es失败, %v", e) return } } } // 涌益咨询 yongyiOb := new(dataSourceModel.BaseFromYongyiIndex) { list, e := yongyiOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取涌益咨询失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:涌益咨询-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("涌益咨询-写入es失败, %v", e) return } } } // 汾渭数据 fenweiOb := new(dataSourceModel.BaseFromFenweiIndex) { list, e := fenweiOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取汾渭数据失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:汾渭数据-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("汾渭数据-写入es失败, %v", e) return } } } // 卓创数据 sci99Ob := new(dataSourceModel.BaseFromSci99Index) { list, e := sci99Ob.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取卓创数据失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:卓创数据-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("卓创数据-写入es失败, %v", e) return } } } // 钢联原始指标库 glOb := new(dataSourceModel.BaseFromGlIndex) { list, e := glOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取钢联原始指标库失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:钢联原始指标库-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("钢联原始指标库-写入es失败, %v", e) return } } } // 手工指标录入 manualOb := new(dataSourceModel.BaseFromManualEdb) { list, e := manualOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取手工指标失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:手工指标-%s\n", indexItem.IndexCode) docId := fmt.Sprintf("%d-%s", indexItem.Source, indexItem.IndexCode) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("手工指标-写入es失败, %v", e) return } } } // Bloomberg bloombergOb := new(dataSourceModel.BaseFromBloombergIndex) { list, e := bloombergOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取Bloomberg失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:Bloomberg-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("Bloomberg-写入es失败, %v", e) return } } } // 煤炭江湖 mtjhOb := new(dataSourceModel.BaseFromMtjhMapping) { list, e := mtjhOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取煤炭江湖失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:煤炭江湖-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("煤炭江湖-写入es失败, %v", e) return } } } // AI预测模型 aiPredictOb := new(aiPredictModel.AiPredictModelIndex) { list, e := aiPredictOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取AI预测模型失败, %v", e) return } for _, v := range list { indexItem := new(dataSourceModel.SearchDataSource) indexItem.PrimaryId = v.AiPredictModelIndexId indexItem.IndexName = v.IndexName indexItem.IndexCode = v.IndexCode indexItem.ClassifyId = v.ClassifyId indexItem.Source = utils.DATA_SOURCE_AI_PREDICT_MODEL indexItem.SourceName = "AI预测模型" indexItem.CreateTime = utils.TimeTransferString(utils.FormatDateTime, v.CreateTime) indexItem.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime) fmt.Printf("写入中:AI预测模型-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("AI预测模型-写入es失败, %v", e) return } } } // 美国农业部 usdaOb := new(dataSourceModel.BaseFromUsdaFasIndex) { list, e := usdaOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取美国农业部失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:美国农业部-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("美国农业部-写入es失败, %v", e) return } } } // 自有数据 businessOb := new(dataSourceModel.BaseFromBusinessIndex) { list, e := businessOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取自有数据失败, %v", e) return } for _, v := range list { indexItem := v.Format2SearchDataSource(v) fmt.Printf("写入中:自有数据-%d\n", indexItem.PrimaryId) docId := fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId) if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil { err = fmt.Errorf("自有数据-写入es失败, %v", e) return } } } return }