package stl

import (
	"encoding/json"
	"errors"
	"eta/eta_api/models/data_manage"
	"eta/eta_api/models/data_manage/stl"
	"eta/eta_api/models/data_manage/stl/request"
	"eta/eta_api/models/data_manage/stl/response"
	"eta/eta_api/services/data"
	"eta/eta_api/services/data/data_manage_permission"
	"eta/eta_api/services/elastic"
	"eta/eta_api/utils"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/rdlucklib/rdluck_tools/paging"
	"github.com/shopspring/decimal"
	"github.com/tealeg/xlsx"
)

// Values of StlConfigReq.DataRangeType: which slice of the source series is
// fed into the decomposition.
const (
	ALL_DATE = iota + 1
	LAST_N_YEARS
	RANGE_DATE
	RANGE_DATE_TO_NOW
)

// Values of StlEdbType: which decomposition component an indicator stores.
const (
	stlEdbTypeTrend    = 1
	stlEdbTypeSeasonal = 2
	stlEdbTypeResidual = 3
)

// Redis key prefixes for the cached component series; the config id is
// appended to form the full key.
var (
	EDB_DATA_CALCULATE_STL_TREND_CACHE    = `eta:stl_decompose:trend:config_id:`
	EDB_DATA_CALCULATE_STL_SEASONAL_CACHE = `eta:stl_decompose:seasonal:config_id:`
	EDB_DATA_CALCULATE_STL_RESIDUAL_CACHE = `eta:stl_decompose:residual:config_id:`
)

// buildStlDataCondition builds the SQL condition fragment and its parameters
// that select the data window described by conf for the indicator edbCode.
// On invalid input it returns a user-facing msg together with a non-nil err.
func buildStlDataCondition(conf *request.StlConfigReq, edbCode string) (condition string, pars []interface{}, msg string, err error) {
	switch conf.DataRangeType {
	case ALL_DATE:
		// No date restriction.
	case LAST_N_YEARS:
		lastNYear, er := strconv.Atoi(conf.LastNYear)
		if er != nil {
			msg = "最近N年输入不合法"
			err = er
			return
		}
		if lastNYear <= 0 {
			msg = "最近N年输入不合法"
			err = fmt.Errorf("最近N年输入不合法")
			return
		}
		// The current year counts as the first of the N years.
		startDate := time.Date(time.Now().Year()-(lastNYear-1), 1, 1, 0, 0, 0, 0, time.Local)
		condition += " AND data_time >=?"
		pars = append(pars, startDate)
	case RANGE_DATE:
		condition += " AND data_time >=? AND data_time <=?"
		pars = append(pars, conf.StartDate, conf.EndDate)
	case RANGE_DATE_TO_NOW:
		condition += " AND data_time >=?"
		pars = append(pars, conf.StartDate)
	}
	condition += " AND edb_code =?"
	pars = append(pars, edbCode)
	return
}

// validateStlParams checks the STL parameters in conf against a series of
// dataLen points. It returns all violations joined by newlines, or "" when the
// parameters are acceptable.
func validateStlParams(conf *request.StlConfigReq, dataLen int) string {
	var problems []string
	if conf.Period < 2 || conf.Period > dataLen {
		problems = append(problems, "period必须是一个大于等于2的正整数,且必须小于时间序列的长度")
	}
	if conf.Seasonal < 3 || conf.Seasonal%2 == 0 || conf.Seasonal <= conf.Period {
		problems = append(problems, "seasonal必须是一个大于等于3的奇整数,且必须大于period")
	}
	if conf.Trend < 3 || conf.Trend%2 == 0 || conf.Trend <= conf.Period {
		problems = append(problems, "trend必须是一个大于等于3的奇整数,且必须大于period")
	}
	if conf.Fraction < 0 || conf.Fraction > 1 {
		problems = append(problems, "fraction必须是一个介于[0-1]之间")
	}
	if conf.TrendDeg < 1 || conf.TrendDeg > 5 {
		problems = append(problems, "trend_deg请设置成1-5的整数")
	}
	if conf.SeasonalDeg < 1 || conf.SeasonalDeg > 5 {
		problems = append(problems, "seasonal_deg请设置成1-5的整数")
	}
	if conf.LowPassDeg < 1 || conf.LowPassDeg > 5 {
		problems = append(problems, "low_pass_deg请设置成1-5的整数")
	}
	return strings.Join(problems, "\n")
}

// stlCacheKey returns the Redis key holding the cached series of the given
// component type for configId. ok is false for an unknown component type.
func stlCacheKey(stlEdbType, configId int) (key string, ok bool) {
	switch stlEdbType {
	case stlEdbTypeTrend:
		return EDB_DATA_CALCULATE_STL_TREND_CACHE + strconv.Itoa(configId), true
	case stlEdbTypeSeasonal:
		return EDB_DATA_CALCULATE_STL_SEASONAL_CACHE + strconv.Itoa(configId), true
	case stlEdbTypeResidual:
		return EDB_DATA_CALCULATE_STL_RESIDUAL_CACHE + strconv.Itoa(configId), true
	}
	return "", false
}

// loadStlCacheData reads and decodes the component series cached under key.
// found is false (with a nil error) when the key does not exist.
func loadStlCacheData(key string) (items []*response.EdbData, found bool, err error) {
	if !utils.Rc.IsExist(key) {
		return nil, false, nil
	}
	raw, er := utils.Rc.RedisBytes(key)
	if er != nil {
		return nil, true, fmt.Errorf("获取redis数据失败,Err:%w", er)
	}
	if er := json.Unmarshal(raw, &items); er != nil {
		return nil, true, fmt.Errorf("json解析失败,Err:%w", er)
	}
	return items, true, nil
}

// buildStlDataList converts cached component points into rows for the STL
// data table, owned by the indicator identified by edbInfoId / edbCode.
// It fails on the first point whose date does not match utils.FormatDate.
func buildStlDataList(edbInfoId int, edbCode string, items []*response.EdbData) ([]*stl.EdbDataCalculateStl, error) {
	var list []*stl.EdbDataCalculateStl
	now := time.Now()
	for _, item := range items {
		dataTime, err := time.Parse(utils.FormatDate, item.DataTime)
		if err != nil {
			return nil, fmt.Errorf("invalid data time %q: %w", item.DataTime, err)
		}
		list = append(list, &stl.EdbDataCalculateStl{
			EdbInfoId:     edbInfoId,
			EdbCode:       edbCode,
			DataTime:      dataTime,
			Value:         v2f(item.Value),
			CreateTime:    now,
			ModifyTime:    now,
			DataTimestamp: dataTime.UnixMilli(),
		})
	}
	return list, nil
}

// v2f is an identity helper that documents that point values are stored
// unchanged.
func v2f(v float64) float64 { return v }

// GenerateStlEdbData runs the STL decomposition for the saved configuration
// req.CalculateStlConfigId and returns the preview (origin series, the three
// component charts and the residual diagnostics).
//
// The component series are also cached in Redis for two hours so that
// SaveStlEdbInfo can persist them afterwards. On failure msg carries the
// user-facing text and err the underlying cause.
func GenerateStlEdbData(req *request.StlConfigReq, adminId int) (resp *response.StlPreviewResp, msg string, err error) {
	config, err := stl.GetCalculateStlConfigById(req.CalculateStlConfigId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "配置信息不存在,请重新计算"
			return
		}
		msg = "获取配置信息失败"
		return
	}
	var confReq request.StlConfigReq
	if err = json.Unmarshal([]byte(config.Config), &confReq); err != nil {
		msg = "预览失败"
		err = fmt.Errorf("配置信息解析失败, err:%s", err.Error())
		return
	}
	edbInfo, err := data_manage.GetEdbInfoById(confReq.EdbInfoId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "指标不存在"
			return
		}
		msg = "获取指标信息失败"
		return
	}

	// The persisted configuration is the single source of truth for the data
	// window, exactly as it is for every other STL parameter below.
	condition, pars, msg, err := buildStlDataCondition(&confReq, edbInfo.EdbCode)
	if err != nil {
		return
	}
	edbData, err := data_manage.GetAllEdbDataListByCondition(condition, pars, edbInfo.Source, edbInfo.SubSource)
	if err != nil {
		msg = "获取指标数据失败"
		return
	}
	if condMsg := validateStlParams(&confReq, len(edbData)); condMsg != "" {
		msg = condMsg
		err = fmt.Errorf("参数错误")
		return
	}

	exePath, err := os.Executable()
	if err != nil {
		msg = "计算失败"
		return
	}
	tmpDir := filepath.Join(filepath.Dir(exePath), "static", "stl_tmp")
	if err = CheckOsPathAndMake(tmpDir); err != nil {
		msg = "计算失败"
		return
	}

	// The nanosecond suffix keeps concurrent requests of one admin apart.
	now := time.Now()
	filePrefix := strconv.Itoa(adminId) + "_" + now.Format(utils.FormatDateTimeUnSpace) + "_" + strconv.FormatInt(now.UnixNano(), 10)
	loadFilePath := filepath.Join(tmpDir, filePrefix+".xlsx")
	saveFilePath := filepath.Join(tmpDir, filePrefix+"_res.xlsx")
	// Registered up front so both temp files are removed on every exit path.
	defer os.Remove(loadFilePath)
	defer os.Remove(saveFilePath)

	if err = SaveToExcel(edbData, loadFilePath); err != nil {
		msg = "保存数据到Excel失败"
		return
	}
	result, err := execStlPythonCode(loadFilePath, saveFilePath, confReq.Period, confReq.Seasonal, confReq.Trend, confReq.TrendDeg, confReq.SeasonalDeg, confReq.LowPassDeg, confReq.Fraction, confReq.Robust)
	if err != nil {
		msg = "计算失败,请重新选择指标和参数后计算"
		return
	}
	trendChart, seasonalChart, residualChart, err := ParseStlExcel(saveFilePath)
	if err != nil {
		msg = "解析Excel失败"
		return
	}

	resp = new(response.StlPreviewResp)
	resp.OriginEdbInfo.EdbInfoId = edbInfo.EdbInfoId
	resp.OriginEdbInfo.Title = edbInfo.EdbName
	resp.OriginEdbInfo.ClassifyId = edbInfo.ClassifyId
	resp.OriginEdbInfo.MaxData = edbInfo.MaxValue
	resp.OriginEdbInfo.MinData = edbInfo.MinValue
	resp.OriginEdbInfo.Frequency = edbInfo.Frequency
	resp.OriginEdbInfo.Unit = edbInfo.Unit
	resp.OriginEdbInfo.DataList = formatEdbData(edbData)

	resp.TrendChartInfo.DataList = trendChart.DataList
	resp.TrendChartInfo.MaxData = trendChart.MaxData
	resp.TrendChartInfo.MinData = trendChart.MinData
	resp.TrendChartInfo.Title = edbInfo.EdbName + "Trend"
	resp.TrendChartInfo.ClassifyId = edbInfo.ClassifyId
	resp.TrendChartInfo.Frequency = edbInfo.Frequency
	resp.TrendChartInfo.Unit = edbInfo.Unit

	resp.SeasonalChartInfo.DataList = seasonalChart.DataList
	resp.SeasonalChartInfo.MaxData = seasonalChart.MaxData
	resp.SeasonalChartInfo.MinData = seasonalChart.MinData
	resp.SeasonalChartInfo.ClassifyId = edbInfo.ClassifyId
	resp.SeasonalChartInfo.Title = edbInfo.EdbName + "Seasonal"
	resp.SeasonalChartInfo.Frequency = edbInfo.Frequency
	resp.SeasonalChartInfo.Unit = edbInfo.Unit

	resp.ResidualChartInfo.DataList = residualChart.DataList
	resp.ResidualChartInfo.MaxData = residualChart.MaxData
	resp.ResidualChartInfo.MinData = residualChart.MinData
	resp.ResidualChartInfo.ClassifyId = edbInfo.ClassifyId
	resp.ResidualChartInfo.Title = edbInfo.EdbName + "Residual"
	resp.ResidualChartInfo.Frequency = edbInfo.Frequency
	resp.ResidualChartInfo.Unit = edbInfo.Unit

	resp.EvaluationResult.Mean = strconv.FormatFloat(result.ResidualMean, 'f', 4, 64)
	resp.EvaluationResult.Std = strconv.FormatFloat(result.ResidualVar, 'f', 4, 64)
	resp.EvaluationResult.AdfPValue = strconv.FormatFloat(result.AdfPValue, 'f', -1, 64)
	resp.EvaluationResult.LjungBoxPValue = strconv.FormatFloat(result.LbTestPValue, 'f', -1, 64)

	// Indicators already saved from this configuration override the default
	// chart metadata filled in above.
	confMapping, err := stl.GetCalculateStlConfigMappingByConfigId(req.CalculateStlConfigId)
	if err != nil {
		msg = "获取配置信息失败"
		return
	}
	var relationEdbInfoId []int
	for _, mapping := range confMapping {
		switch mapping.StlEdbType {
		case stlEdbTypeTrend:
			resp.TrendChartInfo.EdbInfoId = mapping.EdbInfoId
		case stlEdbTypeSeasonal:
			resp.SeasonalChartInfo.EdbInfoId = mapping.EdbInfoId
		case stlEdbTypeResidual:
			resp.ResidualChartInfo.EdbInfoId = mapping.EdbInfoId
		default:
			continue
		}
		relationEdbInfoId = append(relationEdbInfoId, mapping.EdbInfoId)
	}
	if len(relationEdbInfoId) > 0 {
		relationEdbInfo, er := data_manage.GetEdbInfoByIdList(relationEdbInfoId)
		if er != nil {
			msg = "获取关联指标信息失败"
			err = er
			return
		}
		for _, info := range relationEdbInfo {
			switch info.EdbInfoId {
			case resp.TrendChartInfo.EdbInfoId:
				resp.TrendChartInfo.Title = info.EdbName
				resp.TrendChartInfo.ClassifyId = info.ClassifyId
				resp.TrendChartInfo.Frequency = info.Frequency
				resp.TrendChartInfo.Unit = info.Unit
			case resp.SeasonalChartInfo.EdbInfoId:
				resp.SeasonalChartInfo.Title = info.EdbName
				resp.SeasonalChartInfo.ClassifyId = info.ClassifyId
				resp.SeasonalChartInfo.Frequency = info.Frequency
				resp.SeasonalChartInfo.Unit = info.Unit
			case resp.ResidualChartInfo.EdbInfoId:
				resp.ResidualChartInfo.Title = info.EdbName
				resp.ResidualChartInfo.ClassifyId = info.ClassifyId
				resp.ResidualChartInfo.Frequency = info.Frequency
				resp.ResidualChartInfo.Unit = info.Unit
			}
		}
	}

	// Cache every component so a later save can persist exactly what was
	// previewed. Any failed write is reported, including the last one.
	configIdStr := strconv.Itoa(config.CalculateStlConfigId)
	caches := []struct {
		key  string
		list []*response.EdbData
	}{
		{EDB_DATA_CALCULATE_STL_TREND_CACHE + configIdStr, trendChart.DataList},
		{EDB_DATA_CALCULATE_STL_SEASONAL_CACHE + configIdStr, seasonalChart.DataList},
		{EDB_DATA_CALCULATE_STL_RESIDUAL_CACHE + configIdStr, residualChart.DataList},
	}
	for _, c := range caches {
		b, er := json.Marshal(c.list)
		if er != nil {
			msg = "计算失败,请重新计算"
			err = er
			return
		}
		if err = utils.Rc.Put(c.key, b, time.Hour*2); err != nil {
			msg = "计算失败,请重新计算"
			return
		}
	}
	return
}

// formatEdbData converts stored data points into response points, adding a
// millisecond timestamp parsed from the date string.
func formatEdbData(items []*data_manage.EdbData) []*response.EdbData {
	res := make([]*response.EdbData, 0, len(items))
	for _, item := range items {
		// Best effort: an unparsable date yields the timestamp of the zero time.
		t, _ := time.Parse(utils.FormatDate, item.DataTime)
		res = append(res, &response.EdbData{
			DataTime:      item.DataTime,
			Value:         item.Value,
			DataTimestamp: t.UnixMilli(),
		})
	}
	return res
}

// CheckOsPathAndMake makes sure the directory path exists, creating it and
// any missing parents. It reports an error when path cannot be created or
// exists but is not a directory.
func CheckOsPathAndMake(path string) (err error) {
	// MkdirAll is a no-op for an existing directory, so no Stat is needed.
	return os.MkdirAll(path, os.ModePerm)
}

// ParseStlExcel reads the workbook written by the STL script and returns the
// series found on the "趋势" (trend), "季节" (seasonal) and "残差" (residual)
// sheets. Other sheets are ignored.
func ParseStlExcel(excelPath string) (TrendChart, SeasonalChart, ResidualChart response.ChartEdbInfo, err error) {
	file, err := xlsx.OpenFile(excelPath)
	if err != nil {
		return
	}
	for _, sheet := range file.Sheets {
		var target *response.ChartEdbInfo
		switch sheet.Name {
		case "季节":
			target = &SeasonalChart
		case "趋势":
			target = &TrendChart
		case "残差":
			target = &ResidualChart
		default:
			continue
		}
		dataList, minData, maxData, er := parseStlSheet(sheet)
		if er != nil {
			err = fmt.Errorf("parse sheet %q: %w", sheet.Name, er)
			return
		}
		target.DataList = dataList
		target.MinData = minData
		target.MaxData = maxData
	}
	return
}

// parseStlSheet reads one component sheet: a header row followed by rows of
// (date, value). Values in the returned list are rounded to four decimals;
// minData and maxData are taken from the unrounded values.
func parseStlSheet(sheet *xlsx.Sheet) (dataList []*response.EdbData, minData, maxData float64, err error) {
	seen := false
	for i, row := range sheet.Rows {
		// Row 0 is the header; rows without both columns carry no point.
		if i == 0 || row == nil || len(row.Cells) < 2 {
			continue
		}
		dateCell := row.Cells[0]
		var dataTime time.Time
		if dateCell.Type() == xlsx.CellTypeNumeric {
			serial, er := strconv.ParseFloat(dateCell.Value, 64)
			if er != nil {
				err = fmt.Errorf("row %d: invalid excel date %q: %w", i+1, dateCell.Value, er)
				return
			}
			dataTime = xlsx.TimeFromExcelTime(serial, false)
		} else {
			raw := strings.TrimSpace(dateCell.Value)
			if raw == "" {
				continue
			}
			// Parse the cell text itself, trying the date-time layout first.
			parsed, er := time.Parse(utils.FormatDateTime, raw)
			if er != nil {
				parsed, er = time.Parse(utils.FormatDate, raw)
			}
			if er != nil {
				err = fmt.Errorf("row %d: invalid date %q: %w", i+1, raw, er)
				return
			}
			dataTime = parsed
		}

		fv, er := row.Cells[1].Float()
		if er != nil {
			err = fmt.Errorf("row %d: invalid value %q: %w", i+1, row.Cells[1].Value, er)
			return
		}
		// Track extremes with a flag so a genuine 0 is not mistaken for
		// "not yet set".
		if !seen || fv < minData {
			minData = fv
		}
		if !seen || fv > maxData {
			maxData = fv
		}
		seen = true

		fv, _ = decimal.NewFromFloat(fv).Round(4).Float64()
		dataList = append(dataList, &response.EdbData{
			DataTime:      dataTime.Format(utils.FormatDate),
			Value:         fv,
			DataTimestamp: dataTime.UnixMilli(),
		})
	}
	return
}

// SaveToExcel writes data to a new workbook at filePath with one sheet named
// "Tmp": a header row ("日期", "值") followed by one row per data point.
func SaveToExcel(data []*data_manage.EdbData, filePath string) (err error) {
	xlsxFile := xlsx.NewFile()
	sheetNew, err := xlsxFile.AddSheet("Tmp")
	if err != nil {
		return
	}
	titleRow := sheetNew.AddRow()
	titleRow.AddCell().SetString("日期")
	titleRow.AddCell().SetString("值")
	for _, d := range data {
		if d == nil {
			continue
		}
		row := sheetNew.AddRow()
		row.AddCell().SetString(d.DataTime)
		row.AddCell().SetFloat(d.Value)
	}
	return xlsxFile.Save(filePath)
}

// STLResult holds the residual diagnostics printed as JSON by the STL script.
type STLResult struct {
	ResidualMean float64 `json:"residual_mean"`
	// ResidualVar carries np.std of the residual, despite its name.
	ResidualVar  float64 `json:"residual_var"`
	AdfPValue    float64 `json:"adf_p_value"`
	LbTestPValue float64 `json:"lb_test_p_value"`
	LbTestStat   float64 `json:"lb_test_stat"`
}

// execStlPythonCode runs the STL decomposition in a python3 subprocess.
//
// It reads the series from the workbook at path, writes the seasonal, trend
// and residual series to the workbook at toPath, and returns the diagnostics
// the script prints as JSON. Failures are logged together with the inputs
// and the combined process output.
func execStlPythonCode(path, toPath string, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg int, fraction float64, robust bool) (stlResult *STLResult, err error) {
	pythonCode := `
import json
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
from statsmodels.tsa.seasonal import STL
from statsmodels.nonparametric.smoothers_lowess import lowess
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.diagnostic import acorr_ljungbox
import numpy as np

file_path = r"%s"
df = pd.read_excel(file_path, parse_dates=['日期'], engine='openpyxl')
df.set_index('日期', inplace=True)
df = df[df.index.notna()]

period = %d
seasonal = %d
trend = %d
fraction = %g
seasonal_deg = %d
trend_deg = %d
low_pass_deg = %d
robust = %s

stl = STL(
    df['值'],
    period=period,
    seasonal=seasonal,
    trend=trend,
    low_pass=None,
    seasonal_deg=seasonal_deg,
    trend_deg=trend_deg,
    low_pass_deg=low_pass_deg,
    seasonal_jump=1,
    trend_jump=1,
    low_pass_jump=1,
    robust=robust
)
result = stl.fit()

smoothed = lowess(df['值'], np.arange(len(df)), frac=fraction)
trend_lowess = smoothed[:, 1]

# 季节图
seasonal_component = result.seasonal
# 趋势图
trend_lowess_series = pd.Series(trend_lowess, index=df.index)
# 残差图
residual_component = df['值'] - trend_lowess - seasonal_component

# 计算打印残差的均值
residual_mean = np.mean(residual_component)
# 计算打印残差的方差
residual_var = np.std(residual_component)
# 计算打印残差的ADF检验结果, 输出p-value
adf_result = adfuller(residual_component)
# 根据p-value判断是否平稳
lb_test = acorr_ljungbox(residual_component, lags=period, return_df=True)

output_file = r"%s"

with pd.ExcelWriter(output_file) as writer:
    # 保存季节图
    pd.Series(seasonal_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='季节', index=False)
    # 保存趋势图
    trend_lowess_series.to_frame(name='值').reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='趋势', index=False)
    # 保存残差图
    pd.Series(residual_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='残差', index=False)

output = json.dumps({
    'residual_mean': residual_mean,
    'residual_var': residual_var,
    'adf_p_value': adf_result[1],
    'lb_test_p_value': lb_test['lb_pvalue'].values[0],
    'lb_test_stat': lb_test['lb_stat'].values[0]
})

print(output)
`
	robustStr := "True"
	if !robust {
		robustStr = "False"
	}
	pythonCode = fmt.Sprintf(pythonCode, path, period, seasonal, trend, fraction, seasonalDeg, trendDeg, lowPassDeg, robustStr, toPath)
	utils.FileLog.Info("stl exec python code:%s", pythonCode)

	// CombinedOutput waits for the process to exit, so there is nothing left
	// to kill afterwards.
	cmd := exec.Command(`python3`, "-c", pythonCode)
	output, err := cmd.CombinedOutput()
	if err != nil {
		utils.FileLog.Info(`execStlPythonCode error:%s, input: path:%s, toPath:%s, period:%d, seasonal:%d, trend:%d, trendDeg:%d, seasonalDeg:%d, lowPassDeg:%d, fraction:%g, robust:%s, output:%s`, err.Error(), path, toPath, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg, fraction, robustStr, string(output))
		return
	}
	if err = json.Unmarshal(output, &stlResult); err != nil {
		utils.FileLog.Info(`execStlPythonCode Unmarshal error:%s, input: path:%s, toPath:%s, period:%d, seasonal:%d, trend:%d, trendDeg:%d, seasonalDeg:%d, lowPassDeg:%d, fraction:%g, robust:%s, output:%s`, err.Error(), path, toPath, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg, fraction, robustStr, string(output))
		return
	}
	return
}

// SaveStlConfig validates req against the selected data window of the source
// indicator and stores it as JSON. An existing configuration is updated when
// req.CalculateStlConfigId > 0; otherwise a new one owned by adminId is
// inserted. It returns the id of the stored configuration.
func SaveStlConfig(req *request.StlConfigReq, adminId int) (configId int64, msg string, err error) {
	edbInfo, err := data_manage.GetEdbInfoById(req.EdbInfoId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "指标不存在"
			return
		}
		msg = "获取指标信息失败"
		return
	}

	// Validate against the same window GenerateStlEdbData will compute on.
	condition, pars, msg, err := buildStlDataCondition(req, edbInfo.EdbCode)
	if err != nil {
		return
	}
	edbData, err := data_manage.GetAllEdbDataListByCondition(condition, pars, edbInfo.Source, edbInfo.SubSource)
	if err != nil {
		msg = "获取指标数据失败"
		return
	}
	if condMsg := validateStlParams(req, len(edbData)); condMsg != "" {
		msg = condMsg
		err = fmt.Errorf("参数错误")
		return
	}

	b, err := json.Marshal(req)
	if err != nil {
		msg = "保存失败"
		return
	}
	conf := new(stl.CalculateStlConfig)
	conf.Config = string(b)
	conf.ModifyTime = time.Now()
	if req.CalculateStlConfigId > 0 {
		conf.CalculateStlConfigId = req.CalculateStlConfigId
		err = conf.Update([]string{"Config", "ModifyTime"})
		configId = int64(req.CalculateStlConfigId)
	} else {
		conf.SysUserId = adminId
		conf.CreateTime = time.Now()
		configId, err = conf.Insert()
	}
	if err != nil {
		msg = "保存失败"
	}
	return
}

// SearchEdbInfoWithStl returns one page of indicators that can be used as an
// STL source: frequency 日度/周度/旬度/月度/季度, excluding indicators hidden
// from adminId. A non-empty keyWord is searched through Elasticsearch; an
// empty one lists from the database. Each item carries the caller's operation
// permission, and indicators sharing a name get their source name appended.
func SearchEdbInfoWithStl(adminId int, keyWord string, currentIndex, pageSize int) (resp data_manage.EdbInfoFilterDataResp, msg string, err error) {
	var edbInfoList []*data_manage.EdbInfoList

	// Indicators this account is not allowed to see.
	noPermissionEdbInfoIdList := make([]int, 0)
	{
		obj := data_manage.EdbInfoNoPermissionAdmin{}
		confList, er := obj.GetAllListByAdminId(adminId)
		if er != nil && er.Error() != utils.ErrNoRow() {
			msg = "获取失败"
			err = fmt.Errorf("获取不可见指标配置数据失败,Err:%w", er)
			return
		}
		for _, v := range confList {
			noPermissionEdbInfoIdList = append(noPermissionEdbInfoIdList, v.EdbInfoId)
		}
	}

	if currentIndex <= 0 {
		currentIndex = 1
	}
	startSize := utils.StartIndex(currentIndex, pageSize)

	isEs := false
	var total int64
	if keyWord != "" {
		frequencyList := []string{"日度", "周度", "旬度", "月度", "季度"}
		total, edbInfoList, err = elastic.SearchEdbInfoDataByfrequency(utils.DATA_INDEX_NAME, keyWord, startSize, pageSize, 0, frequencyList, noPermissionEdbInfoIdList)
		isEs = true
	} else {
		var condition string
		var pars []interface{}
		// Ordinary (edb_info_type 0) indicators only.
		condition += ` AND edb_info_type = ? `
		pars = append(pars, 0)
		if n := len(noPermissionEdbInfoIdList); n > 0 {
			condition += ` AND edb_info_id not in (` + utils.GetOrmInReplace(n) + `) `
			pars = append(pars, noPermissionEdbInfoIdList)
		}
		condition += ` AND frequency IN ('日度', '周度', '旬度', '月度', '季度') `
		total, edbInfoList, err = data_manage.GetEdbInfoFilterList(condition, pars, startSize, pageSize)
	}
	if err != nil {
		// The search error is still returned, alongside an empty page.
		edbInfoList = make([]*data_manage.EdbInfoList, 0)
	}

	page := paging.GetPaging(currentIndex, pageSize, int(total))

	edbInfoListLen := len(edbInfoList)
	classifyIdList := make([]int, 0, edbInfoListLen)
	for _, item := range edbInfoList {
		item.EdbNameAlias = item.EdbName
		classifyIdList = append(classifyIdList, item.ClassifyId)
	}

	if edbInfoListLen > 0 {
		// Classifications of the indicators on this page.
		classifyMap := make(map[int]*data_manage.EdbClassify)
		classifyList, er := data_manage.GetEdbClassifyByIdList(classifyIdList)
		if er != nil {
			msg = "获取失败"
			err = fmt.Errorf("获取分类列表失败,Err:%w", er)
			return
		}
		for _, v := range classifyList {
			classifyMap[v.ClassifyId] = v
		}

		permissionEdbIdList, permissionClassifyIdList, er := data_manage_permission.GetUserEdbAndClassifyPermissionList(adminId, 0, 0)
		if er != nil {
			msg = "获取失败"
			err = fmt.Errorf("获取所有有权限的指标和分类失败,Err:%w", er)
			return
		}

		// ES results are refreshed from the database so the permission check
		// below works on the stored IsJoinPermission flag.
		if isEs {
			edbInfoIdList := make([]int, 0, edbInfoListLen)
			for _, item := range edbInfoList {
				edbInfoIdList = append(edbInfoIdList, item.EdbInfoId)
			}
			tmpEdbList, er := data_manage.GetEdbInfoByIdList(edbInfoIdList)
			if er != nil {
				msg = "获取失败"
				err = fmt.Errorf("获取所有有权限的指标失败,Err:%w", er)
				return
			}
			edbInfoMap := make(map[int]*data_manage.EdbInfo)
			for _, v := range tmpEdbList {
				edbInfoMap[v.EdbInfoId] = v
			}
			for _, item := range edbInfoList {
				if stored, ok := edbInfoMap[item.EdbInfoId]; ok {
					item.IsJoinPermission = stored.IsJoinPermission
				}
			}
		}

		// Permission check.
		for _, item := range edbInfoList {
			if currClassify, ok := classifyMap[item.ClassifyId]; ok {
				item.HaveOperaAuth = data_manage_permission.CheckEdbPermissionByPermissionIdList(item.IsJoinPermission, currClassify.IsJoinPermission, item.EdbInfoId, item.ClassifyId, permissionEdbIdList, permissionClassifyIdList)
			}
		}
	}

	// Disambiguate names shared by different indicators by appending the
	// source name, unless the name already contains it.
	firstIdByAlias := make(map[string]int, edbInfoListLen)
	sharedAlias := make(map[string]bool)
	for _, item := range edbInfoList {
		if firstId, ok := firstIdByAlias[item.EdbNameAlias]; !ok {
			firstIdByAlias[item.EdbNameAlias] = item.EdbInfoId
		} else if firstId != item.EdbInfoId {
			sharedAlias[item.EdbNameAlias] = true
		}
	}
	for _, item := range edbInfoList {
		if sharedAlias[item.EdbNameAlias] && !strings.Contains(item.EdbName, item.SourceName) {
			item.EdbName = item.EdbName + "(" + item.SourceName + ")"
		}
	}

	// Record the search keyword asynchronously.
	{
		searchKeyword := new(data_manage.SearchKeyword)
		searchKeyword.KeyWord = keyWord
		searchKeyword.CreateTime = time.Now()
		go data_manage.AddSearchKeyword(searchKeyword)
	}

	resp = data_manage.EdbInfoFilterDataResp{
		Paging: page,
		List:   edbInfoList,
	}
	return
}

// SaveStlEdbInfo persists one cached STL component (selected by
// req.StlEdbType) as an indicator: it updates the indicator req.EdbInfoId
// when that is > 0 and creates a new one otherwise. Afterwards the other
// indicators saved from the same configuration are refreshed from the cache.
//
// The component must have been computed by GenerateStlEdbData recently enough
// to still be cached. It returns the id of the saved indicator.
func SaveStlEdbInfo(req *request.SaveStlEdbInfoReq, adminId int, adminRealName, lang string) (addEdbInfoId int, isSendEmail bool, msg string, err error) {
	switch {
	case req.EdbName == "":
		msg = "指标名称不能为空"
	case req.Unit == "":
		msg = "指标单位不能为空"
	case req.ClassifyId <= 0:
		msg = "请选择分类"
	case req.Frequency == "":
		msg = "指标频度不能为空"
	}
	if msg != "" {
		// Report invalid input as an error so callers checking err only do
		// not mistake it for success.
		err = errors.New(msg)
		return
	}

	conf, err := stl.GetCalculateStlConfigById(req.CalculateStlConfigId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "未找到配置,请先进行计算"
			err = fmt.Errorf("配置不存在")
			return
		}
		msg = "获取失败"
		return
	}
	var stlConfig request.StlConfigReq
	if err = json.Unmarshal([]byte(conf.Config), &stlConfig); err != nil {
		msg = "获取失败"
		return
	}

	cacheKey, ok := stlCacheKey(req.StlEdbType, req.CalculateStlConfigId)
	if !ok {
		msg = "获取失败"
		err = fmt.Errorf("未知的计算类型")
		return
	}
	edbInfoData, found, err := loadStlCacheData(cacheKey)
	if err != nil {
		msg = "获取失败"
		return
	}
	if !found {
		msg = "计算已过期,请重新计算"
		err = fmt.Errorf("not found")
		return
	}

	var opEdbInfoId int
	if req.EdbInfoId > 0 {
		opEdbInfoId, msg, err = updateStlEdbInfo(req, lang, conf.Config, edbInfoData)
	} else {
		opEdbInfoId, msg, err = createStlEdbInfo(req, adminId, adminRealName, lang, conf.Config, stlConfig.EdbInfoId, edbInfoData)
	}
	if err != nil {
		return
	}

	// Refresh the other indicators saved from the same configuration.
	err = SyncUpdateRelationEdbInfo(req.CalculateStlConfigId, opEdbInfoId)
	if err != nil {
		msg = "更新关联的同配置的指标失败"
		return
	}
	addEdbInfoId = opEdbInfoId
	return
}

// updateStlEdbInfo applies req to the existing indicator req.EdbInfoId,
// stores formula as its calculation formula and replaces its data with items.
func updateStlEdbInfo(req *request.SaveStlEdbInfoReq, lang, formula string, items []*response.EdbData) (edbInfoId int, msg string, err error) {
	edbInfoId = req.EdbInfoId
	isEn := lang == utils.EnLangVersion

	// Reject a name already used by a different indicator.
	condition := " AND edb_name=? "
	if isEn {
		condition = " AND edb_name_en = ? "
	}
	pars := []interface{}{req.EdbName}
	existEdbInfo, er := data_manage.GetEdbInfoByCondition(condition, pars)
	if er != nil && er.Error() != utils.ErrNoRow() {
		msg = "获取失败"
		err = er
		return
	}
	if existEdbInfo != nil && req.EdbInfoId != existEdbInfo.EdbInfoId {
		existName := existEdbInfo.EdbName
		if isEn {
			existName = existEdbInfo.EdbNameEn
		}
		if existName == req.EdbName {
			msg = "指标名称已存在"
			err = fmt.Errorf("指标名称已存在")
			return
		}
	}

	edbInfo, er := data_manage.GetEdbInfoById(req.EdbInfoId)
	if er != nil {
		if er.Error() == utils.ErrNoRow() {
			msg = "未找到指标,请刷新后重试"
			err = er
			return
		}
		msg = "获取失败"
		err = er
		return
	}

	var updateCols []string
	if isEn {
		// Compare against the English name, which is the column written.
		if edbInfo.EdbNameEn != req.EdbName {
			edbInfo.EdbNameEn = req.EdbName
			updateCols = append(updateCols, "edb_name_en")
		}
	} else if edbInfo.EdbName != req.EdbName {
		edbInfo.EdbName = req.EdbName
		updateCols = append(updateCols, "edb_name")
	}
	if edbInfo.ClassifyId != req.ClassifyId {
		// A moved indicator goes to the end of its new classification.
		maxSort, er := data.GetEdbClassifyMaxSort(req.ClassifyId, 0)
		if er != nil {
			msg = "获取失败"
			err = fmt.Errorf("获取最大排序失败,Err:%w", er)
			return
		}
		edbInfo.ClassifyId = req.ClassifyId
		edbInfo.Sort = maxSort + 1
		updateCols = append(updateCols, "classify_id", "sort")
	}
	if edbInfo.Frequency != req.Frequency {
		edbInfo.Frequency = req.Frequency
		updateCols = append(updateCols, "frequency")
	}
	if edbInfo.Unit != req.Unit {
		edbInfo.Unit = req.Unit
		updateCols = append(updateCols, "unit")
	}
	edbInfo.CalculateFormula = formula
	edbInfo.ModifyTime = time.Now()
	updateCols = append(updateCols, "calculate_formula", "modify_time")
	if err = edbInfo.Update(updateCols); err != nil {
		msg = "保存失败"
		return
	}

	dataList, er := buildStlDataList(edbInfo.EdbInfoId, edbInfo.EdbCode, items)
	if er != nil {
		msg = "保存失败"
		err = er
		return
	}
	if err = stl.DeleteAndInsertEdbDataCalculateStl(edbInfo.EdbCode, dataList); err != nil {
		msg = "保存失败"
		return
	}

	data_manage.ModifyEdbInfoDataStatus(int64(edbInfo.EdbInfoId), edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
	maxAndMinItem, _ := data_manage.GetEdbInfoMaxAndMinInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
	if maxAndMinItem != nil {
		err = data_manage.ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, maxAndMinItem)
		if err != nil {
			msg = "保存失败"
			err = errors.New("保存失败,Err:" + err.Error())
			return
		}
	}
	return
}

// createStlEdbInfo creates a new STL indicator from req holding items, links
// it to the configuration req.CalculateStlConfigId, records fromEdbInfoId as
// its source indicator and indexes it in Elasticsearch.
//
// NOTE(review): the inserts are not wrapped in a transaction, so a failure
// part-way leaves earlier rows in place.
func createStlEdbInfo(req *request.SaveStlEdbInfoReq, adminId int, adminRealName, lang, formula string, fromEdbInfoId int, items []*response.EdbData) (edbInfoId int, msg string, err error) {
	edbCode, er := utils.GenerateEdbCode(1, "stl")
	if er != nil {
		msg = "生成指标代码失败"
		err = fmt.Errorf("生成指标代码失败,Err:%w", er)
		return
	}

	exists, er := CheckDulplicateEdbInfoName(req.EdbName, lang)
	if er != nil {
		msg = "保存失败"
		err = fmt.Errorf("检查指标名称是否存在失败,Err:%w", er)
		return
	}
	if exists {
		msg = "指标名称已存在"
		err = fmt.Errorf("指标名称已存在")
		return
	}

	// Resolve the source indicator before anything is written, so a missing
	// source does not leave a half-created indicator behind.
	fromEdbInfo, er := data_manage.GetEdbInfoById(fromEdbInfoId)
	if er != nil {
		if er.Error() == utils.ErrNoRow() {
			msg = "未找到指标,请刷新后重试"
			err = fmt.Errorf("指标不存在,err:%w", er)
			return
		}
		msg = "获取失败"
		err = er
		return
	}

	// The new indicator goes to the end of its classification.
	maxSort, er := data.GetEdbClassifyMaxSort(req.ClassifyId, 0)
	if er != nil {
		msg = "获取失败"
		err = fmt.Errorf("获取最大排序失败,Err:%w", er)
		return
	}

	extra, er := json.Marshal(req)
	if er != nil {
		msg = "保存失败"
		err = er
		return
	}

	source := utils.DATA_SOURCE_CALCULATE_STL
	subSource := utils.DATA_SUB_SOURCE_EDB
	now := time.Now()

	edbInfo := new(data_manage.EdbInfo)
	edbInfo.EdbCode = edbCode
	edbInfo.EdbName = req.EdbName
	edbInfo.EdbNameEn = req.EdbName
	edbInfo.EdbNameSource = req.EdbName
	edbInfo.Frequency = req.Frequency
	edbInfo.Unit = req.Unit
	edbInfo.UnitEn = req.Unit
	edbInfo.CalculateFormula = formula
	edbInfo.ClassifyId = req.ClassifyId
	edbInfo.SysUserId = adminId
	edbInfo.SysUserRealName = adminRealName
	edbInfo.CreateTime = now
	edbInfo.ModifyTime = now
	edbInfo.Sort = maxSort + 1
	edbInfo.DataDateType = `交易日`
	edbInfo.UniqueCode = utils.MD5(utils.DATA_PREFIX + "_" + strconv.FormatInt(now.UnixNano(), 10))
	if itemVal, er := data_manage.GetEdbInfoMaxAndMinInfo(source, subSource, edbCode); itemVal != nil && er == nil {
		edbInfo.MaxValue = itemVal.MaxValue
		edbInfo.MinValue = itemVal.MinValue
	}
	edbInfo.EdbType = 2
	edbInfo.Source = source
	edbInfo.SubSource = subSource
	edbInfo.SourceName = "STL趋势分解"
	edbInfo.Extra = string(extra)

	newId, er := data_manage.AddEdbInfo(edbInfo)
	if er != nil {
		msg = "保存失败"
		err = errors.New("保存失败,Err:" + er.Error())
		return
	}
	edbInfoId = int(newId)
	edbInfo.EdbInfoId = edbInfoId

	dataList, er := buildStlDataList(edbInfoId, edbCode, items)
	if er != nil {
		msg = "保存失败"
		err = er
		return
	}
	indexObj := new(stl.EdbDataCalculateStl)
	if err = indexObj.BatchInsert(dataList); err != nil {
		msg = "保存失败"
		return
	}

	data_manage.ModifyEdbInfoDataStatus(newId, source, subSource, edbCode)
	maxAndMinItem, _ := data_manage.GetEdbInfoMaxAndMinInfo(source, subSource, edbCode)
	if maxAndMinItem != nil {
		err = data_manage.ModifyEdbInfoMaxAndMinInfo(edbInfoId, maxAndMinItem)
		if err != nil {
			msg = "保存失败"
			err = errors.New("保存失败,Err:" + err.Error())
			return
		}
	}

	// Link the indicator to its STL configuration.
	stlMapping := new(stl.CalculateStlConfigMapping)
	stlMapping.EdbInfoId = edbInfoId
	stlMapping.CalculateStlConfigId = req.CalculateStlConfigId
	stlMapping.StlEdbType = req.StlEdbType
	stlMapping.CreateTime = time.Now()
	stlMapping.ModifyTime = time.Now()
	if _, err = stlMapping.Insert(); err != nil {
		msg = "保存失败"
		err = errors.New("保存配置映射失败,Err:" + err.Error())
		return
	}

	// Record which indicator this one was derived from.
	edbCalculateMappingInfo := new(data_manage.EdbInfoCalculateMapping)
	edbCalculateMappingInfo.EdbInfoId = edbInfoId
	edbCalculateMappingInfo.Source = source
	edbCalculateMappingInfo.SourceName = "STL趋势分解"
	edbCalculateMappingInfo.EdbCode = edbCode
	edbCalculateMappingInfo.FromEdbInfoId = fromEdbInfo.EdbInfoId
	edbCalculateMappingInfo.FromEdbCode = fromEdbInfo.EdbCode
	edbCalculateMappingInfo.FromEdbName = fromEdbInfo.EdbName
	edbCalculateMappingInfo.FromSource = fromEdbInfo.Source
	edbCalculateMappingInfo.FromSourceName = fromEdbInfo.SourceName
	edbCalculateMappingInfo.CreateTime = time.Now()
	edbCalculateMappingInfo.ModifyTime = time.Now()
	if err = edbCalculateMappingInfo.Insert(); err != nil {
		msg = "保存失败"
		err = errors.New("保存溯源信息失败,Err:" + err.Error())
		return
	}

	// Index the new indicator in Elasticsearch.
	data.AddOrEditEdbInfoToEs(edbInfoId)
	return
}

// SyncUpdateRelationEdbInfo refreshes every indicator mapped to configuration
// configId, except excludeId, from the cached component series and stores the
// configuration's current JSON as its calculation formula.
//
// Indicators that cannot be loaded or whose component is not cached are
// logged and skipped; a database write failure aborts and is returned.
func SyncUpdateRelationEdbInfo(configId int, excludeId int) (err error) {
	mappingList, err := stl.GetCalculateStlConfigMappingByConfigId(configId)
	if err != nil {
		return
	}
	conf, err := stl.GetCalculateStlConfigById(configId)
	if err != nil {
		return
	}
	for _, mapping := range mappingList {
		// Skip the excluded indicator before spending a query on it.
		if mapping.EdbInfoId == excludeId {
			continue
		}
		edbInfo, er := data_manage.GetEdbInfoById(mapping.EdbInfoId)
		if er != nil {
			utils.FileLog.Info("SyncUpdateRelationEdbInfo GetEdbInfoById err:%s, edbInfoId:%d", er.Error(), mapping.EdbInfoId)
			continue
		}

		cacheKey, ok := stlCacheKey(mapping.StlEdbType, mapping.CalculateStlConfigId)
		if !ok {
			utils.FileLog.Info("未知的stlEdbType类型, mapping:%v", mapping)
			continue
		}
		edbInfoData, found, er := loadStlCacheData(cacheKey)
		if er != nil {
			utils.FileLog.Info(cacheKey + "redis获取失败,err:" + er.Error())
			continue
		}
		if !found {
			utils.FileLog.Info(cacheKey + "指标数据不存在")
			continue
		}

		dataList, er := buildStlDataList(edbInfo.EdbInfoId, edbInfo.EdbCode, edbInfoData)
		if er != nil {
			err = er
			return
		}
		if err = stl.DeleteAndInsertEdbDataCalculateStl(edbInfo.EdbCode, dataList); err != nil {
			return
		}

		data_manage.ModifyEdbInfoDataStatus(int64(edbInfo.EdbInfoId), edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
		maxAndMinItem, _ := data_manage.GetEdbInfoMaxAndMinInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
		if maxAndMinItem != nil {
			if err = data_manage.ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, maxAndMinItem); err != nil {
				return
			}
		}

		edbInfo.CalculateFormula = conf.Config
		edbInfo.ModifyTime = time.Now()
		if err = edbInfo.Update([]string{"calculate_formula", "modify_time"}); err != nil {
			return
		}
	}
	return
}

// GetStlConfig returns the STL configuration behind the STL indicator
// edbInfoId: the parameters decoded from its stored calculation formula, the
// id of the mapped configuration and the name of the source indicator.
func GetStlConfig(edbInfoId int) (resp *response.StlConfigResp, msg string, err error) {
	configId, err := stl.GetCalculateStlConfigMappingIdByEdbInfoId(edbInfoId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "未找到指标信息, 请选择其他指标"
			return
		}
		msg = "查询失败"
		return
	}

	queryEdbInfo, err := data_manage.GetEdbInfoById(edbInfoId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "未找到指标,请刷新后重试"
			return
		}
		msg = "获取失败"
		return
	}
	var req request.StlConfigReq
	if err = json.Unmarshal([]byte(queryEdbInfo.CalculateFormula), &req); err != nil {
		msg = "获取失败"
		return
	}

	// The source indicator supplies the display name.
	edbInfo, err := data_manage.GetEdbInfoById(req.EdbInfoId)
	if err != nil {
		if err.Error() == utils.ErrNoRow() {
			msg = "未找到指标,请刷新后重试"
			return
		}
		msg = "获取失败"
		return
	}

	resp = &response.StlConfigResp{
		CalculateStlConfigId: configId,
		EdbInfoId:            req.EdbInfoId,
		EdbInfoName:          edbInfo.EdbName,
		DataRangeType:        req.DataRangeType,
		StartDate:            req.StartDate,
		EndDate:              req.EndDate,
		LastNYear:            req.LastNYear,
		Period:               req.Period,
		Seasonal:             req.Seasonal,
		Trend:                req.Trend,
		Fraction:             req.Fraction,
		Robust:               req.Robust,
		TrendDeg:             req.TrendDeg,
		SeasonalDeg:          req.SeasonalDeg,
		LowPassDeg:           req.LowPassDeg,
	}
	return
}

// NOTE(review): this function continues beyond the reviewed chunk; its tokens
// are reproduced unchanged up to the point where the chunk ends.
func CheckDulplicateEdbInfoName(edbName, lang string) (ok bool, err error) {
	var count int
	var condition string
	var pars []interface{}
	switch lang {
	case utils.EnLangVersion:
		condition += " AND edb_name_en = ? "
	default:
		condition += " AND edb_name=? 
" } pars = append(pars, edbName) count, err = data_manage.GetEdbInfoCountByCondition(condition, pars) if err != nil { return } if count > 0 { ok = true return } return }