package models import ( "errors" "eta_gn/eta_index_lib/global" "eta_gn/eta_index_lib/services/alarm_msg" "eta_gn/eta_index_lib/utils" "fmt" "github.com/shopspring/decimal" "strings" "time" ) type EdbDataPython struct { EdbDataId int `gorm:"column:edb_data_id;primaryKey"` // 指标数据ID EdbInfoId int `gorm:"column:edb_info_id"` // 指标信息ID EdbCode string `gorm:"column:edb_code"` // 指标编码 DataTime string `gorm:"column:data_time"` // 数据时间 Value float64 `gorm:"column:value"` // 数据值 CreateTime time.Time `gorm:"column:create_time"` // 创建时间 ModifyTime time.Time `gorm:"column:modify_time"` // 修改时间 DataTimestamp int64 `gorm:"column:data_timestamp"` // 数据时间戳 } func (m *EdbDataPython) TableName() string { return "edb_data_python" } type EdbDataFromPython struct { Date map[int]string `json:"date"` Value map[int]float64 `json:"value"` } func AddPythonEdb(edbInfo *EdbInfo, item EdbDataFromPython, edbInfoList []*EdbInfo) (err error) { for _, tmpEdbInfo := range edbInfoList { calculateMappingItem := new(EdbInfoCalculateMapping) calculateMappingItem.CreateTime = time.Now() calculateMappingItem.ModifyTime = time.Now() calculateMappingItem.Sort = 1 calculateMappingItem.EdbCode = edbInfo.EdbCode calculateMappingItem.EdbInfoId = edbInfo.EdbInfoId calculateMappingItem.FromEdbInfoId = tmpEdbInfo.EdbInfoId calculateMappingItem.FromEdbCode = tmpEdbInfo.EdbCode calculateMappingItem.FromEdbName = tmpEdbInfo.EdbName calculateMappingItem.FromSource = tmpEdbInfo.Source calculateMappingItem.FromSourceName = tmpEdbInfo.SourceName calculateMappingItem.Source = edbInfo.Source calculateMappingItem.SourceName = edbInfo.SourceName err = global.DEFAULT_DmSQL.Create(calculateMappingItem).Error if err != nil { return } } var isAdd bool addSql := ` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values ` for k, dateTimeStr := range item.Date { currentDate, tmpErr := time.ParseInLocation(utils.FormatDate, dateTimeStr, time.Local) if tmpErr != nil { err = tmpErr return } timestamp := currentDate.UnixNano() / 1e6 timestampStr := fmt.Sprintf("%d", timestamp) val := item.Value[k] saveVal := utils.SubFloatToString(val, 20) addSql += GetAddSql(fmt.Sprint(edbInfo.EdbInfoId), edbInfo.EdbCode, dateTimeStr, timestampStr, saveVal) isAdd = true } if isAdd { addSql = strings.TrimRight(addSql, ",") err = global.DEFAULT_DmSQL.Exec(addSql).Error if err != nil { return } } return } func RefreshAllPythonEdb(edbInfo *EdbInfo, item EdbDataFromPython) (err error) { pythonDataMap := make(map[string]float64) pythonDate := make([]string, 0) for k, dateTimeStr := range item.Date { pythonDataMap[dateTimeStr] = item.Value[k] pythonDate = append(pythonDate, dateTimeStr) } var condition string var pars []interface{} condition += " AND edb_info_id=? " pars = append(pars, edbInfo.EdbInfoId) dataList, err := GetAllEdbDataPythonByEdbInfoId(edbInfo.EdbInfoId) if err != nil { return err } updateEdbDataMap := make(map[string]float64) removeDateList := make([]string, 0) //需要删除的日期 for _, v := range dataList { currDataTime := v.DataTime pythonData, ok := pythonDataMap[currDataTime] if !ok { removeDateList = append(removeDateList, currDataTime) } else { currValue, _ := decimal.NewFromFloat(pythonData).Truncate(4).Float64() //保留4位小数 if v.Value != currValue { updateEdbDataMap[currDataTime] = currValue } } delete(pythonDataMap, currDataTime) } { addDataList := make([]*EdbDataPython, 0) for dataTime, dataValue := range pythonDataMap { currentDate, _ := time.ParseInLocation(utils.FormatDate, dataTime, time.Local) timestamp := currentDate.UnixNano() / 1e6 edbDataPython := &EdbDataPython{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: dataTime, Value: dataValue, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } addDataList = append(addDataList, edbDataPython) } if len(addDataList) > 0 { tmpErr := global.DEFAULT_DmSQL.CreateInBatches(addDataList, 500).Error if tmpErr != nil { err = tmpErr return } } } { if len(removeDateList) > 0 { removeDateStr := strings.Join(removeDateList, `","`) removeDateStr = `"` + removeDateStr + `"` tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr) err = global.DEFAULT_DmSQL.Exec(sql, edbInfo.EdbInfoId).Error if err != nil { err = errors.New("删除不存在的Python运算指标数据失败,Err:" + err.Error()) return } } } { tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) for edbDate, edbDataValue := range updateEdbDataMap { sql := fmt.Sprintf(` UPDATE %s set value = ?,modify_time=now() WHERE edb_info_id = ? and data_time = ? `, tableName) err = global.DEFAULT_DmSQL.Exec(sql, edbDataValue, edbInfo.EdbInfoId, edbDate).Error if err != nil { err = errors.New("更新现有的Python运算指标数据失败,Err:" + err.Error()) return } } } return } func EditEdbInfoCalculateMapping(edbInfo *EdbInfo, edbInfoList []*EdbInfo) (err error) { var existCondition string var existPars []interface{} existCondition += " AND edb_info_id=? " existPars = append(existPars, edbInfo.EdbInfoId) existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars) if err != nil { err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error()) return } existEdbInfoIdMap := make(map[int]int) isOpEdbInfoIdMap := make(map[int]int) for _, v := range existList { existEdbInfoIdMap[v.FromEdbInfoId] = v.FromEdbInfoId } for _, tmpEdbInfo := range edbInfoList { if _, ok := isOpEdbInfoIdMap[tmpEdbInfo.EdbInfoId]; ok { continue } if _, ok := existEdbInfoIdMap[tmpEdbInfo.EdbInfoId]; ok { delete(existEdbInfoIdMap, tmpEdbInfo.EdbInfoId) isOpEdbInfoIdMap[tmpEdbInfo.EdbInfoId] = tmpEdbInfo.EdbInfoId } else { calculateMappingItem := new(EdbInfoCalculateMapping) calculateMappingItem.CreateTime = time.Now() calculateMappingItem.ModifyTime = time.Now() calculateMappingItem.Sort = 1 calculateMappingItem.EdbCode = edbInfo.EdbCode calculateMappingItem.EdbInfoId = edbInfo.EdbInfoId calculateMappingItem.FromEdbInfoId = tmpEdbInfo.EdbInfoId calculateMappingItem.FromEdbCode = tmpEdbInfo.EdbCode calculateMappingItem.FromEdbName = tmpEdbInfo.EdbName calculateMappingItem.FromSource = tmpEdbInfo.Source calculateMappingItem.FromSourceName = tmpEdbInfo.SourceName calculateMappingItem.Source = edbInfo.Source calculateMappingItem.SourceName = edbInfo.SourceName err = global.DEFAULT_DmSQL.Create(calculateMappingItem).Error if err != nil { return } } } for _, v := range existEdbInfoIdMap { sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id=?` err = global.DEFAULT_DmSQL.Exec(sql, edbInfo.EdbInfoId, v).Error if err != nil { err = errors.New("删除计算指标关联关系失败,Err:" + err.Error()) return } } return } func GetAllEdbDataPythonByEdbInfoId(edbInfoId int) (items []*EdbDataPython, err error) { sql := ` SELECT * FROM edb_data_python WHERE edb_info_id=? ORDER BY data_time DESC ` err = global.DEFAULT_DmSQL.Raw(sql, edbInfoId).Scan(&items).Error return } type EdbInfoPythonSaveReq struct { AdminId int `description:"添加人id"` AdminName string `description:"添加人名称"` EdbName string `description:"指标名称"` Frequency string `description:"频率"` Unit string `description:"单位"` ClassifyId int `description:"分类id"` CalculateFormula string `description:"计算公式"` EdbInfoIdArr []struct { EdbInfoId int `description:"指标id"` FromTag string `description:"指标对应标签"` } } type ExecPythonEdbReq struct { PythonCode string `description:"python代码"` } type AddPythonEdbReq struct { AdminId int `description:"添加人id"` AdminName string `description:"添加人名称"` EdbInfoId int `description:"指标id"` EdbName string `description:"指标名称"` Frequency string `description:"频度"` Unit string `description:"单位"` ClassifyId int `description:"分类id"` PythonCode string `description:"python代码"` } func AnalysisPythonCode(pythonCode, edbName string) (edbInfoList []*EdbInfo) { tmpEdbCodeList := make([]string, 0) //临时指标code edbCodeLen := 0 //指标数 tmpList := strings.Split(pythonCode, "\n") for _, v := range tmpList { if strings.Contains(v, "edb_code") { edbCodeLen++ tmpCodeStrList := strings.Split(v, "edb_code") if len(tmpCodeStrList) > 1 { tmpCodeStrList2 := strings.Split(tmpCodeStrList[1], "'") if len(tmpCodeStrList2) > 1 { if tmpCodeStrList2[1] != "" { tmpEdbCodeList = append(tmpEdbCodeList, tmpCodeStrList2[1]) } } tmpCodeStrList3 := strings.Split(tmpCodeStrList[1], `"`) if len(tmpCodeStrList3) > 1 { if tmpCodeStrList3[1] != "" { tmpEdbCodeList = append(tmpEdbCodeList, tmpCodeStrList3[1]) } } } } } for _, v := range tmpEdbCodeList { fmt.Println(v) item, _ := GetEdbInfoOnlyByEdbCode(v) if item != nil { edbInfoList = append(edbInfoList, item) } } if len(edbInfoList) != edbCodeLen { go alarm_msg.SendAlarmMsg(fmt.Sprintf("python代码关联指标匹配失败,指标名称:%s;实际关联%d个,匹配上%d个", edbName, edbCodeLen, len(edbInfoList)), 3) } return }