package models import ( "errors" "eta/eta_index_lib/services/alarm_msg" "eta/eta_index_lib/utils" "fmt" "github.com/beego/beego/v2/client/orm" "github.com/shopspring/decimal" "strings" "time" ) // EdbDataPython python指标数据结构体 type EdbDataPython struct { EdbDataId int `orm:"column(edb_data_id);pk"` EdbInfoId int EdbCode string DataTime string Value float64 CreateTime time.Time ModifyTime time.Time DataTimestamp int64 } // EdbDataFromPython 通过python代码获取到的指标数据 type EdbDataFromPython struct { Date map[int]string `json:"date"` Value map[int]float64 `json:"value"` } // AddPythonEdb 新增python运算指标 func AddPythonEdb(edbInfo *EdbInfo, item EdbDataFromPython, edbInfoList []*EdbInfo) (err error) { o := orm.NewOrm() //添加指标关系 for _, tmpEdbInfo := range edbInfoList { calculateMappingItem := new(EdbInfoCalculateMapping) calculateMappingItem.CreateTime = time.Now() calculateMappingItem.ModifyTime = time.Now() calculateMappingItem.Sort = 1 calculateMappingItem.EdbCode = edbInfo.EdbCode calculateMappingItem.EdbInfoId = edbInfo.EdbInfoId calculateMappingItem.FromEdbInfoId = tmpEdbInfo.EdbInfoId calculateMappingItem.FromEdbCode = tmpEdbInfo.EdbCode calculateMappingItem.FromEdbName = tmpEdbInfo.EdbName calculateMappingItem.FromSource = tmpEdbInfo.Source calculateMappingItem.FromSourceName = tmpEdbInfo.SourceName calculateMappingItem.Source = edbInfo.Source calculateMappingItem.SourceName = edbInfo.SourceName _, err = o.Insert(calculateMappingItem) if err != nil { return } } var isAdd bool addSql := ` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values ` for k, dateTimeStr := range item.Date { //格式化时间 currentDate, tmpErr := time.ParseInLocation(utils.FormatDate, dateTimeStr, time.Local) if tmpErr != nil { err = tmpErr return } timestamp := currentDate.UnixNano() / 1e6 timestampStr := fmt.Sprintf("%d", timestamp) //值 val := item.Value[k] saveVal := utils.SubFloatToString(val, 20) addSql += GetAddSql(fmt.Sprint(edbInfo.EdbInfoId), edbInfo.EdbCode, dateTimeStr, timestampStr, saveVal) isAdd = true } if isAdd { addSql = strings.TrimRight(addSql, ",") _, err = o.Raw(addSql).Exec() if err != nil { return } } return } // EditPythonEdb 编辑python运算指标 func EditPythonEdb(edbInfoId int, edbCode string, item EdbDataFromPython) (err error) { o := orm.NewOrm() var isAdd bool addSql := ` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values ` for k, dateTimeStr := range item.Date { //格式化时间 currentDate, tmpErr := time.ParseInLocation(utils.FormatDate, dateTimeStr, time.Local) if tmpErr != nil { err = tmpErr return } timestamp := currentDate.UnixNano() / 1e6 timestampStr := fmt.Sprintf("%d", timestamp) //值 val := item.Value[k] saveVal := utils.SubFloatToString(val, 20) addSql += GetAddSql(fmt.Sprint(edbInfoId), edbCode, dateTimeStr, timestampStr, saveVal) isAdd = true } if isAdd { addSql = strings.TrimRight(addSql, ",") _, err = o.Raw(addSql).Exec() if err != nil { return } } return } // RefreshAllPythonEdb 刷新所有 python运算指标 func RefreshAllPythonEdb(edbInfo *EdbInfo, item EdbDataFromPython) (err error) { o := orm.NewOrm() to, err := o.Begin() if err != nil { return } defer func() { if err != nil { fmt.Println("RefreshAllPythonEdb,Err:" + err.Error()) _ = to.Rollback() } else { _ = to.Commit() } }() pythonDataMap := make(map[string]float64) pythonDate := make([]string, 0) for k, dateTimeStr := range item.Date { pythonDataMap[dateTimeStr] = item.Value[k] pythonDate = append(pythonDate, dateTimeStr) } //查询当前指标现有的数据 var condition string var pars []interface{} condition += " AND edb_info_id=? " pars = append(pars, edbInfo.EdbInfoId) //所有的数据 dataList, err := GetAllEdbDataPythonByEdbInfoId(edbInfo.EdbInfoId) if err != nil { return err } //待修改的指标数据map(index:日期,value:值) updateEdbDataMap := make(map[string]float64) removeDateList := make([]string, 0) //需要删除的日期 for _, v := range dataList { currDataTime := v.DataTime pythonData, ok := pythonDataMap[currDataTime] if !ok { // 如果python运算出来的数据中没有该日期,那么需要移除该日期的数据 removeDateList = append(removeDateList, currDataTime) } else { currValue, _ := decimal.NewFromFloat(pythonData).Truncate(4).Float64() //保留4位小数 //如果计算出来的值与库里面的值不匹配,那么就去修改该值 if v.Value != currValue { //将计算后的数据存入待拼接指标map里面,以便后续计算 updateEdbDataMap[currDataTime] = currValue } } //移除python指标数据中当天的日期 delete(pythonDataMap, currDataTime) } //sort.Strings(tbzEdbDataTimeList) //新增的数据入库 { addDataList := make([]*EdbDataPython, 0) for dataTime, dataValue := range pythonDataMap { //时间戳 currentDate, _ := time.ParseInLocation(utils.FormatDate, dataTime, time.Local) timestamp := currentDate.UnixNano() / 1e6 edbDataPython := &EdbDataPython{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: dataTime, Value: dataValue, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } addDataList = append(addDataList, edbDataPython) } //最后如果还有需要新增的数据,那么就统一入库 if len(addDataList) > 0 { _, tmpErr := o.InsertMulti(len(addDataList), addDataList) if tmpErr != nil { err = tmpErr return } } } //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了) { if len(removeDateList) > 0 { removeDateStr := strings.Join(removeDateList, `","`) removeDateStr = `"` + removeDateStr + `"` //如果拼接指标变更了,那么需要删除所有的指标数据 tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr) _, err = o.Raw(sql, edbInfo.EdbInfoId).Exec() if err != nil { err = errors.New("删除不存在的Python运算指标数据失败,Err:" + err.Error()) return } } } //修改现有的数据中对应的值 { tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) for edbDate, edbDataValue := range updateEdbDataMap { sql := fmt.Sprintf(` UPDATE %s set value = ?,modify_time=now() WHERE edb_info_id = ? and data_time = ? `, tableName) _, err = o.Raw(sql, edbDataValue, edbInfo.EdbInfoId, edbDate).Exec() if err != nil { err = errors.New("更新现有的Python运算指标数据失败,Err:" + err.Error()) return } } } return } // EditEdbInfoCalculateMapping 更新关联关系表 func EditEdbInfoCalculateMapping(edbInfo *EdbInfo, edbInfoList []*EdbInfo) (err error) { o := orm.NewOrm() var existCondition string var existPars []interface{} existCondition += " AND edb_info_id=? " existPars = append(existPars, edbInfo.EdbInfoId) //查询出所有的关联指标 existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars) if err != nil { err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error()) return } existEdbInfoIdMap := make(map[int]int) isOpEdbInfoIdMap := make(map[int]int) for _, v := range existList { existEdbInfoIdMap[v.FromEdbInfoId] = v.FromEdbInfoId } //添加指标关系 for _, tmpEdbInfo := range edbInfoList { //如果该指标id已经处理过了,那么就不处理了 if _, ok := isOpEdbInfoIdMap[tmpEdbInfo.EdbInfoId]; ok { continue } if _, ok := existEdbInfoIdMap[tmpEdbInfo.EdbInfoId]; ok { //如果存在,那么就移除map里面的东西 delete(existEdbInfoIdMap, tmpEdbInfo.EdbInfoId) isOpEdbInfoIdMap[tmpEdbInfo.EdbInfoId] = tmpEdbInfo.EdbInfoId } else { calculateMappingItem := new(EdbInfoCalculateMapping) calculateMappingItem.CreateTime = time.Now() calculateMappingItem.ModifyTime = time.Now() calculateMappingItem.Sort = 1 calculateMappingItem.EdbCode = edbInfo.EdbCode calculateMappingItem.EdbInfoId = edbInfo.EdbInfoId calculateMappingItem.FromEdbInfoId = tmpEdbInfo.EdbInfoId calculateMappingItem.FromEdbCode = tmpEdbInfo.EdbCode calculateMappingItem.FromEdbName = tmpEdbInfo.EdbName calculateMappingItem.FromSource = tmpEdbInfo.Source calculateMappingItem.FromSourceName = tmpEdbInfo.SourceName calculateMappingItem.Source = edbInfo.Source calculateMappingItem.SourceName = edbInfo.SourceName _, err = o.Insert(calculateMappingItem) if err != nil { return } } } for _, v := range existEdbInfoIdMap { //删除,计算指标关联的,基础指标的关联关系 sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id=?` _, err = o.Raw(sql, edbInfo.EdbInfoId, v).Exec() if err != nil { err = errors.New("删除计算指标关联关系失败,Err:" + err.Error()) return } } return } // GetAllEdbDataPythonByEdbInfoId 根据指标id获取全部的数据 func GetAllEdbDataPythonByEdbInfoId(edbInfoId int) (items []*EdbDataPython, err error) { o := orm.NewOrm() sql := ` SELECT * FROM edb_data_python WHERE edb_info_id=? ORDER BY data_time DESC ` _, err = o.Raw(sql, edbInfoId).QueryRows(&items) return } // EdbInfoPythonSaveReq 计算(运算)指标请求参数 type EdbInfoPythonSaveReq struct { AdminId int `description:"添加人id"` AdminName string `description:"添加人名称"` EdbName string `description:"指标名称"` Frequency string `description:"频率"` Unit string `description:"单位"` ClassifyId int `description:"分类id"` CalculateFormula string `description:"计算公式"` EdbInfoIdArr []struct { EdbInfoId int `description:"指标id"` FromTag string `description:"指标对应标签"` } } // ExecPythonEdbReq 执行python代码运算指标的请求参数 type ExecPythonEdbReq struct { PythonCode string `description:"python代码"` } // AddPythonEdbReq 添加python代码运算指标的请求参数 type AddPythonEdbReq struct { AdminId int `description:"添加人id"` AdminName string `description:"添加人名称"` EdbInfoId int `description:"指标id"` EdbName string `description:"指标名称"` Frequency string `description:"频度"` Unit string `description:"单位"` ClassifyId int `description:"分类id"` PythonCode string `description:"python代码"` } // AnalysisPythonCode 解析Python代码,获取关联code func AnalysisPythonCode(pythonCode, edbName string) (edbInfoList []*EdbInfo) { tmpEdbCodeList := make([]string, 0) //临时指标code edbCodeLen := 0 //指标数 tmpList := strings.Split(pythonCode, "\n") for _, v := range tmpList { if strings.Contains(v, "edb_code") { edbCodeLen++ tmpCodeStrList := strings.Split(v, "edb_code") if len(tmpCodeStrList) > 1 { //根据单引号获取 tmpCodeStrList2 := strings.Split(tmpCodeStrList[1], "'") if len(tmpCodeStrList2) > 1 { if tmpCodeStrList2[1] != "" { tmpEdbCodeList = append(tmpEdbCodeList, tmpCodeStrList2[1]) } } //根据双引号获取 tmpCodeStrList3 := strings.Split(tmpCodeStrList[1], `"`) if len(tmpCodeStrList3) > 1 { if tmpCodeStrList3[1] != "" { tmpEdbCodeList = append(tmpEdbCodeList, tmpCodeStrList3[1]) } } } } } for _, v := range tmpEdbCodeList { fmt.Println(v) item, _ := GetEdbInfoOnlyByEdbCode(v) if item != nil { edbInfoList = append(edbInfoList, item) } } if len(edbInfoList) != edbCodeLen { //code匹配失败,需要短信提醒 go alarm_msg.SendAlarmMsg(fmt.Sprintf("python代码关联指标匹配失败,指标名称:%s;实际关联%d个,匹配上%d个", edbName, edbCodeLen, len(edbInfoList)), 3) } return }