1234567891011121314151617181920212223242526272829303132333435 |
- package binlog
- import (
- "encoding/json"
- dataSourceModel "eta/eta_api/models/data_source"
- "eta/eta_api/services/elastic"
- "eta/eta_api/utils"
- "fmt"
- )
- // HandleDataSourceChange2Es 数据源变动写入ES
- func HandleDataSourceChange2Es() {
- for {
- utils.Rc.Brpop(utils.CACHE_DATA_SOURCE_ES_HANDLE, func(b []byte) {
- indexItem := new(dataSourceModel.SearchDataSource)
- if e := json.Unmarshal(b, &indexItem); e != nil {
- utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, json unmarshal err: %v", e))
- return
- }
- fmt.Printf("ES开始写入, source: %d, sourceName: %s, indexCode: %s\n", indexItem.Source, indexItem.SourceName, indexItem.IndexCode)
- // 手工指标的文档ID用source和指标编码组合
- var docId string
- if indexItem.Source == utils.DATA_SOURCE_MANUAL {
- docId = fmt.Sprintf("%d-%s", indexItem.Source, indexItem.IndexCode)
- } else {
- docId = fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
- }
- if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
- utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, write2es byte: %s, err: %v", string(b), e))
- return
- }
- fmt.Println("ES写入成功")
- })
- }
- }
|