package binlog

import (
	"encoding/json"
	dataSourceModel "eta/eta_api/models/data_source"
	"eta/eta_api/services/elastic"
	"eta/eta_api/utils"
	"fmt"
)

// HandleDataSourceChange2Es consumes data-source change messages from the
// utils.CACHE_DATA_SOURCE_ES_HANDLE Redis list and writes each one to the
// data-source Elasticsearch index.
//
// It loops forever and never returns, so callers are expected to run it in
// its own goroutine. Failures on a single message are logged and the message
// is dropped; the loop then continues with the next one.
func HandleDataSourceChange2Es() {
	for {
		// NOTE(review): Brpop presumably blocks until a message is available
		// before invoking the callback — confirm against utils.Rc.
		utils.Rc.Brpop(utils.CACHE_DATA_SOURCE_ES_HANDLE, handleDataSourceEsMessage)
	}
}

// handleDataSourceEsMessage decodes one queued JSON payload and upserts it
// into the data-source ES index. Errors are logged rather than returned
// because the queue callback has no error channel.
func handleDataSourceEsMessage(b []byte) {
	var indexItem *dataSourceModel.SearchDataSource
	if e := json.Unmarshal(b, &indexItem); e != nil {
		utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, json unmarshal err: %v", e))
		return
	}
	// A JSON `null` payload decodes successfully but leaves the pointer nil;
	// bail out here instead of panicking on the field accesses below.
	if indexItem == nil {
		utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, empty payload, byte: %s", string(b)))
		return
	}
	fmt.Printf("ES开始写入, source: %d, sourceName: %s, indexCode: %s\n", indexItem.Source, indexItem.SourceName, indexItem.IndexCode)

	docId := dataSourceEsDocId(indexItem)
	if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
		utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, write2es byte: %s, err: %v", string(b), e))
		return
	}
	fmt.Println("ES写入成功")
}

// dataSourceEsDocId builds the ES document ID for an index item.
// Manual indicators are keyed by source plus index code; every other source
// is keyed by source plus primary ID.
func dataSourceEsDocId(item *dataSourceModel.SearchDataSource) string {
	if item.Source == utils.DATA_SOURCE_MANUAL {
		return fmt.Sprintf("%d-%s", item.Source, item.IndexCode)
	}
	return fmt.Sprintf("%d-%d", item.Source, item.PrimaryId)
}