123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- package stl
- import (
- "encoding/json"
- "eta/eta_api/models/data_manage"
- "eta/eta_api/models/data_manage/stl"
- "eta/eta_api/models/data_manage/stl/request"
- "eta/eta_api/models/data_manage/stl/response"
- "eta/eta_api/services/data/data_manage_permission"
- "eta/eta_api/services/elastic"
- "eta/eta_api/utils"
- "fmt"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "github.com/rdlucklib/rdluck_tools/paging"
- "github.com/tealeg/xlsx"
- )
// Values accepted in StlConfigReq.DataRangeType; they select how
// GenerateStlEdbData filters the indicator data by date.
const (
	// ALL_DATE applies no date filter.
	ALL_DATE = 1
	// LAST_N_YEARS keeps data from January 1st of (current year - LastNYear) onwards.
	LAST_N_YEARS = 2
	// RANGE_DATE keeps data between StartDate and EndDate, both inclusive.
	RANGE_DATE = 3
	// RANGE_DATE_TO_NOW keeps data from StartDate onwards.
	RANGE_DATE_TO_NOW = 4
)
// Cache key prefixes for the three STL components. A key is built as
// prefix + config id + ":" + chart title.
var (
	// EDB_DATA_CALCULATE_STL_TREND_CACHE prefixes cached trend series.
	EDB_DATA_CALCULATE_STL_TREND_CACHE = `eta:stl_decompose:trend:config_id:`
	// EDB_DATA_CALCULATE_STL_SEASONAL_CACHE prefixes cached seasonal series.
	EDB_DATA_CALCULATE_STL_SEASONAL_CACHE = `eta:stl_decompose:seasonal:config_id:`
	// EDB_DATA_CALCULATE_STL_RESIDUAL_CACHE prefixes cached residual series.
	EDB_DATA_CALCULATE_STL_RESIDUAL_CACHE = `eta:stl_decompose:residual:config_id:`
)
- func GenerateStlEdbData(req *request.StlConfigReq, adminId int) (resp *response.StlPreviewResp, msg string, err error) {
- edbInfo, err := data_manage.GetEdbInfoById(req.EdbInfoId)
- if err != nil {
- if err.Error() == utils.ErrNoRow() {
- msg = "指标不存在"
- return
- }
- msg = "获取指标信息失败"
- return
- }
- var condition string
- var pars []interface{}
- switch req.DataRangeType {
- case ALL_DATE:
- case LAST_N_YEARS:
- condition += " AND data_time >=?"
- year := time.Now().Year()
- lastDate := time.Date(year-req.LastNYear, 1, 1, 0, 0, 0, 0, time.Local)
- pars = append(pars, lastDate)
- case RANGE_DATE:
- condition = " AND data_time >=? AND data_time <=?"
- pars = append(pars, req.StartDate, req.EndDate)
- case RANGE_DATE_TO_NOW:
- condition = " AND data_time >=?"
- pars = append(pars, req.StartDate)
- }
- condition += " AND edb_code =?"
- pars = append(pars, edbInfo.EdbCode)
- edbData, err := data_manage.GetAllEdbDataListByCondition(condition, pars, edbInfo.Source, edbInfo.SubSource)
- if err != nil {
- msg = "获取指标数据失败"
- return
- }
- var condMsg string
- if req.Period < 2 || req.Period > len(edbData) {
- condMsg += "period必须是一个大于等于2的正整数,且必须小于时间序列的长度"
- }
- if req.Seasonal < 3 || req.Seasonal%2 == 0 || req.Seasonal <= req.Period {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "seasonal必须是一个大于等于3的奇整数,且必须大于period"
- }
- if req.Trend < 3 || req.Trend%2 == 0 || req.Trend <= req.Period {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "trend必须是一个大于等于3的奇整数,且必须大于period"
- }
- if req.Fraction < 0 || req.Fraction > 1 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "fraction必须是一个介于[0-1]之间"
- }
- if 1 > req.TrendDeg || req.TrendDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "trend_deg请设置成1-5的整数"
- }
- if 1 > req.SeasonalDeg || req.SeasonalDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "seasonal_deg请设置成1-5的整数"
- }
- if 1 > req.LowPassDeg || req.LowPassDeg > 5 {
- if condMsg != "" {
- condMsg += "\n"
- }
- condMsg += "low_pass_deg请设置成1-5的整数"
- }
- if condMsg != "" {
- msg = condMsg
- err = fmt.Errorf("参数错误")
- return
- }
- dir, _ := os.Executable()
- exPath := filepath.Dir(dir) + "/static/stl_tmp"
- err = CheckOsPathAndMake(exPath)
- if err != nil {
- msg = "计算失败"
- return
- }
- loadFilePath := exPath + "/" + strconv.Itoa(adminId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + ".xlsx"
- err = SaveToExcel(edbData, loadFilePath)
- if err != nil {
- msg = "保存数据到Excel失败"
- return
- }
- defer os.Remove(loadFilePath)
- saveFilePath := exPath + "/" + strconv.Itoa(adminId) + "_" + time.Now().Format(utils.FormatDateTimeUnSpace) + "_res" + ".xlsx"
- result, err := execStlPythonCode(loadFilePath, saveFilePath, req.Period, req.Seasonal, req.Trend, req.TrendDeg, req.SeasonalDeg, req.LowPassDeg, req.Fraction, req.Robust)
- if err != nil {
- msg = "计算失败,请重新选择指标和参数后计算"
- }
- trendChart, seasonalChart, residualChart, err := ParseStlExcel(saveFilePath)
- if err != nil {
- msg = "解析Excel失败"
- return
- }
- defer os.Remove(saveFilePath)
- resp = new(response.StlPreviewResp)
- resp.OriginEdbInfo.Title = edbInfo.EdbName
- resp.OriginEdbInfo.Frequency = edbInfo.Frequency
- resp.OriginEdbInfo.Unit = edbInfo.Unit
- resp.OriginEdbInfo.DataList = formatEdbData(edbData)
- resp.TrendChartInfo = trendChart
- resp.TrendChartInfo.Title = edbInfo.EdbName + "Trend"
- resp.TrendChartInfo.Frequency = edbInfo.Frequency
- resp.TrendChartInfo.Unit = edbInfo.Unit
- resp.SeasonalChartInfo = seasonalChart
- resp.SeasonalChartInfo.Title = edbInfo.EdbName + "Seasonal"
- resp.SeasonalChartInfo.Frequency = edbInfo.Frequency
- resp.SeasonalChartInfo.Unit = edbInfo.Unit
- resp.ResidualChartInfo = residualChart
- resp.ResidualChartInfo.Title = edbInfo.EdbName + "Residual"
- resp.ResidualChartInfo.Frequency = edbInfo.Frequency
- resp.ResidualChartInfo.Unit = edbInfo.Unit
- resp.EvaluationResult.Mean = strconv.FormatFloat(result.ResidualMean, 'f', 4, 64)
- resp.EvaluationResult.Std = strconv.FormatFloat(result.ResidualVar, 'f', 4, 64)
- resp.EvaluationResult.AdfPValue = strconv.FormatFloat(result.AdfPValue, 'f', -1, 64)
- resp.EvaluationResult.LjungBoxPValue = strconv.FormatFloat(result.LbTestPValue, 'f', -1, 64)
- utils.Rc.Put(EDB_DATA_CALCULATE_STL_TREND_CACHE+strconv.Itoa(req.CalculateStlConfigId)+":"+resp.TrendChartInfo.Title, trendChart, time.Hour)
- utils.Rc.Put(EDB_DATA_CALCULATE_STL_TREND_CACHE+strconv.Itoa(req.CalculateStlConfigId)+":"+resp.SeasonalChartInfo.Title, seasonalChart, time.Hour)
- utils.Rc.Put(EDB_DATA_CALCULATE_STL_TREND_CACHE+strconv.Itoa(req.CalculateStlConfigId)+":"+resp.ResidualChartInfo.Title, residualChart, time.Hour)
- return
- }
- func formatEdbData(items []*data_manage.EdbData) []*response.EdbData {
- res := make([]*response.EdbData, 0, len(items))
- for _, item := range items {
- res = append(res, &response.EdbData{
- DataTime: item.DataTime,
- Value: strconv.FormatFloat(item.Value, 'f', -1, 64),
- })
- }
- return res
- }
// CheckOsPathAndMake ensures that path exists as a directory, creating it and
// any missing parents when necessary.
//
// It returns nil when the directory already exists. It returns an error when
// the directory cannot be created or when path exists but is not a directory.
func CheckOsPathAndMake(path string) (err error) {
	// os.MkdirAll is already a no-op for an existing directory, and unlike a
	// bare "stat then create" it reports a path occupied by a regular file.
	return os.MkdirAll(path, os.ModePerm)
}
- func ParseStlExcel(excelPath string) (TrendChart, SeasonalChart, ResidualChart response.ChartEdbInfo, err error) {
- file, err := xlsx.OpenFile(excelPath)
- if err != nil {
- return
- }
- for _, sheet := range file.Sheets {
- switch sheet.Name {
- case "季节":
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- date := row.Cells[0].String()
- date = strings.Split(date, " ")[0]
- fv, _ := row.Cells[1].Float()
- value := strconv.FormatFloat(fv, 'f', 4, 64)
- SeasonalChart.DataList = append(SeasonalChart.DataList, &response.EdbData{DataTime: date, Value: value})
- }
- case "趋势":
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- date := row.Cells[0].String()
- date = strings.Split(date, " ")[0]
- fv, _ := row.Cells[1].Float()
- value := strconv.FormatFloat(fv, 'f', 4, 64)
- TrendChart.DataList = append(TrendChart.DataList, &response.EdbData{DataTime: date, Value: value})
- }
- case "残差":
- for i, row := range sheet.Rows {
- if i == 0 {
- continue
- }
- date := row.Cells[0].String()
- date = strings.Split(date, " ")[0]
- fv, _ := row.Cells[1].Float()
- value := strconv.FormatFloat(fv, 'f', 4, 64)
- ResidualChart.DataList = append(ResidualChart.DataList, &response.EdbData{DataTime: date, Value: value})
- }
- }
- }
- return
- }
- func SaveToExcel(data []*data_manage.EdbData, filePath string) (err error) {
- xlsxFile := xlsx.NewFile()
- sheetNew, err := xlsxFile.AddSheet("Tmp")
- if err != nil {
- return
- }
- titleRow := sheetNew.AddRow()
- titleRow.AddCell().SetString("日期")
- titleRow.AddCell().SetString("值")
- for i, d := range data {
- row := sheetNew.Row(i + 1)
- row.AddCell().SetString(d.DataTime)
- row.AddCell().SetFloat(d.Value)
- }
- err = xlsxFile.Save(filePath)
- if err != nil {
- return
- }
- return
- }
// STLResult holds the residual diagnostics that the STL Python script prints
// as a single JSON object.
type STLResult struct {
	// ResidualMean is the mean of the residual series.
	ResidualMean float64 `json:"residual_mean"`
	// ResidualVar is, despite its name, the standard deviation of the
	// residual series (the script computes it with np.std).
	ResidualVar float64 `json:"residual_var"`
	// AdfPValue is the p-value of the ADF test on the residual series.
	AdfPValue float64 `json:"adf_p_value"`
	// LbTestPValue is the first p-value of the Ljung-Box test output.
	LbTestPValue float64 `json:"lb_test_p_value"`
	// LbTestStat is the first statistic of the Ljung-Box test output.
	LbTestStat float64 `json:"lb_test_stat"`
}
- func execStlPythonCode(path, toPath string, period, seasonal, trend, trendDeg, seasonalDeg, lowPassDeg int, fraction float64, robust bool) (stlResult *STLResult, err error) {
- pythonCode := `
- import pandas as pd
- from statsmodels.tsa.seasonal import STL
- from statsmodels.nonparametric.smoothers_lowess import lowess
- from statsmodels.tsa.stattools import adfuller
- from statsmodels.stats.diagnostic import acorr_ljungbox
- import numpy as np
- import json
- import warnings
- warnings.filterwarnings('ignore')
- file_path = r"%s"
- df = pd.read_excel(file_path, parse_dates=['日期'])
- df.set_index('日期', inplace=True)
- period = %d
- seasonal = %d
- trend = %d
- fraction = %g
- seasonal_deg = %d
- trend_deg = %d
- low_pass_deg = %d
- robust = %s
- stl = STL(
- df['值'],
- period=period,
- seasonal=seasonal,
- trend=trend,
- low_pass=None,
- seasonal_deg=seasonal_deg,
- trend_deg=trend_deg,
- low_pass_deg=low_pass_deg,
- seasonal_jump=1,
- trend_jump=1,
- low_pass_jump=1,
- robust=robust
- )
- result = stl.fit()
- smoothed = lowess(df['值'], df.index, frac=fraction)
- trend_lowess = smoothed[:, 1]
- # 季节图
- seasonal_component = result.seasonal
- # 趋势图
- trend_lowess_series = pd.Series(trend_lowess, index=df.index)
- # 残差图
- residual_component = df['值'] - trend_lowess - seasonal_component
- # 计算打印残差的均值
- residual_mean = np.mean(residual_component)
- # 计算打印残差的方差
- residual_var = np.std(residual_component)
- # 计算打印残差的ADF检验结果, 输出p-value
- adf_result = adfuller(residual_component)
- # 根据p-value判断是否平稳
- lb_test = acorr_ljungbox(residual_component, lags=period, return_df=True)
- output_file = r"%s"
- with pd.ExcelWriter(output_file) as writer:
- # 保存季节图
- pd.Series(seasonal_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='季节', index=False)
- # 保存趋势图
- trend_lowess_series.to_frame(name='值').reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='趋势', index=False)
- # 保存残差图
- pd.Series(residual_component, index=df.index, name='值').to_frame().reset_index().rename(columns={'index': '日期'}).to_excel(writer, sheet_name='残差', index=False)
- output = json.dumps({
- 'residual_mean': residual_mean,
- 'residual_var': residual_var,
- 'adf_p_value': adf_result[1],
- 'lb_test_p_value': lb_test['lb_pvalue'].values[0],
- 'lb_test_stat': lb_test['lb_stat'].values[0]
- })
- print(output)
- `
- robustStr := "True"
- if !robust {
- robustStr = "False"
- }
- pythonCode = fmt.Sprintf(pythonCode, path, period, seasonal, trend, fraction, seasonalDeg, trendDeg, lowPassDeg, robustStr, toPath)
- cmd := exec.Command(`D:\conda\envs\py311\python`, "-c", pythonCode)
- output, err := cmd.CombinedOutput()
- if err != nil {
- return
- }
- defer cmd.Process.Kill()
- if err = json.Unmarshal(output, &stlResult); err != nil {
- return
- }
- return
- }
- func SaveStlConfig(req *request.StlConfigReq, adminId int) (insertId int64, err error) {
- conf := new(stl.CalculateStlConfig)
- b, _ := json.Marshal(req)
- conf.Config = string(b)
- conf.SysUserId = adminId
- conf.CreateTime = time.Now()
- conf.ModifyTime = time.Now()
- insertId, err = conf.Insert()
- return
- }
- func SearchEdbInfoWithStl(adminId int, keyWord string, currentIndex, pageSize int) (resp data_manage.EdbInfoFilterDataResp, msg string, err error) {
- var edbInfoList []*data_manage.EdbInfoList
- noPermissionEdbInfoIdList := make([]int, 0) //无权限指标
- // 获取当前账号的不可见指标
- {
- obj := data_manage.EdbInfoNoPermissionAdmin{}
- confList, er := obj.GetAllListByAdminId(adminId)
- if er != nil && er.Error() != utils.ErrNoRow() {
- msg = "获取失败"
- err = fmt.Errorf("获取不可见指标配置数据失败,Err:" + er.Error())
- return
- }
- for _, v := range confList {
- noPermissionEdbInfoIdList = append(noPermissionEdbInfoIdList, v.EdbInfoId)
- }
- }
- if currentIndex <= 0 {
- currentIndex = 1
- }
- startSize := utils.StartIndex(currentIndex, pageSize)
- // 是否走ES
- isEs := false
- var total int64
- if keyWord != "" {
- var keyWordArr []string
- keyWordArr = append(keyWordArr, keyWord)
- newKeyWord := strings.Split(keyWord, " ")
- keyWordArr = append(keyWordArr, newKeyWord...)
- frequencyList := []string{"日度", "周度", "旬度", "月度", "季度"}
- // 普通的搜索
- total, edbInfoList, err = elastic.SearchEdbInfoDataByfrequency(utils.DATA_INDEX_NAME, keyWord, startSize, pageSize, 0, frequencyList, noPermissionEdbInfoIdList)
- isEs = true
- } else {
- var condition string
- var pars []interface{}
- // 普通指标
- condition += ` AND edb_info_type = ? `
- pars = append(pars, 0)
- // 无权限指标id
- lenNoPermissionEdbInfoIdList := len(noPermissionEdbInfoIdList)
- if lenNoPermissionEdbInfoIdList > 0 {
- condition += ` AND edb_info_id not in (` + utils.GetOrmInReplace(lenNoPermissionEdbInfoIdList) + `) `
- pars = append(pars, noPermissionEdbInfoIdList)
- }
- //频度
- condition += ` AND frequency IN ('日度', '周度', '旬度', '月度', '季度') `
- total, edbInfoList, err = data_manage.GetEdbInfoFilterList(condition, pars, startSize, pageSize)
- }
- if err != nil {
- edbInfoList = make([]*data_manage.EdbInfoList, 0)
- }
- page := paging.GetPaging(currentIndex, pageSize, int(total))
- edbInfoListLen := len(edbInfoList)
- classifyIdList := make([]int, 0)
- for i := 0; i < edbInfoListLen; i++ {
- edbInfoList[i].EdbNameAlias = edbInfoList[i].EdbName
- classifyIdList = append(classifyIdList, edbInfoList[i].ClassifyId)
- }
- // 当前列表中的分类map
- classifyMap := make(map[int]*data_manage.EdbClassify)
- if edbInfoListLen > 0 {
- classifyList, er := data_manage.GetEdbClassifyByIdList(classifyIdList)
- if er != nil {
- msg = "获取失败"
- err = fmt.Errorf("获取分类列表失败,Err:" + er.Error())
- return
- }
- for _, v := range classifyList {
- classifyMap[v.ClassifyId] = v
- }
- // 获取所有有权限的指标和分类
- permissionEdbIdList, permissionClassifyIdList, er := data_manage_permission.GetUserEdbAndClassifyPermissionList(adminId, 0, 0)
- if er != nil {
- msg = "获取失败"
- err = fmt.Errorf("获取所有有权限的指标和分类失败,Err:" + er.Error())
- return
- }
- // 如果是ES的话,需要重新查一下指标的信息,主要是为了把是否授权字段找出来
- if isEs {
- edbInfoIdList := make([]int, 0)
- for i := 0; i < edbInfoListLen; i++ {
- edbInfoIdList = append(edbInfoIdList, edbInfoList[i].EdbInfoId)
- tmpEdbInfo := edbInfoList[i]
- if currClassify, ok := classifyMap[tmpEdbInfo.ClassifyId]; ok {
- edbInfoList[i].HaveOperaAuth = data_manage_permission.CheckEdbPermissionByPermissionIdList(tmpEdbInfo.IsJoinPermission, currClassify.IsJoinPermission, tmpEdbInfo.EdbInfoId, tmpEdbInfo.ClassifyId, permissionEdbIdList, permissionClassifyIdList)
- }
- }
- tmpEdbList, er := data_manage.GetEdbInfoByIdList(edbInfoIdList)
- if er != nil {
- msg = "获取失败"
- err = fmt.Errorf("获取所有有权限的指标失败,Err:" + er.Error())
- return
- }
- edbInfoMap := make(map[int]*data_manage.EdbInfo)
- for _, v := range tmpEdbList {
- edbInfoMap[v.EdbInfoId] = v
- }
- for i := 0; i < edbInfoListLen; i++ {
- tmpEdbInfo, ok := edbInfoMap[edbInfoList[i].EdbInfoId]
- if !ok {
- continue
- }
- edbInfoList[i].IsJoinPermission = tmpEdbInfo.IsJoinPermission
- }
- }
- // 权限校验
- for i := 0; i < edbInfoListLen; i++ {
- tmpEdbInfoItem := edbInfoList[i]
- if currClassify, ok := classifyMap[tmpEdbInfoItem.ClassifyId]; ok {
- edbInfoList[i].HaveOperaAuth = data_manage_permission.CheckEdbPermissionByPermissionIdList(tmpEdbInfoItem.IsJoinPermission, currClassify.IsJoinPermission, tmpEdbInfoItem.EdbInfoId, tmpEdbInfoItem.ClassifyId, permissionEdbIdList, permissionClassifyIdList)
- }
- }
- }
- for i := 0; i < edbInfoListLen; i++ {
- for j := 0; j < edbInfoListLen; j++ {
- if (edbInfoList[i].EdbNameAlias == edbInfoList[j].EdbNameAlias) &&
- (edbInfoList[i].EdbInfoId != edbInfoList[j].EdbInfoId) &&
- !(strings.Contains(edbInfoList[i].EdbName, edbInfoList[i].SourceName)) {
- edbInfoList[i].EdbName = edbInfoList[i].EdbName + "(" + edbInfoList[i].SourceName + ")"
- }
- }
- }
- //新增搜索词记录
- {
- searchKeyword := new(data_manage.SearchKeyword)
- searchKeyword.KeyWord = keyWord
- searchKeyword.CreateTime = time.Now()
- go data_manage.AddSearchKeyword(searchKeyword)
- }
- resp = data_manage.EdbInfoFilterDataResp{
- Paging: page,
- List: edbInfoList,
- }
- return
- }
|