Browse Source

fix: 数据源es信息

hsun 2 months ago
parent
commit
40b4e924e7

+ 1 - 1
controllers/commodity_trade_base_index.go

@@ -2741,7 +2741,7 @@ func (this *TradeCommonController) MtjhSingleData() {
 	}
 
 	modifyTime, err := data_manage.GetMtjhIndexLatestDate(indexCode)
-	if err != nil {
+	if err != nil && err.Error() != utils.ErrNoRow() {
 		br.Msg = "获取更新时间失败"
 		br.ErrMsg = "获取更新时间失败,Err:" + err.Error()
 		return

+ 76 - 43
controllers/data_manage/base_from_ths_hf.go

@@ -330,12 +330,12 @@ func (this *BaseFromThsHfController) List() {
 
 	// 筛选项
 	var (
-		cond         string
-		pars         []interface{}
-		listOrder    string
-		classifyIds  []int
-		adminIds     []int
-		frequencyArr []string
+		cond        string
+		pars        []interface{}
+		listOrder   string
+		classifyIds []int
+		adminIds    []int
+		//frequencyArr []string
 	)
 	indexOb := new(data_manage.BaseFromThsHfIndex)
 	{
@@ -357,8 +357,8 @@ func (this *BaseFromThsHfController) List() {
 				br.Msg = "获取成功"
 				return
 			}
-			//cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().BaseFromThsHfClassifyId, utils.GetOrmInReplace(len(classifyIds)))
-			//pars = append(pars, classifyIds)
+			cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().BaseFromThsHfClassifyId, utils.GetOrmInReplace(len(classifyIds)))
+			pars = append(pars, classifyIds)
 
 			//// 不包含子分类
 			//if len(classifyIds) > 0 && !params.IncludeChild {
@@ -409,11 +409,18 @@ func (this *BaseFromThsHfController) List() {
 		}
 
 		if params.Frequency != "" {
-			//frequencyArr := strings.Split(params.Frequency, ",")
-			//if len(frequencyArr) > 0 {
-			//	cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().Frequency, utils.GetOrmInReplace(len(frequencyArr)))
-			//	pars = append(pars, frequencyArr)
-			//}
+			frequencyArr := strings.Split(params.Frequency, ",")
+			if len(frequencyArr) > 0 {
+				cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().Frequency, utils.GetOrmInReplace(len(frequencyArr)))
+				pars = append(pars, frequencyArr)
+			} else {
+				page := paging.GetPaging(params.CurrentIndex, params.PageSize, 0)
+				resp.Paging = page
+				br.Ret = 200
+				br.Success = true
+				br.Msg = "获取成功"
+				return
+			}
 			frequencyArr = strings.Split(params.Frequency, ",")
 		}
 		if params.SysAdminId != "" {
@@ -425,17 +432,43 @@ func (this *BaseFromThsHfController) List() {
 					adminIds = append(adminIds, t)
 				}
 			}
-			//if len(adminIds) > 0 {
-			//	cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().SysUserId, utils.GetOrmInReplace(len(adminIds)))
-			//	pars = append(pars, adminIds)
-			//}
+			if len(adminIds) > 0 {
+				cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().SysUserId, utils.GetOrmInReplace(len(adminIds)))
+				pars = append(pars, adminIds)
+			} else {
+				page := paging.GetPaging(params.CurrentIndex, params.PageSize, 0)
+				resp.Paging = page
+				br.Ret = 200
+				br.Success = true
+				br.Msg = "获取成功"
+				return
+			}
 		}
 		params.Keywords = strings.TrimSpace(params.Keywords)
-		//if params.Keywords != "" {
-		//	cond += fmt.Sprintf(" AND (%s LIKE ? OR %s LIKE ?)", indexOb.Cols().IndexCode, indexOb.Cols().IndexName)
-		//	kw := fmt.Sprint("%", params.Keywords, "%")
-		//	pars = append(pars, kw, kw)
-		//}
+
+		// 关键词空格拆分
+		if params.Keywords != "" {
+			indexCodeCol := indexOb.Cols().IndexCode
+			indexNameCol := indexOb.Cols().IndexName
+			keywordArr := strings.Split(params.Keywords, " ")
+			if len(keywordArr) > 1 {
+				sliceArr := make([]string, 0)
+				sliceArr = append(sliceArr, fmt.Sprintf(` %s LIKE ? OR %s LIKE ? `, indexCodeCol, indexNameCol))
+				pars = utils.GetLikeKeywordPars(pars, params.Keywords, 2)
+
+				for _, v := range keywordArr {
+					if v == ` ` || v == `` {
+						continue
+					}
+					sliceArr = append(sliceArr, fmt.Sprintf(` %s LIKE ? OR %s LIKE ? `, indexCodeCol, indexNameCol))
+					pars = utils.GetLikeKeywordPars(pars, v, 2)
+				}
+				cond += ` AND (` + strings.Join(sliceArr, " OR ") + `)`
+			} else {
+				cond += fmt.Sprintf(` AND (%s LIKE ? OR %s LIKE ?)`, indexCodeCol, indexNameCol)
+				pars = utils.GetLikeKeywordPars(pars, params.Keywords, 2)
+			}
+		}
 
 		// 排序
 		if params.SortField > 0 && params.SortType > 0 {
@@ -445,27 +478,27 @@ func (this *BaseFromThsHfController) List() {
 		}
 	}
 
-	// 先ES搜索出IndexIds, 再去查mysql(别问为啥要这么干,因为强行要es分词搜=_=!)
-	_, list, e := elastic.SearchDataSourceIndex(utils.EsDataSourceIndexName, params.Keywords, utils.DATA_SOURCE_THS, utils.DATA_SUB_SOURCE_HIGH_FREQUENCY, classifyIds, adminIds, frequencyArr, startSize, params.PageSize)
-	if e != nil {
-		br.Msg = "获取失败"
-		br.ErrMsg = fmt.Sprintf("搜索表格失败, %v", e)
-		return
-	}
-	if len(list) == 0 {
-		page := paging.GetPaging(params.CurrentIndex, params.PageSize, 0)
-		resp.Paging = page
-		br.Ret = 200
-		br.Success = true
-		br.Msg = "获取成功"
-		return
-	}
-	var indexIds []int
-	for _, v := range list {
-		indexIds = append(indexIds, v.PrimaryId)
-	}
-	cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(indexIds)))
-	pars = append(pars, indexIds)
+	// 先ES搜索出IndexIds, 再去查mysql
+	//_, list, e := elastic.SearchDataSourceIndex(utils.EsDataSourceIndexName, params.Keywords, utils.DATA_SOURCE_THS, utils.DATA_SUB_SOURCE_HIGH_FREQUENCY, classifyIds, adminIds, frequencyArr, startSize, params.PageSize)
+	//if e != nil {
+	//	br.Msg = "获取失败"
+	//	br.ErrMsg = fmt.Sprintf("搜索表格失败, %v", e)
+	//	return
+	//}
+	//if len(list) == 0 {
+	//	page := paging.GetPaging(params.CurrentIndex, params.PageSize, 0)
+	//	resp.Paging = page
+	//	br.Ret = 200
+	//	br.Success = true
+	//	br.Msg = "获取成功"
+	//	return
+	//}
+	//var indexIds []int
+	//for _, v := range list {
+	//	indexIds = append(indexIds, v.PrimaryId)
+	//}
+	//cond += fmt.Sprintf(" AND %s IN (%s)", indexOb.Cols().PrimaryId, utils.GetOrmInReplace(len(indexIds)))
+	//pars = append(pars, indexIds)
 
 	// 列表总计
 	total, e := indexOb.GetCountByCondition(cond, pars)

+ 9 - 0
controllers/data_manage/jiayue_edb_source.go

@@ -71,6 +71,15 @@ func (this *JiaYueEdbSourceController) FrequencyList() {
 		return
 	}
 
+	// 测试环境老在报错=_=!
+	if utils.RunMode == "debug" {
+		br.Data = make([]string, 0)
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "获取成功"
+		return
+	}
+
 	frequencies, e := data.GetJiaYueFrequencyListFromBridge()
 	if e != nil {
 		br.Msg = "获取失败"

+ 73 - 8
controllers/data_source/data_source.go

@@ -73,10 +73,55 @@ func (c *DataSourceController) SearchByEs() {
 	}
 	total = int(t)
 
+	// (短期方案)中国煤炭市场网/煤炭江湖额外补充基础字段
+	indexModifyTime := make(map[string]string)
+	indexBaseInfo := make(map[string]*dataSourceModel.BaseFromCoalmineIndexBase)
+	{
+		if source == utils.DATA_SOURCE_MTJH {
+			var updateCodes []string
+			for _, v := range list {
+				if v.ModifyTime == "" {
+					updateCodes = append(updateCodes, v.IndexCode)
+				}
+			}
+			if len(updateCodes) > 0 {
+				baseIndexes, e := dataSourceModel.GetMtjhBaseInfoFromDataTable(updateCodes)
+				if e != nil {
+					br.Msg = "获取失败"
+					br.ErrMsg = fmt.Sprintf("获取煤炭江湖指标基础信息失败, %v", e)
+					return
+				}
+				for _, v := range baseIndexes {
+					indexModifyTime[v.IndexCode] = utils.TimeTransferString(utils.FormatDateTime, v.ModifyTime)
+				}
+			}
+		}
+		if source == utils.DATA_SOURCE_COAL {
+			var updateCodes []string
+			for _, v := range list {
+				if v.Unit == "" || v.Frequency == "" || v.ModifyTime == "" {
+					updateCodes = append(updateCodes, v.IndexCode)
+				}
+			}
+			if len(updateCodes) > 0 {
+				baseIndexes, e := dataSourceModel.GetCoalmineBaseInfoFromDataTable(updateCodes)
+				if e != nil {
+					br.Msg = "获取失败"
+					br.ErrMsg = fmt.Sprintf("获取中国煤炭市场网指标基础信息失败, %v", e)
+					return
+				}
+				for _, v := range baseIndexes {
+					indexBaseInfo[v.IndexCode] = v
+				}
+			}
+		}
+	}
+
 	listMap := make([]map[string]interface{}, 0)
 	for _, v := range list {
-		// 由于start_date、end_date和latest_value字段不全的历史遗留问题,这里查出来并补充进ES里面去
-		if v.StartDate == "" || v.EndDate == "" {
+		// (短期方案)由于start_date、end_date和latest_value字段不全的历史遗留问题,这里查出来并补充进ES里面去
+		var updateEs bool
+		if v.StartDate == "" || v.EndDate == "" || v.LatestValue == 0 {
 			minMax, e := dataSourceModel.GetBaseIndexDataMinMax(v.Source, v.SubSource, v.IndexCode)
 			if e != nil && e.Error() != utils.ErrNoRow() {
 				br.Msg = "获取失败"
@@ -87,15 +132,35 @@ func (c *DataSourceController) SearchByEs() {
 				v.StartDate = minMax.MinDate
 				v.EndDate = minMax.MaxDate
 				v.LatestValue = minMax.LatestValue
+				updateEs = true
+			}
+		}
 
-				// 写入ES更新队列
-				if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, v); e != nil {
-					br.Msg = "获取失败"
-					br.ErrMsg = fmt.Sprintf("写入ES更新队列失败, Source: %d, IndexCode: %s, err: %v", v.Source, v.IndexCode, e)
-					return
-				}
+		// 煤炭江湖缺ModifyTime
+		if source == utils.DATA_SOURCE_MTJH && v.ModifyTime == "" && indexModifyTime[v.IndexCode] != "" {
+			v.ModifyTime = indexModifyTime[v.IndexCode]
+			updateEs = true
+		}
+
+		// 中国煤炭市场网缺Unit, Frequency, ModifyTime
+		if source == utils.DATA_SOURCE_COAL && (v.Unit == "" || v.Frequency == "" || v.ModifyTime == "") {
+			if indexBaseInfo[v.IndexCode] != nil {
+				v.Unit = indexBaseInfo[v.IndexCode].Unit
+				v.Frequency = indexBaseInfo[v.IndexCode].Frequency
+				v.ModifyTime = utils.TimeTransferString(utils.FormatDateTime, indexBaseInfo[v.IndexCode].ModifyTime)
+				updateEs = true
 			}
 		}
+
+		// 写入ES更新队列
+		if updateEs {
+			if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, v); e != nil {
+				br.Msg = "获取失败"
+				br.ErrMsg = fmt.Sprintf("写入ES更新队列失败, Source: %d, IndexCode: %s, err: %v", v.Source, v.IndexCode, e)
+				return
+			}
+		}
+
 		// 转换成map返回
 		listMap = append(listMap, v.ToMap(primaryIdKey, indexNameKey, classifyIdKey))
 	}

+ 108 - 0
models/data_source/data_source.go

@@ -2009,3 +2009,111 @@ func GetBaseIndexDataMinMax(source, subSource int, indexCode string) (item *Base
 	item.LatestValue = latestVal
 	return
 }
+
+// BaseFromMtjhIndex 煤炭江湖数据表
+type BaseFromMtjhIndex struct {
+	BaseFromMtjhIndexId int       `orm:"column(base_from_mtjh_index_id);pk"`
+	IndexCode           string    `description:"指标编码"`
+	IndexName           string    `description:"指标名称"`
+	DealValue           float64   `description:"成交量"`
+	DataTime            time.Time `description:"数据日期"`
+	Unit                string    `description:"单位"`
+	Frequency           string    `description:"频度"`
+	StartDate           time.Time `description:"开始日期"`
+	EndDate             time.Time `description:"结束日期"`
+	LatestValue         float64   `description:"最新值"`
+	CreateTime          time.Time `description:"创建时间"`
+	ModifyTime          time.Time `description:"更新时间"`
+}
+
+// GetMtjhBaseInfoFromDataTable 煤炭江湖-从数据表获取基础信息
+func GetMtjhBaseInfoFromDataTable(codes []string) (items []*BaseFromMtjhIndex, err error) {
+	codeLens := len(codes)
+	if codeLens == 0 {
+		return
+	}
+	sql := fmt.Sprintf(`SELECT * FROM base_from_mtjh_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(codeLens))
+	_, err = orm.NewOrmUsingDB("data").Raw(sql, codes).QueryRows(&items)
+	return
+}
+
+// BaseFromCoalmineIndexBase 中国煤炭市场网数据表基础信息
+type BaseFromCoalmineIndexBase struct {
+	IndexCode  string    `description:"指标编码"`
+	IndexName  string    `description:"指标名称"`
+	Unit       string    `description:"单位"`
+	Frequency  string    `description:"频度"`
+	CreateTime time.Time `description:"创建时间"`
+	ModifyTime time.Time `description:"更新时间"`
+}
+
+// GetCoalmineBaseInfoFromDataTable 中国煤炭市场网-从数据表获取基础信息
+func GetCoalmineBaseInfoFromDataTable(codes []string) (items []*BaseFromCoalmineIndexBase, err error) {
+	codeLens := len(codes)
+	if codeLens == 0 {
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	items = make([]*BaseFromCoalmineIndexBase, 0)
+	var jsmCodes, companyCodes, firmCodes, coastalCodes, inlandCodes []string
+	for _, code := range codes {
+		if strings.Contains(code, "jsm") {
+			jsmCodes = append(jsmCodes, code)
+		}
+		if strings.Contains(code, "company") {
+			companyCodes = append(companyCodes, code)
+		}
+		if strings.Contains(code, "firm") {
+			firmCodes = append(firmCodes, code)
+		}
+		if strings.Contains(code, "coastal") {
+			coastalCodes = append(coastalCodes, code)
+		}
+		if strings.Contains(code, "inland") {
+			inlandCodes = append(inlandCodes, code)
+		}
+	}
+	var sql string
+	itemsOnce := make([]*BaseFromCoalmineIndexBase, 0)
+	if len(jsmCodes) > 0 {
+		sql = fmt.Sprintf(`SELECT * FROM base_from_coalmine_jsm_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(len(jsmCodes)))
+		_, err = o.Raw(sql, codes).QueryRows(&itemsOnce)
+		if err != nil {
+			return
+		}
+		items = append(items, itemsOnce...)
+	}
+	if len(companyCodes) > 0 {
+		sql = fmt.Sprintf(`SELECT * FROM base_from_coalmine_company_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(len(companyCodes)))
+		_, err = o.Raw(sql, codes).QueryRows(&itemsOnce)
+		if err != nil {
+			return
+		}
+		items = append(items, itemsOnce...)
+	}
+	if len(firmCodes) > 0 {
+		sql = fmt.Sprintf(`SELECT * FROM base_from_coalmine_firm_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(len(firmCodes)))
+		_, err = o.Raw(sql, codes).QueryRows(&itemsOnce)
+		if err != nil {
+			return
+		}
+		items = append(items, itemsOnce...)
+	}
+	if len(coastalCodes) > 0 {
+		sql = fmt.Sprintf(`SELECT * FROM base_from_coalmine_coastal_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(len(coastalCodes)))
+		_, err = o.Raw(sql, codes).QueryRows(&itemsOnce)
+		if err != nil {
+			return
+		}
+		items = append(items, itemsOnce...)
+	}
+	if len(inlandCodes) > 0 {
+		sql = fmt.Sprintf(`SELECT * FROM base_from_coalmine_inland_index WHERE index_code IN (%s) GROUP BY index_code`, utils.GetOrmInReplace(len(inlandCodes)))
+		_, err = o.Raw(sql, codes).QueryRows(&itemsOnce)
+		if err != nil {
+			return
+		}
+		items = append(items, itemsOnce...)
+	}
+	return
+}