package models

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/beego/beego/v2/client/orm"
	"github.com/shopspring/decimal"
	"hongze/hongze_edb_lib/services"
	"hongze/hongze_edb_lib/utils"
)

// EdbDataPython is one data point of a python-calculated indicator, as stored
// in the edb_data_python table. DataTimestamp is DataTime expressed in
// milliseconds since the Unix epoch.
type EdbDataPython struct {
	EdbDataId     int `orm:"column(edb_data_id);pk"`
	EdbInfoId     int
	EdbCode       string
	DataTime      string
	Value         float64
	CreateTime    time.Time
	ModifyTime    time.Time
	DataTimestamp int64
}

// AddPythonEdb inserts the data points produced by a python calculation for a
// newly added indicator into edb_data_python.
//
// Every date in item.Date must match utils.FormatDate; the first date that
// does not aborts the call before anything is written. On any failure the
// error is returned and a notification email is sent asynchronously.
func AddPythonEdb(edbInfoId int, edbCode string, item services.EdbDataFromPython) (err error) {
	return insertPythonEdbData(edbInfoId, edbCode, item)
}

// EditPythonEdb inserts the data points produced by a python calculation for
// an edited indicator into edb_data_python.
//
// It only inserts rows; it does not delete or update rows that already exist
// for the indicator. Error handling is the same as AddPythonEdb.
func EditPythonEdb(edbInfoId int, edbCode string, item services.EdbDataFromPython) (err error) {
	return insertPythonEdbData(edbInfoId, edbCode, item)
}

// insertPythonEdbData builds one multi-row INSERT statement from item and
// executes it. It is the shared implementation of AddPythonEdb and
// EditPythonEdb.
func insertPythonEdbData(edbInfoId int, edbCode string, item services.EdbDataFromPython) (err error) {
	var errMsg string
	o := orm.NewOrm()
	defer func() {
		if err != nil {
			go utils.SendEmail(utils.APP_NAME_CN+"【"+utils.RunMode+"】"+"失败提醒", " python运算指标数据入库失败:err:"+errMsg, utils.EmailSendToUsers)
		}
	}()

	// Accumulate the VALUES tuples in a builder rather than with += so the
	// cost stays linear in the number of data points.
	var sqlBuilder strings.Builder
	sqlBuilder.WriteString(` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `)

	edbInfoIdStr := fmt.Sprint(edbInfoId)
	var isAdd bool
	for k, dateTimeStr := range item.Date {
		// Validate the date and derive its millisecond timestamp.
		currentDate, tmpErr := time.Parse(utils.FormatDate, dateTimeStr)
		if tmpErr != nil {
			// Record the detail so the notification email is not empty.
			errMsg = " time.Parse Err :" + tmpErr.Error()
			err = tmpErr
			return
		}
		timestampStr := fmt.Sprintf("%d", currentDate.UnixNano()/1e6)

		saveVal := utils.SubFloatToString(item.Value[k], 20)
		sqlBuilder.WriteString(GetAddSql(edbInfoIdStr, edbCode, dateTimeStr, timestampStr, saveVal))
		isAdd = true
	}

	// Nothing to insert: executing the bare statement would be a SQL error.
	if !isAdd {
		return
	}

	// Drop the trailing comma left after the last VALUES tuple.
	addSql := strings.TrimRight(sqlBuilder.String(), ",")
	_, err = o.Raw(addSql).Exec()
	if err != nil {
		errMsg = " tx.Exec Err :" + err.Error()
		return
	}
	return
}

// RefreshAllPythonEdb synchronises the stored data of a python-calculated
// indicator with a freshly calculated result set.
//
// Inside a single transaction it:
//   - inserts the dates present in item but not yet stored,
//   - deletes the stored dates that are no longer present in item,
//   - updates the stored values that differ from the new value truncated to
//     four decimal places.
//
// If any step fails the transaction is rolled back and the error is returned;
// otherwise the result of the commit is returned.
func RefreshAllPythonEdb(edbInfo *EdbInfo, item services.EdbDataFromPython) (err error) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			fmt.Println("RefreshAllPythonEdb,Err:" + err.Error())
			_ = to.Rollback()
			return
		}
		// Surface commit failures instead of silently dropping them.
		err = to.Commit()
	}()

	// date -> freshly calculated value
	pythonDataMap := make(map[string]float64, len(item.Date))
	for k, dateTimeStr := range item.Date {
		pythonDataMap[dateTimeStr] = item.Value[k]
	}

	// Load everything currently stored for this indicator.
	dataList, err := GetAllEdbDataPythonByEdbInfoId(edbInfo.EdbInfoId)
	if err != nil {
		return
	}

	updateEdbDataMap := make(map[string]float64) // date -> new value for rows whose value changed
	removeDateList := make([]string, 0)          // stored dates that are missing from the new result
	for _, v := range dataList {
		currDataTime := v.DataTime
		pythonData, ok := pythonDataMap[currDataTime]
		if !ok {
			// The new result no longer contains this date, so the row must go.
			removeDateList = append(removeDateList, currDataTime)
		} else {
			// Compare against the value truncated to 4 decimal places.
			currValue, _ := decimal.NewFromFloat(pythonData).Truncate(4).Float64()
			if v.Value != currValue {
				updateEdbDataMap[currDataTime] = currValue
			}
		}
		// Whatever remains in pythonDataMap after this loop is new data.
		delete(pythonDataMap, currDataTime)
	}

	// Insert the dates that were not stored before.
	addDataList := make([]*EdbDataPython, 0, len(pythonDataMap))
	for dataTime, dataValue := range pythonDataMap {
		currentDate, tmpErr := time.Parse(utils.FormatDate, dataTime)
		if tmpErr != nil {
			// A malformed date would otherwise be stored with a bogus timestamp.
			err = tmpErr
			return
		}
		now := time.Now()
		addDataList = append(addDataList, &EdbDataPython{
			EdbInfoId:     edbInfo.EdbInfoId,
			EdbCode:       edbInfo.EdbCode,
			DataTime:      dataTime,
			Value:         dataValue,
			CreateTime:    now,
			ModifyTime:    now,
			DataTimestamp: currentDate.UnixNano() / 1e6,
		})
	}
	if len(addDataList) > 0 {
		// Write through the transaction handle, not the plain Ormer, so the
		// insert takes part in the commit/rollback above.
		if _, tmpErr := to.InsertMulti(len(addDataList), addDataList); tmpErr != nil {
			err = tmpErr
			return
		}
	}

	tableName := GetEdbDataTableName(edbInfo.Source)

	// Delete the stored dates that are absent from the new result.
	if len(removeDateList) > 0 {
		removeDateStr := `"` + strings.Join(removeDateList, `","`) + `"`
		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
		_, err = to.Raw(sql, edbInfo.EdbInfoId).Exec()
		if err != nil {
			err = errors.New("删除不存在的Python运算指标数据失败,Err:" + err.Error())
			return
		}
	}

	// Update the rows whose value changed.
	updateSql := fmt.Sprintf(` UPDATE %s set value = ?,modify_time=now() WHERE edb_info_id = ? and data_time = ? `, tableName)
	for edbDate, edbDataValue := range updateEdbDataMap {
		_, err = to.Raw(updateSql, edbDataValue, edbInfo.EdbInfoId, edbDate).Exec()
		if err != nil {
			err = errors.New("更新现有的Python运算指标数据失败,Err:" + err.Error())
			return
		}
	}
	return
}

// GetAllEdbDataPythonByEdbInfoId returns every edb_data_python row of the
// given indicator, ordered by data_time descending.
func GetAllEdbDataPythonByEdbInfoId(edbInfoId int) (items []*EdbDataPython, err error) {
	o := orm.NewOrm()
	sql := ` SELECT * FROM edb_data_python WHERE edb_info_id=? ORDER BY data_time DESC `
	_, err = o.Raw(sql, edbInfoId).QueryRows(&items)
	return
}

// EdbInfoPythonSaveReq is the request payload for saving a calculated
// indicator.
type EdbInfoPythonSaveReq struct {
	AdminId          int    `description:"添加人id"`
	AdminName        string `description:"添加人名称"`
	EdbName          string `description:"指标名称"`
	Frequency        string `description:"频率"`
	Unit             string `description:"单位"`
	ClassifyId       int    `description:"分类id"`
	CalculateFormula string `description:"计算公式"`
	EdbInfoIdArr     []struct {
		EdbInfoId int    `description:"指标id"`
		FromTag   string `description:"指标对应标签"`
	}
}

// ExecPythonEdbReq is the request payload for executing the python code of a
// python-calculated indicator.
type ExecPythonEdbReq struct {
	PythonCode string `description:"python代码"`
}

// AddPythonEdbReq is the request payload for adding a python-calculated
// indicator.
type AddPythonEdbReq struct {
	AdminId    int    `description:"添加人id"`
	AdminName  string `description:"添加人名称"`
	EdbInfoId  int    `description:"指标id"`
	EdbName    string `description:"指标名称"`
	Frequency  string `description:"频度"`
	Unit       string `description:"单位"`
	ClassifyId int    `description:"分类id"`
	PythonCode string `description:"python代码"`
}