data_source.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435
  1. package binlog
  2. import (
  3. "encoding/json"
  4. dataSourceModel "eta/eta_api/models/data_source"
  5. "eta/eta_api/services/elastic"
  6. "eta/eta_api/utils"
  7. "fmt"
  8. )
  9. // HandleDataSourceChange2Es 数据源变动写入ES
  10. func HandleDataSourceChange2Es() {
  11. for {
  12. utils.Rc.Brpop(utils.CACHE_DATA_SOURCE_ES_HANDLE, func(b []byte) {
  13. indexItem := new(dataSourceModel.SearchDataSource)
  14. if e := json.Unmarshal(b, &indexItem); e != nil {
  15. utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, json unmarshal err: %v", e))
  16. return
  17. }
  18. fmt.Printf("ES开始写入, source: %d, sourceName: %s, indexCode: %s\n", indexItem.Source, indexItem.SourceName, indexItem.IndexCode)
  19. // 手工指标的文档ID用source和指标编码组合
  20. var docId string
  21. if indexItem.Source == utils.DATA_SOURCE_MANUAL {
  22. docId = fmt.Sprintf("%d-%s", indexItem.Source, indexItem.IndexCode)
  23. } else {
  24. docId = fmt.Sprintf("%d-%d", indexItem.Source, indexItem.PrimaryId)
  25. }
  26. if e := elastic.EsAddOrEditDataSourceIndex(utils.EsDataSourceIndexName, docId, indexItem); e != nil {
  27. utils.FileLog.Info(fmt.Sprintf("HandleDataSourceChange2Es, write2es byte: %s, err: %v", string(b), e))
  28. return
  29. }
  30. fmt.Println("ES写入成功")
  31. })
  32. }
  33. }