package services import ( "encoding/json" "eta/eta_index_lib/models" "eta/eta_index_lib/utils" "fmt" "os" "os/exec" "path/filepath" "strconv" "time" "github.com/tealeg/xlsx" ) const ( ALL_DATE = iota + 1 LAST_N_YEARS RANGE_DATE RANGE_DATE_TO_NOW ) type EdbStlConfig struct { EdbInfoId int `description:"指标ID"` CalculateStlConfigId int `description:"计算的STL配置ID"` DataRangeType int `description:"数据时间类型:1-全部时间,2-最近N年,3-区间设置,4-区间设置(至今)"` StartDate string `description:"开始日期"` EndDate string `description:"结束日期"` LastNYear string `description:"最近N年"` Period int `description:"数据的周期,根据频率设置"` Seasonal int `description:"季节性成分窗口大小,一般为period+1,可以设置为大于period的正奇数"` Trend int `description:"趋势成分窗口大小,一般为period+1,可以设置为大于period的正奇数"` Fraction float64 `description:"趋势项的平滑系数,默认为0.2,区间为[0-1]"` Robust bool `description:"是否使用稳健方法: true(使用) false(不使用) "` TrendDeg int `description:"分解中趋势多项式次数,默认为1,不超过5的正整数"` SeasonalDeg int `description:"分解中季节性多项次数,默认为1,不超过5的正整数"` LowPassDeg int `description:"分解中低通滤波器次数,默认为1,不超过5的正整数"` } type ChartEdbInfo struct { EdbInfoId int Title string Unit string Frequency string MaxData float64 MinData float64 ClassifyId int ClassifyPath string DataList []*EdbData } type EdbData struct { Value float64 DataTime string DataTimestamp int64 } func RefreshStlData(edbInfoId int) (msg string, err error) { calculateStl, err := models.GetEdbInfoCalculateMappingDetail(edbInfoId) if err != nil { return } fromEdbInfo, err := models.GetEdbInfoById(calculateStl.FromEdbInfoId) if err != nil { return } var stlConfig EdbStlConfig if err = json.Unmarshal([]byte(calculateStl.CalculateFormula), &stlConfig); err != nil { return } var condition string var pars []interface{} switch stlConfig.DataRangeType { case ALL_DATE: case LAST_N_YEARS: condition += " AND data_time >=?" year := time.Now().Year() lastNyear, er := strconv.Atoi(stlConfig.LastNYear) if er != nil { msg = "最近N年输入不合法" err = er return } lastDate := time.Date(year-lastNyear, 1, 1, 0, 0, 0, 0, time.Local) pars = append(pars, lastDate) case RANGE_DATE: condition = " AND data_time >=? AND data_time <=?" pars = append(pars, stlConfig.StartDate, stlConfig.EndDate) case RANGE_DATE_TO_NOW: condition = " AND data_time >=?" pars = append(pars, stlConfig.StartDate) } condition += " AND edb_code =?" pars = append(pars, fromEdbInfo.EdbCode) edbData, err := models.GetEdbDataByCondition(fromEdbInfo.Source, fromEdbInfo.SubSource, condition, pars) if err != nil { return } var condMsg string if stlConfig.Period < 2 || stlConfig.Period > len(edbData) { condMsg += "period必须是一个大于等于2的正整数,且必须小于时间序列的长度" } if stlConfig.Seasonal < 3 || stlConfig.Seasonal%2 == 0 || stlConfig.Seasonal <= stlConfig.Period { if condMsg != "" { condMsg += "\n" } condMsg += "seasonal必须是一个大于等于3的奇整数,且必须大于period" } if stlConfig.Trend < 3 || stlConfig.Trend%2 == 0 || stlConfig.Trend <= stlConfig.Period { if condMsg != "" { condMsg += "\n" } condMsg += "trend必须是一个大于等于3的奇整数,且必须大于period" } if stlConfig.Fraction < 0 || stlConfig.Fraction > 1 { if condMsg != "" { condMsg += "\n" } condMsg += "fraction必须是一个介于[0-1]之间" } if 1 > stlConfig.TrendDeg || stlConfig.TrendDeg > 5 { if condMsg != "" { condMsg += "\n" } condMsg += "trend_deg请设置成1-5的整数" } if 1 > stlConfig.SeasonalDeg || stlConfig.SeasonalDeg > 5 { if condMsg != "" { condMsg += "\n" } condMsg += "seasonal_deg请设置成1-5的整数" } if 1 > stlConfig.LowPassDeg || stlConfig.LowPassDeg > 5 { if condMsg != "" { condMsg += "\n" } condMsg += "low_pass_deg请设置成1-5的整数" } if condMsg != "" { msg = condMsg err = fmt.Errorf("参数错误") return } dir, _ := os.Executable() exPath := filepath.Dir(dir) + "/static/stl_tmp" err = CheckOsPathAndMake(exPath) if err != nil { msg = "计算失败" return } loadFilePath := exPath + "/" + strconv.Itoa(fromEdbInfo.SysUserId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + ".xlsx" err = SaveToExcel(edbData, loadFilePath) if err != nil { msg = "保存数据到Excel失败" return } defer os.Remove(loadFilePath) saveFilePath := exPath + "/" + strconv.Itoa(fromEdbInfo.SysUserId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + "_res" + ".xlsx" err = execStlPythonCode(loadFilePath, saveFilePath, stlConfig.Period, stlConfig.Seasonal, stlConfig.Trend, stlConfig.TrendDeg, stlConfig.SeasonalDeg, stlConfig.LowPassDeg, stlConfig.Fraction, stlConfig.Robust) if err != nil { msg = "执行Python代码失败" return } trendChart, seasonalChart, residualChart, err := ParseStlExcel(saveFilePath) if err != nil { msg = "解析Excel失败" return } defer os.Remove(saveFilePath) edbInfo, err := models.GetEdbInfoById(edbInfoId) if err != nil { msg = "获取指标信息失败" return } err = SyncUpdateRelationEdbInfo(edbInfo, stlConfig, trendChart, seasonalChart, residualChart) if err != nil { msg = "更新关联指标失败" return } return } func SyncUpdateRelationEdbInfo(edbInfo *models.EdbInfo, config EdbStlConfig, trendData, seasonalData, residualData ChartEdbInfo) (err error) { configId, err := models.GetCalculateStlConfigMappingIdByEdbInfoId(edbInfo.EdbInfoId) if err != nil { return } mappingList, err := models.GetCalculateStlConfigMappingByConfigId(configId) if err != nil { return } for _, v := range mappingList { edbInfo, er := models.GetEdbInfoById(v.EdbInfoId) if er != nil { continue } switch v.StlEdbType { case 1: // 趋势指标 er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, trendData) case 2: // 季节性指标 er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, seasonalData) case 3: // 残差指标 er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, residualData) default: utils.FileLog.Info("未知的stlEdbType类型, mapping:%v", v) continue } if er != nil { utils.FileLog.Error("更新指标数据失败, edbInfoId:%v, err:%v", v.EdbInfoId, er) err = er continue } } // 同步更新计算配置 newStlConf := &models.CalculateStlConfig{ CalculateStlConfigId: configId, Config: edbInfo.CalculateFormula, ModifyTime: time.Now(), } err = newStlConf.Update([]string{"config", "modify_time"}) return } func UpdateStlEdbData(edbInfo *models.EdbInfo, config EdbStlConfig, edbCode string, edbData ChartEdbInfo) (err error) { var dataList []*models.EdbDataCalculateStl for _, v := range edbData.DataList { dataTime, _ := time.Parse(utils.FormatDate, v.DataTime) dataList = append(dataList, &models.EdbDataCalculateStl{ EdbInfoId: edbData.EdbInfoId, EdbCode: edbCode, DataTime: dataTime, Value: v.Value, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: dataTime.UnixMilli(), }) } err = models.DeleteAndInsertEdbDataCalculateStl(edbCode, dataList) if err != nil { return } models.ModifyEdbInfoDataStatus(int64(edbInfo.EdbInfoId), edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode) maxAndMinItem, _ := models.GetEdbInfoMaxAndMinInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode) if maxAndMinItem != nil { err = models.ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, maxAndMinItem) if err != nil { return } } bconfig, _ := json.Marshal(config) edbInfo.CalculateFormula = string(bconfig) edbInfo.ModifyTime = time.Now() err = edbInfo.Update([]string{"calculate_formula", "modify_time"}) if err != nil { return } return } func CheckOsPathAndMake(path string) (err error) { if _, er := os.Stat(path); os.IsNotExist(er) { err = os.MkdirAll(path, os.ModePerm) } return } func SaveToExcel(data []*models.EdbInfoSearchData, filePath string) (err error) { xlsxFile := xlsx.NewFile() sheetNew, err := xlsxFile.AddSheet("Tmp") if err != nil { return } titleRow := sheetNew.AddRow() titleRow.AddCell().SetString("日期") titleRow.AddCell().SetString("值") for i, d := range data { row := sheetNew.Row(i + 1) row.AddCell().SetString(d.DataTime) row.AddCell().SetFloat(d.Value) } err = xlsxFile.Save(filePath) if err != nil { return } return } func ParseStlExcel(excelPath string) (TrendChart, SeasonalChart, ResidualChart ChartEdbInfo, err error) { file, err := xlsx.OpenFile(excelPath) if err != nil { return } for _, sheet := range file.Sheets { switch sheet.Name { case "季节": var MinData, MaxData float64 for i, row := range sheet.Rows { if i == 0 { continue } var date string var dataTimestamp int64 if row.Cells[0].Type() == xlsx.CellTypeNumeric { dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64) tmpTime := xlsx.TimeFromExcelTime(dataNum, false) date = tmpTime.Format(utils.FormatDate) dataTimestamp = tmpTime.UnixMilli() } else { timeDate, _ := time.Parse(utils.FormatDateTime, date) date = timeDate.Format(utils.FormatDate) dataTimestamp = timeDate.UnixMilli() } fv, _ := row.Cells[1].Float() if MinData == 0 || fv < MinData { MinData = fv } if MaxData == 0 || fv > MaxData { MaxData = fv } SeasonalChart.DataList = append(SeasonalChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp}) } SeasonalChart.MinData = MinData SeasonalChart.MaxData = MaxData case "趋势": var MinData, MaxData float64 for i, row := range sheet.Rows { if i == 0 { continue } var date string var dataTimestamp int64 if row.Cells[0].Type() == xlsx.CellTypeNumeric { dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64) tmpTime := xlsx.TimeFromExcelTime(dataNum, false) date = tmpTime.Format(utils.FormatDate) dataTimestamp = tmpTime.UnixMilli() } else { timeDate, _ := time.Parse(utils.FormatDateTime, date) date = timeDate.Format(utils.FormatDate) dataTimestamp = timeDate.UnixMilli() } fv, _ := row.Cells[1].Float() if MinData == 0 || fv < MinData { MinData = fv } if MaxData == 0 || fv > MaxData { MaxData = fv } TrendChart.DataList = append(TrendChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp}) } TrendChart.MaxData = MaxData TrendChart.MinData = MinData case "残差": var MinData, MaxData float64 for i, row := range sheet.Rows { if i == 0 { continue } var date string var dataTimestamp int64 if row.Cells[0].Type() == xlsx.CellTypeNumeric { dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64) tmpTime := xlsx.TimeFromExcelTime(dataNum, false) date = tmpTime.Format(utils.FormatDate) dataTimestamp = tmpTime.UnixMilli() } else { timeDate, _ := time.Parse(utils.FormatDateTime, date) date = timeDate.Format(utils.FormatDate) dataTimestamp = timeDate.UnixMilli() } fv, _ := row.Cells[1].Float() if MinData == 0 || fv < MinData { MinData = fv } if MaxData == 0 || fv > MaxData { MaxData = fv } ResidualChart.DataList = append(ResidualChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp}) } ResidualChart.MaxData = MaxData ResidualChart.MinData = MinData } } return } func execStlPythonCode(path, toPath string, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg int, fraction float64, robust bool) (err error) { pythonCode := ` import pandas as pd from statsmodels.tsa.seasonal import STL from statsmodels.nonparametric.smoothers_lowess import lowess from statsmodels.tsa.stattools import adfuller from statsmodels.stats.diagnostic import acorr_ljungbox import numpy as np import json import warnings warnings.filterwarnings('ignore') file_path = r"%s" df = pd.read_excel(file_path, parse_dates=['日期']) df.set_index('日期', inplace=True) period = %d seasonal = %d trend = %d fraction = %g seasonal_deg = %d trend_deg = %d low_pass_deg = %d robust = %s stl = STL( df['值'], period=period, seasonal=seasonal, trend=trend, low_pass=None, seasonal_deg=seasonal_deg, trend_deg=trend_deg, low_pass_deg=low_pass_deg, seasonal_jump=1, trend_jump=1, low_pass_jump=1, robust=robust ) result = stl.fit() smoothed = lowess(df['值'], df.index, frac=fraction) trend_lowess = smoothed[:, 1] # 季节图 seasonal_component = result.seasonal # 趋势图 trend_lowess_series = pd.Series(trend_lowess, index=df.index) # 残差图 residual_component = df['值'] - trend_lowess - seasonal_component # 计算打印残差的均值 residual_mean = np.mean(residual_component) # 计算打印残差的方差 residual_var = np.std(residual_component) # 计算打印残差的ADF检验结果, 输出p-value adf_result = adfuller(residual_component) # 根据p-value判断是否平稳 lb_test = acorr_ljungbox(residual_component, lags=period, return_df=True) output_file = r"%s" with pd.ExcelWriter(output_file) as writer: # 保存季节图 pd.Series(seasonal_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='季节', index=False) # 保存趋势图 trend_lowess_series.to_frame(name='值').reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='趋势', index=False) # 保存残差图 pd.Series(residual_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='残差', index=False) output = json.dumps({ 'residual_mean': residual_mean, 'residual_var': residual_var, 'adf_p_value': adf_result[1], 'lb_test_p_value': lb_test['lb_pvalue'].values[0], 'lb_test_stat': lb_test['lb_stat'].values[0] }) print(output) ` robustStr := "True" if !robust { robustStr = "False" } pythonCode = fmt.Sprintf(pythonCode, path, period, seasonal, trend, fraction, seasonalDeg, trendDeg, lowPassDeg, robustStr, toPath) cmd := exec.Command(`python3`, "-c", pythonCode) _, err = cmd.CombinedOutput() if err != nil { return } defer cmd.Process.Kill() return }