|
- package services
- import (
- "encoding/json"
- "eta/eta_index_lib/models"
- "eta/eta_index_lib/utils"
- "fmt"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "time"
- "github.com/tealeg/xlsx"
- )
// Data range types accepted in EdbStlConfig.DataRangeType. The numeric values
// are stored inside the serialized configuration, so they are spelled out
// explicitly rather than derived from declaration order.
const (
	// ALL_DATE selects the whole series without any date filter.
	ALL_DATE = 1
	// LAST_N_YEARS selects data from January 1st of the year N years ago.
	LAST_N_YEARS = 2
	// RANGE_DATE selects data between a start date and an end date.
	RANGE_DATE = 3
	// RANGE_DATE_TO_NOW selects data from a start date onwards.
	RANGE_DATE_TO_NOW = 4
)
// EdbStlConfig is the STL calculation configuration. RefreshStlData decodes it
// from the JSON stored in the calculation mapping's CalculateFormula, and
// UpdateStlEdbData serializes it back into the indicator's CalculateFormula.
// The struct has no json tags, so the field names are the JSON keys.
type EdbStlConfig struct {
	// Indicator ID.
	EdbInfoId int `description:"指标ID"`
	// ID of the STL calculation configuration.
	CalculateStlConfigId int `description:"计算的STL配置ID"`
	// Date range type: 1 = all dates, 2 = last N years, 3 = date range,
	// 4 = date range up to now (see the ALL_DATE... constants).
	DataRangeType int `description:"数据时间类型:1-全部时间,2-最近N年,3-区间设置,4-区间设置(至今)"`
	// Start date; used as the lower bound for range types 3 and 4.
	StartDate string `description:"开始日期"`
	// End date; used as the upper bound for range type 3.
	EndDate string `description:"结束日期"`
	// Number of years for range type 2; parsed with strconv.Atoi.
	LastNYear string `description:"最近N年"`
	// Period of the data, chosen according to its frequency. Validated to be
	// at least 2.
	Period int `description:"数据的周期,根据频率设置"`
	// Seasonal smoother window: an odd integer >= 3 and greater than Period.
	Seasonal int `description:"季节性成分窗口大小,一般为period+1,可以设置为大于period的正奇数"`
	// Trend smoother window: an odd integer >= 3 and greater than Period.
	Trend int `description:"趋势成分窗口大小,一般为period+1,可以设置为大于period的正奇数"`
	// Smoothing fraction in [0, 1]; passed to lowess as frac by the Python script.
	Fraction float64 `description:"趋势项的平滑系数,默认为0.2,区间为[0-1]"`
	// Whether to use the robust fitting method.
	Robust bool `description:"是否使用稳健方法: true(使用) false(不使用) "`
	// Degree of the trend polynomial; validated to be within 1..5.
	TrendDeg int `description:"分解中趋势多项式次数,默认为1,不超过5的正整数"`
	// Degree of the seasonal polynomial; validated to be within 1..5.
	SeasonalDeg int `description:"分解中季节性多项次数,默认为1,不超过5的正整数"`
	// Degree of the low-pass filter polynomial; validated to be within 1..5.
	LowPassDeg int `description:"分解中低通滤波器次数,默认为1,不超过5的正整数"`
}
// ChartEdbInfo holds one decomposed series (trend, seasonal or residual)
// together with indicator metadata. ParseStlExcel only fills DataList, MaxData
// and MinData; the remaining fields are left at their zero values by the code
// in this file.
type ChartEdbInfo struct {
	EdbInfoId int
	Title string
	Unit string
	Frequency string
	// Largest value found in DataList.
	MaxData float64
	// Smallest value found in DataList.
	MinData float64
	ClassifyId int
	ClassifyPath string
	// Data points of the series, in the order they were read.
	DataList []*EdbData
}
// EdbData is a single data point of a decomposed series.
type EdbData struct {
	// Numeric value of the point.
	Value float64
	// Date of the point; ParseStlExcel formats it with utils.FormatDate.
	DataTime string
	// The same instant as a Unix timestamp in milliseconds.
	DataTimestamp int64
}
- func RefreshStlData(edbInfoId int) (msg string, err error) {
- calculateStl, err := models.GetEdbInfoCalculateMappingDetail(edbInfoId)
- if err != nil {
- return
- }
- fromEdbInfo, err := models.GetEdbInfoById(calculateStl.FromEdbInfoId)
- if err != nil {
- return
- }
- var stlConfig EdbStlConfig
- if err = json.Unmarshal([]byte(calculateStl.CalculateFormula), &stlConfig); err != nil {
- return
- }
- var condition string
- var pars []interface{}
- switch stlConfig.DataRangeType {
- case ALL_DATE:
- case LAST_N_YEARS:
- condition += " AND data_time >=?"
- year := time.Now().Year()
- lastNyear, er := strconv.Atoi(stlConfig.LastNYear)
- if er != nil {
- msg = "最近N年输入不合法"
- err = er
- return
- }
- lastDate := time.Date(year-lastNyear, 1, 1, 0, 0, 0, 0, time.Local)
- pars = append(pars, lastDate)
- case RANGE_DATE:
- condition = " AND data_time >=? AND data_time <=?"
- pars = append(pars, stlConfig.StartDate, stlConfig.EndDate)
- case RANGE_DATE_TO_NOW:
- condition = " AND data_time >=?"
- pars = append(pars, stlConfig.StartDate)
- }
- condition += " AND edb_code =?"
- pars = append(pars, fromEdbInfo.EdbCode)
- edbData, err := models.GetEdbDataByCondition(fromEdbInfo.Source, fromEdbInfo.SubSource, condition, pars)
- if err != nil {
- return
- }
- var condMsg string
- if stlConfig.Period < 2 || stlConfig.Period > len(edbData) {
- condMsg += "period必须是一个大于等于2的正整数,且必须小于时间序列的长度"
- }
- if stlConfig.Seasonal < 3 || stlConfig.Seasonal%2 == 0 || stlConfig.Seasonal <= stlConfig.Period {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "seasonal必须是一个大于等于3的奇整数,且必须大于period"
- }
- if stlConfig.Trend < 3 || stlConfig.Trend%2 == 0 || stlConfig.Trend <= stlConfig.Period {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "trend必须是一个大于等于3的奇整数,且必须大于period"
- }
- if stlConfig.Fraction < 0 || stlConfig.Fraction > 1 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "fraction必须是一个介于[0-1]之间"
- }
- if 1 > stlConfig.TrendDeg || stlConfig.TrendDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "trend_deg请设置成1-5的整数"
- }
- if 1 > stlConfig.SeasonalDeg || stlConfig.SeasonalDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "seasonal_deg请设置成1-5的整数"
- }
- if 1 > stlConfig.LowPassDeg || stlConfig.LowPassDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "low_pass_deg请设置成1-5的整数"
- }
- if condMsg != "" {
- msg = condMsg
- err = fmt.Errorf("参数错误")
- return
- }
- dir, _ := os.Executable()
- exPath := filepath.Dir(dir) + "/static/stl_tmp"
- err = CheckOsPathAndMake(exPath)
- if err != nil {
- msg = "计算失败"
- return
- }
- loadFilePath := exPath + "/" + strconv.Itoa(fromEdbInfo.SysUserId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + ".xlsx"
- err = SaveToExcel(edbData, loadFilePath)
- if err != nil {
- msg = "保存数据到Excel失败"
- return
- }
- defer os.Remove(loadFilePath)
- saveFilePath := exPath + "/" + strconv.Itoa(fromEdbInfo.SysUserId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + "_res" + ".xlsx"
- err = execStlPythonCode(loadFilePath, saveFilePath, stlConfig.Period, stlConfig.Seasonal, stlConfig.Trend, stlConfig.TrendDeg, stlConfig.SeasonalDeg, stlConfig.LowPassDeg, stlConfig.Fraction, stlConfig.Robust)
- if err != nil {
- msg = "执行Python代码失败"
- return
- }
- trendChart, seasonalChart, residualChart, err := ParseStlExcel(saveFilePath)
- if err != nil {
- msg = "解析Excel失败"
- return
- }
- defer os.Remove(saveFilePath)
- edbInfo, err := models.GetEdbInfoById(edbInfoId)
- if err != nil {
- msg = "获取指标信息失败"
- return
- }
- err = SyncUpdateRelationEdbInfo(edbInfo, stlConfig, trendChart, seasonalChart, residualChart)
- if err != nil {
- msg = "更新关联指标失败"
- return
- }
- return
- }
- func SyncUpdateRelationEdbInfo(edbInfo *models.EdbInfo, config EdbStlConfig, trendData, seasonalData, residualData ChartEdbInfo) (err error) {
- configId, err := models.GetCalculateStlConfigMappingIdByEdbInfoId(edbInfo.EdbInfoId)
- if err != nil {
- return
- }
- mappingList, err := models.GetCalculateStlConfigMappingByConfigId(configId)
- if err != nil {
- return
- }
- for _, v := range mappingList {
- edbInfo, er := models.GetEdbInfoById(v.EdbInfoId)
- if er != nil {
- continue
- }
- switch v.StlEdbType {
- case 1:
- // 趋势指标
- er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, trendData)
- case 2:
- // 季节性指标
- er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, seasonalData)
- case 3:
- // 残差指标
- er = UpdateStlEdbData(edbInfo, config, edbInfo.EdbCode, residualData)
- default:
- utils.FileLog.Info("未知的stlEdbType类型, mapping:%v", v)
- continue
- }
- if er != nil {
- utils.FileLog.Error("更新指标数据失败, edbInfoId:%v, err:%v", v.EdbInfoId, er)
- err = er
- continue
- }
- }
- // 同步更新计算配置
- newStlConf := &models.CalculateStlConfig{
- CalculateStlConfigId: configId,
- Config: edbInfo.CalculateFormula,
- ModifyTime: time.Now(),
- }
- err = newStlConf.Update([]string{"config", "modify_time"})
- return
- }
- func UpdateStlEdbData(edbInfo *models.EdbInfo, config EdbStlConfig, edbCode string, edbData ChartEdbInfo) (err error) {
- var dataList []*models.EdbDataCalculateStl
- for _, v := range edbData.DataList {
- dataTime, _ := time.Parse(utils.FormatDate, v.DataTime)
- dataList = append(dataList, &models.EdbDataCalculateStl{
- EdbInfoId: edbData.EdbInfoId,
- EdbCode: edbCode,
- DataTime: dataTime,
- Value: v.Value,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: dataTime.UnixMilli(),
- })
- }
- err = models.DeleteAndInsertEdbDataCalculateStl(edbCode, dataList)
- if err != nil {
- return
- }
- models.ModifyEdbInfoDataStatus(int64(edbInfo.EdbInfoId), edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
- maxAndMinItem, _ := models.GetEdbInfoMaxAndMinInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode)
- if maxAndMinItem != nil {
- err = models.ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, maxAndMinItem)
- if err != nil {
- return
- }
- }
- bconfig, _ := json.Marshal(config)
- edbInfo.CalculateFormula = string(bconfig)
- edbInfo.ModifyTime = time.Now()
- err = edbInfo.Update([]string{"calculate_formula", "modify_time"})
- if err != nil {
- return
- }
- return
- }
// CheckOsPathAndMake makes sure that path exists as a directory, creating it
// and any missing parents with os.ModePerm permissions.
//
// It returns nil when the directory already exists. It returns an error when
// the directory cannot be created or when path exists but is not a directory.
// os.MkdirAll is already a no-op for an existing directory, so no separate
// os.Stat check is needed (and no Stat error can be silently dropped).
func CheckOsPathAndMake(path string) (err error) {
	return os.MkdirAll(path, os.ModePerm)
}
- func SaveToExcel(data []*models.EdbInfoSearchData, filePath string) (err error) {
- xlsxFile := xlsx.NewFile()
- sheetNew, err := xlsxFile.AddSheet("Tmp")
- if err != nil {
- return
- }
- titleRow := sheetNew.AddRow()
- titleRow.AddCell().SetString("日期")
- titleRow.AddCell().SetString("值")
- for i, d := range data {
- row := sheetNew.Row(i + 1)
- row.AddCell().SetString(d.DataTime)
- row.AddCell().SetFloat(d.Value)
- }
- err = xlsxFile.Save(filePath)
- if err != nil {
- return
- }
- return
- }
- func ParseStlExcel(excelPath string) (TrendChart, SeasonalChart, ResidualChart ChartEdbInfo, err error) {
- file, err := xlsx.OpenFile(excelPath)
- if err != nil {
- return
- }
- for _, sheet := range file.Sheets {
- switch sheet.Name {
- case "季节":
- var MinData, MaxData float64
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- var date string
- var dataTimestamp int64
- if row.Cells[0].Type() == xlsx.CellTypeNumeric {
- dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64)
- tmpTime := xlsx.TimeFromExcelTime(dataNum, false)
- date = tmpTime.Format(utils.FormatDate)
- dataTimestamp = tmpTime.UnixMilli()
- } else {
- timeDate, _ := time.Parse(utils.FormatDateTime, date)
- date = timeDate.Format(utils.FormatDate)
- dataTimestamp = timeDate.UnixMilli()
- }
- fv, _ := row.Cells[1].Float()
- if MinData == 0 || fv < MinData {
- MinData = fv
- }
- if MaxData == 0 || fv > MaxData {
- MaxData = fv
- }
- SeasonalChart.DataList = append(SeasonalChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp})
- }
- SeasonalChart.MinData = MinData
- SeasonalChart.MaxData = MaxData
- case "趋势":
- var MinData, MaxData float64
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- var date string
- var dataTimestamp int64
- if row.Cells[0].Type() == xlsx.CellTypeNumeric {
- dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64)
- tmpTime := xlsx.TimeFromExcelTime(dataNum, false)
- date = tmpTime.Format(utils.FormatDate)
- dataTimestamp = tmpTime.UnixMilli()
- } else {
- timeDate, _ := time.Parse(utils.FormatDateTime, date)
- date = timeDate.Format(utils.FormatDate)
- dataTimestamp = timeDate.UnixMilli()
- }
- fv, _ := row.Cells[1].Float()
- if MinData == 0 || fv < MinData {
- MinData = fv
- }
- if MaxData == 0 || fv > MaxData {
- MaxData = fv
- }
- TrendChart.DataList = append(TrendChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp})
- }
- TrendChart.MaxData = MaxData
- TrendChart.MinData = MinData
- case "残差":
- var MinData, MaxData float64
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- var date string
- var dataTimestamp int64
- if row.Cells[0].Type() == xlsx.CellTypeNumeric {
- dataNum, _ := strconv.ParseFloat(row.Cells[0].Value, 64)
- tmpTime := xlsx.TimeFromExcelTime(dataNum, false)
- date = tmpTime.Format(utils.FormatDate)
- dataTimestamp = tmpTime.UnixMilli()
- } else {
- timeDate, _ := time.Parse(utils.FormatDateTime, date)
- date = timeDate.Format(utils.FormatDate)
- dataTimestamp = timeDate.UnixMilli()
- }
- fv, _ := row.Cells[1].Float()
- if MinData == 0 || fv < MinData {
- MinData = fv
- }
- if MaxData == 0 || fv > MaxData {
- MaxData = fv
- }
- ResidualChart.DataList = append(ResidualChart.DataList, &EdbData{DataTime: date, Value: fv, DataTimestamp: dataTimestamp})
- }
- ResidualChart.MaxData = MaxData
- ResidualChart.MinData = MinData
- }
- }
- return
- }
// execStlPythonCode runs the STL decomposition in a python3 subprocess.
//
// The script reads the workbook at path (columns "日期" and "值"), fits
// statsmodels' STL with the given period, seasonal and trend windows, degrees
// and robust flag, smooths the series with lowess using fraction, and writes
// the sheets "季节", "趋势" and "残差" to the workbook at toPath.
//
// path and toPath are inserted into Python raw string literals, so they must
// not contain a double quote. The call blocks until the interpreter exits. On
// failure the returned error wraps the exec error and carries the
// interpreter's combined stdout/stderr, so that the Python traceback is not
// lost.
func execStlPythonCode(path, toPath string, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg int, fraction float64, robust bool) (err error) {
	const scriptTemplate = `
import pandas as pd
from statsmodels.tsa.seasonal import STL
from statsmodels.nonparametric.smoothers_lowess import lowess
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.diagnostic import acorr_ljungbox
import numpy as np
import json
import warnings
warnings.filterwarnings('ignore')
file_path = r"%s"
df = pd.read_excel(file_path, parse_dates=['日期'])
df.set_index('日期', inplace=True)
period = %d
seasonal = %d
trend = %d
fraction = %g
seasonal_deg = %d
trend_deg = %d
low_pass_deg = %d
robust = %s
stl = STL(
df['值'],
period=period,
seasonal=seasonal,
trend=trend,
low_pass=None,
seasonal_deg=seasonal_deg,
trend_deg=trend_deg,
low_pass_deg=low_pass_deg,
seasonal_jump=1,
trend_jump=1,
low_pass_jump=1,
robust=robust
)
result = stl.fit()
smoothed = lowess(df['值'], df.index, frac=fraction)
trend_lowess = smoothed[:, 1]
# 季节图
seasonal_component = result.seasonal
# 趋势图
trend_lowess_series = pd.Series(trend_lowess, index=df.index)
# 残差图
residual_component = df['值'] - trend_lowess - seasonal_component
# 计算打印残差的均值
residual_mean = np.mean(residual_component)
# 计算打印残差的方差
residual_var = np.std(residual_component)
# 计算打印残差的ADF检验结果, 输出p-value
adf_result = adfuller(residual_component)
# 根据p-value判断是否平稳
lb_test = acorr_ljungbox(residual_component, lags=period, return_df=True)
output_file = r"%s"
with pd.ExcelWriter(output_file) as writer:
    # 保存季节图
    pd.Series(seasonal_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='季节', index=False)
    # 保存趋势图
    trend_lowess_series.to_frame(name='值').reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='趋势', index=False)
    # 保存残差图
    pd.Series(residual_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='残差', index=False)
output = json.dumps({
'residual_mean': residual_mean,
'residual_var': residual_var,
'adf_p_value': adf_result[1],
'lb_test_p_value': lb_test['lb_pvalue'].values[0],
'lb_test_stat': lb_test['lb_stat'].values[0]
})
print(output)
`
	// Python spells its boolean literals with a capital first letter.
	robustStr := "False"
	if robust {
		robustStr = "True"
	}
	pythonCode := fmt.Sprintf(scriptTemplate, path, period, seasonal, trend, fraction, seasonalDeg, trendDeg, lowPassDeg, robustStr, toPath)

	// CombinedOutput starts the process and waits for it to exit, so there is
	// nothing left to kill afterwards.
	output, err := exec.Command("python3", "-c", pythonCode).CombinedOutput()
	if err != nil {
		err = fmt.Errorf("run stl python script: %w; output: %s", err, output)
		return
	}
	return
}
|