Browse Source

数据源明细汇总处理

xyxie 1 year ago
parent
commit
9ec21ef2bb

+ 0 - 7
controllers/base_from_calculate.go

@@ -1268,14 +1268,11 @@ func (this *CalculateController) BatchEdit() {
 		err, errMsg = baseEdbInfoModel.Edit(editParams)
 	}
 	if err != nil {
-		// todo 添加更新失败记录
 		br.Msg = "生成" + sourName + "失败"
 		if errMsg != `` {
 			br.Msg = errMsg
 		}
 		br.Msg = "生成" + sourName + "失败 Err:" + err.Error()
-		// 添加指标刷新失败日志
-		//_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, br.Msg)
 		return
 	}
 	if edbInfo == nil {
@@ -1315,10 +1312,6 @@ func (this *CalculateController) BatchEdit() {
 		return
 	}
 
-	// todo 新增更新记录
-	// 添加指标刷新成功日志
-	//_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "")
-
 	resp := models.AddEdbInfoResp{
 		EdbInfoId:  edbInfo.EdbInfoId,
 		UniqueCode: edbInfo.UniqueCode,

+ 60 - 0
models/data_stat/edb_info_delete_log.go

@@ -0,0 +1,60 @@
+package data_stat
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+)
+
+// EdbInfoDeleteLog 指标删除日志表
+type EdbInfoDeleteLog struct {
+	Id                 uint64  `orm:"column(id);pk"`
+	EdbInfoId          int     `description:"指标ID"`
+	EdbInfoType        int     `description:"指标类型,0:普通指标,1:预测指标"`
+	SourceName         string  `description:"来源名称"`
+	Source             int     `description:"来源id"`
+	EdbCode            string  `description:"指标编码"`
+	EdbName            string  `description:"指标名称"`
+	EdbNameEn          string  `description:"英文指标名称"`
+	EdbNameSource      string  `description:"指标名称来源"`
+	Frequency          string  `description:"频率"`
+	Unit               string  `description:"单位"`
+	UnitEn             string  `description:"英文单位"`
+	StartDate          string  `description:"起始日期"`
+	EndDate            string  `description:"终止日期"`
+	SysUserId          int     `description:"创建人ID"`
+	SysUserRealName    string  `description:"创建人姓名"`
+	UniqueCode         string  `description:"指标唯一编码"`
+	EdbCreateTime      string  `description:"指标创建时间"`
+	EdbModifyTime      string  `description:"指标修改时间"`
+	CreateTime         string  `description:"创建时间即删除时间"`
+	MinValue           float64 `description:"指标最小值"`
+	MaxValue           float64 `description:"指标最大值"`
+	CalculateFormula   string  `description:"计算公式"`
+	EdbType            int     `description:"指标类型:1:基础指标,2:计算指标"`
+	LatestDate         string  `description:"数据最新日期"`
+	LatestValue        float64 `description:"数据最新值"`
+	MoveType           int     `description:"移动方式:1:领先(默认),2:滞后"`
+	MoveFrequency      string  `description:"移动频度"`
+	NoUpdate           int8    `description:"是否停止更新,0:继续更新;1:停止更新"`
+	ChartImage         string  `description:"图表图片"`
+	Calendar           string  `description:"公历/农历" orm:"default(公历);"`
+	DataDateType       string  `orm:"column(data_date_type);size(255);null;default(交易日)"`
+	ManualSave         int     `description:"是否有手动保存过上下限: 0-否; 1-是"`
+	TerminalCode       string  `description:"终端编码,用于配置在机器上"`
+	DelSysUserId       int     `description:"删除人ID"`
+	DelSysUserRealName string  `description:"删除人姓名"`
+	DataUpdateTime     string  `description:"最近一次数据发生变化的时间"`
+	ErDataUpdateDate   string  `description:"本次更新,数据发生变化的最早日期"`
+}
+
+type EdbInfoDeleteLogNum struct {
+	Source       int    `description:"来源id"`
+	TerminalCode string `description:"终端编码,用于配置在机器上"`
+	Num          int    `description:"被删除的指标数据总数"`
+}
+
+func GetEdbDeleteLogNumByCreateTime(startDate, endDate string) (item []*EdbInfoDeleteLogNum, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT source, terminal_code, count(*) as num FROM edb_info_delete_log WHERE create_time >= ? and create_time < ? group by source, terminal_code `
+	_, err = o.Raw(sql, startDate, endDate).QueryRows(&item)
+	return
+}

+ 0 - 1
models/data_stat/edb_info_update_stat.go

@@ -48,7 +48,6 @@ func (r *EdbInfoUpdateStat) Add(list []*EdbInfoUpdateStat) (err error) {
 // UpdateEdbUpdateStatMulti 批量更新
 func UpdateEdbUpdateStatMulti(list []*EdbInfoUpdateStat) (err error) {
 	o := orm.NewOrm()
-	// todo 更新
 	sql := `UPDATE edb_info_update_stat 
 SET 
 edb_name = ?,

+ 73 - 0
models/data_stat/edb_source_stat.go

@@ -0,0 +1,73 @@
+package data_stat
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// EdbSourceStat 数据源统计表
+type EdbSourceStat struct {
+	Id                   int       `orm:"column(id);pk"`
+	SourceName           string    `description:"来源名称"`
+	Source               int       `description:"来源id"`
+	TerminalCode         string    `description:"终端编码,用于配置在机器上"`
+	EdbNum               int       `description:"指标总数"`
+	EdbNewNum            int       `description:"今日新增指标数"`
+	EdbDelNum            int       `description:"今日删除指标数"`
+	NeedRefreshNum       int       `description:"今日需刷新指标"`
+	HasRefreshNum        int       `description:"今日发起刷新任务指标数"`
+	UpdateSuccessNum     int       `description:"今日已刷新成功指标数"`
+	UpdateFailedNum      int       `description:"今日已刷新失败指标数"`
+	DataUpdateSuccessNum int       `description:"今日已更新指标"`
+	CreateTime           time.Time `description:"创建时间"`
+	ModifyTime           time.Time `description:"修改时间"`
+}
+
+func GetEdbSourceStatByCondition(condition string, pars []interface{}) (item []*EdbSourceStat, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT * FROM edb_source_stat WHERE 1=1 `
+	if condition != "" {
+		sql += condition
+	}
+	_, err = o.Raw(sql, pars).QueryRows(&item)
+	return
+}
+
+// Add 新增
+func (r *EdbSourceStat) Add(list []*EdbSourceStat) (err error) {
+	o := orm.NewOrm()
+	_, err = o.InsertMulti(len(list), list)
+	return
+}
+
+// UpdateEdbSourceStatMulti 批量更新
+func UpdateEdbSourceStatMulti(list []*EdbSourceStat) (err error) {
+	o := orm.NewOrm()
+	sql := `UPDATE edb_source_stat 
+SET 
+edb_num=?,                              
+edb_new_num=?,                          
+edb_del_num=?,                          
+need_refresh_num=?,                      
+has_refresh_num=?,                       
+update_success_num=?,                   
+update_failed_num=?,                    
+data_update_success_num=?,                                         
+modify_time=?                           
+WHERE
+	id = ?`
+	p, err := o.Raw(sql).Prepare()
+	if err != nil {
+		return
+	}
+	defer func() {
+		_ = p.Close() // 别忘记关闭 statement
+	}()
+	for _, v := range list {
+		_, err = p.Exec(v.EdbNum, v.EdbNewNum, v.EdbDelNum, v.NeedRefreshNum, v.HasRefreshNum, v.UpdateFailedNum, v.DataUpdateSuccessNum, v.ModifyTime, v.Id)
+		if err != nil {
+			return
+		}
+	}
+	return
+}

+ 1 - 0
models/db.go

@@ -113,5 +113,6 @@ func initDataStat() {
 	orm.RegisterModel(
 		new(data_stat.EdbInfoUpdateLog),  // 指标更新/刷新日志列表
 		new(data_stat.EdbInfoUpdateStat), // 数据源明细表
+		new(data_stat.EdbSourceStat),     // 数据源统计表
 	)
 }

+ 8 - 0
models/edb_terminal.go

@@ -27,6 +27,14 @@ func GetEdbTerminalListBySource(source int) (items []*EdbTerminal, err error) {
 	return
 }
 
+// GetEdbTerminalFirstBySource 根据指标来源类型获取配置的首个终端信息
+func GetEdbTerminalFirstBySource(source int) (item *EdbTerminal, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT *  FROM edb_terminal WHERE source = ? and status=1 ORDER BY terminal_id ASC Limit 1 `
+	err = o.Raw(sql, source).QueryRow(&item)
+	return
+}
+
 // GetEdbTerminalByCode 根据终端编码获取终端信息
 func GetEdbTerminalByCode(terminalCode string) (item *EdbTerminal, err error) {
 	o := orm.NewOrm()

+ 1 - 1
services/base_from_lt.go

@@ -19,7 +19,7 @@ func GetEdbDataFromLt(edbCode, startDate, endDate, edbTerminalCode string) (data
 		err = errors.New("路透社接口未配置")
 		return
 	}*/
-	terminal, err := GetTerminal(utils.DATA_SOURCE_THS, edbTerminalCode)
+	terminal, err := GetTerminal(utils.DATA_SOURCE_LT, edbTerminalCode)
 	if err != nil {
 		err = fmt.Errorf("获取路透社接口配置出错 Err: %s", err)
 		return

+ 2 - 2
services/base_from_pb.go

@@ -15,7 +15,7 @@ func GetEdbDataFromPb(edbCode, startDate, endDate, edbTerminalCode string) (item
 		err = errors.New("彭博接口未配置")
 		return
 	}*/
-	terminal, err := GetTerminal(utils.DATA_SOURCE_THS, edbTerminalCode)
+	terminal, err := GetTerminal(utils.DATA_SOURCE_PB, edbTerminalCode)
 	if err != nil {
 		err = fmt.Errorf("获取彭博接口配置出错 Err: %s", err)
 		return
@@ -55,7 +55,7 @@ func GetEdbDataFromPbFinance(companyCode, edbCode, startDate, endDate, edbTermin
 		err = errors.New("彭博接口未配置")
 		return
 	}*/
-	terminal, err := GetTerminal(utils.DATA_SOURCE_THS, edbTerminalCode)
+	terminal, err := GetFirstTerminal(utils.DATA_SOURCE_PB_FINANCE, edbTerminalCode)
 	if err != nil {
 		err = fmt.Errorf("获取彭博接口配置出错 Err: %s", err)
 		return

+ 1 - 1
services/base_from_ths.go

@@ -178,7 +178,7 @@ type FutureGoodDataFromThsInterface struct {
 
 func GetFutureGoodDataFromThs(edbCode, startDate, endDate, edbTerminalCode string) (item future_good.FutureGoodDataFromThs, err error) {
 	// todo 商品获取终端逻辑修改,默认取第一个终端
-	terminal, err := GetTerminal(utils.DATA_SOURCE_THS, edbTerminalCode)
+	terminal, err := GetFirstTerminal(utils.DATA_SOURCE_THS, edbTerminalCode)
 	if err != nil {
 		err = fmt.Errorf("获取同花顺接口配置出错 Err: %s", err)
 		return

+ 116 - 3
services/edb_info_stat.go

@@ -55,8 +55,8 @@ func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason str
 	return
 }
 
-// MysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
-func MysteelChemicalEdbInfoUpdateStat() (err error) {
+// SetMysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
+func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 	//查询钢联的所有指标信息
 	condition := " and source = ? and edb_info_id=101838"
 	var pars []interface{}
@@ -173,12 +173,125 @@ func MysteelChemicalEdbInfoUpdateStat() (err error) {
 	//判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
 	if len(addList) > 0 {
 		err = logStat.Add(addList)
-	} else if len(modifyList) > 0 {
+	}
+	if len(modifyList) > 0 {
 		err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
 	}
 	return
 }
 
 func checkEdbInfoNeedRefresh(edbInfoId int, frequency string) (needRefresh int, err error) {
+
+	return
+}
+
+// SetEdbSourceStat 定时统计数据源汇总表
+func SetEdbSourceStat() (err error) {
+	//查询钢联的所有指标信息
+	nowTime := time.Now()
+	today := time.Now().Format(utils.FormatDate)
+	nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
+
+	statCond := "  and  create_time >= ? and create_time < ?"
+	var statPars []interface{}
+	statPars = append(statPars, today, nextDay)
+	//查询当日钢联所有的统计数据
+	updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
+	if err != nil {
+		err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
+		return
+	}
+	updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
+	if len(updateStatList) > 0 {
+		for _, v := range updateStatList {
+			updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
+		}
+	}
+
+	cond := "  and  create_time >= ? and create_time < ?"
+	var pars []interface{}
+	pars = append(pars, today, nextDay)
+	//查询当日钢联所有的统计数据
+	statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
+	if err != nil {
+		err = fmt.Errorf("查询钢联化工数据源统计报错,err: %s", err)
+		return
+	}
+	statMap := make(map[string]*data_stat.EdbSourceStat)
+	if len(statList) > 0 {
+		for _, v := range statList {
+			statMap[v.TerminalCode] = v
+		}
+	}
+
+	// 查询今日被删除的指标数
+	delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
+	if err != nil {
+		err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
+		return
+	}
+	delMap := make(map[string]int)
+	if len(delList) > 0 {
+		for _, v := range delList {
+			delMap[v.TerminalCode] = v.Num
+		}
+	}
+	logStat := new(data_stat.EdbSourceStat)
+	//组装新增数据
+	addList := make([]*data_stat.EdbSourceStat, 0)
+	modifyList := make([]*data_stat.EdbSourceStat, 0)
+
+	for terminalCode, list := range updateStatMap {
+		tmp := new(data_stat.EdbSourceStat)
+		for k, v := range list {
+			if k == 0 {
+				tmp.SourceName = v.SourceName
+				tmp.Source = v.Source
+				tmp.TerminalCode = v.TerminalCode
+				tmp.ModifyTime = nowTime
+			}
+
+			tmp.EdbNum = tmp.EdbNum + 1
+			if v.IsAdd == 1 {
+				tmp.EdbNewNum = tmp.EdbNewNum + 1
+			}
+			if v.NeedRefresh == 1 {
+				tmp.NeedRefreshNum = tmp.NeedRefreshNum + 1
+			}
+
+			if v.HasRefresh == 1 {
+				tmp.HasRefreshNum = tmp.HasRefreshNum + 1
+			}
+
+			if v.SourceUpdateResult == 1 {
+				tmp.UpdateSuccessNum = tmp.UpdateSuccessNum + 1
+			} else {
+				tmp.UpdateFailedNum = tmp.UpdateFailedNum + 1
+			}
+
+			// todo 数据更新成功和更新失败,与刷新成功和刷新失败的含义一致
+			tmp.DataUpdateSuccessNum = tmp.UpdateSuccessNum
+		}
+		// 处理今天删除的指标数量
+		if dn, ok := delMap[terminalCode]; ok {
+			tmp.EdbDelNum = dn
+		}
+		// 判断是否需要新增还是更新
+		if exist, ok := statMap[terminalCode]; ok {
+			tmp.Id = exist.Id
+			modifyList = append(modifyList, tmp)
+		} else {
+			tmp.CreateTime = nowTime
+			addList = append(addList, tmp)
+		}
+	}
+
+	//判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
+	if len(addList) > 0 {
+		err = logStat.Add(addList)
+	}
+	if len(modifyList) > 0 {
+		err = data_stat.UpdateEdbSourceStatMulti(modifyList)
+	}
 	return
 }

+ 21 - 1
services/edb_terminal.go

@@ -43,7 +43,7 @@ func GetTerminal(source int, oldTerminalCode string) (edbTerminal *models.EdbTer
 		terminalNumMap[v.ServerUrl] = v.Total
 	}
 
-	terminalList, err := models.GetEdbTerminalListBySource(utils.DATA_SOURCE_WIND)
+	terminalList, err := models.GetEdbTerminalListBySource(source)
 	if err != nil {
 		return
 	}
@@ -73,3 +73,23 @@ func GetTerminal(source int, oldTerminalCode string) (edbTerminal *models.EdbTer
 	}*/
 	return
 }
+
+func GetFirstTerminal(source int, oldTerminalCode string) (edbTerminal *models.EdbTerminal, err error) {
+	if oldTerminalCode != "" {
+		edbTerminal, err = models.GetEdbTerminalByCode(oldTerminalCode)
+		if err != nil {
+			return
+		}
+		return
+	}
+
+	edbTerminal, err = models.GetEdbTerminalFirstBySource(source)
+	if err != nil {
+		if err.Error() == utils.ErrNoRow() {
+			err = errors.New("终端未配置")
+			return
+		}
+		return
+	}
+	return
+}