base_from_kpler.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package services
import (
	"context"
	"encoding/json"
	"fmt"
	"io/fs"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"eta/eta_data_analysis/models"
	"eta/eta_data_analysis/services/kpler"
	"eta/eta_data_analysis/utils"

	"github.com/patrickmn/go-cache"
)
  17. func KplerExcelDataWatch(cont context.Context) (err error) {
  18. fmt.Println("kplerExcelWatch start")
  19. utils.FileLog.Info("kplerExcelWatch start")
  20. defer func() {
  21. if err != nil {
  22. fmt.Println("kplerExcelDataWatch Err:" + err.Error())
  23. utils.FileLog.Info(fmt.Sprintf("kplerExcelDataWatch, Err: %s", err))
  24. }
  25. }()
  26. cacheClient := utils.CacheClient
  27. if cacheClient == nil {
  28. utils.CacheClient = cache.New(365*24*time.Hour, 365*24*time.Hour)
  29. }
  30. err = filepath.Walk(utils.KplerExcelFilePath, func(path string, info fs.FileInfo, err error) error {
  31. if err != nil {
  32. return err
  33. }
  34. if !info.IsDir() {
  35. fileInfo, e := os.Stat(path)
  36. if e != nil {
  37. err = e
  38. fmt.Println("os.Stat:", err.Error())
  39. utils.FileLog.Info(fmt.Sprintf("os.Stat, Err: %s", err))
  40. return err
  41. }
  42. winFileAttr := fileInfo.Sys().(*syscall.Win32FileAttributeData)
  43. modifyTimeStr := utils.SecondToTime(winFileAttr.LastWriteTime.Nanoseconds() / 1e9).Format(utils.FormatDateTime)
  44. fmt.Println("文件的修改时间modifyTimeStr:", modifyTimeStr)
  45. existModifyTime, ok := cacheClient.Get(path)
  46. fmt.Println("缓存里的时间existModifyTime:", existModifyTime)
  47. if ok {
  48. existModifyTimeStr := existModifyTime.(string)
  49. if existModifyTimeStr != modifyTimeStr {
  50. err = GetKplerDataByExcel(path)
  51. }
  52. } else {
  53. err = GetKplerDataByExcel(path)
  54. }
  55. cacheClient.Delete(path)
  56. cacheClient.Set(path, modifyTimeStr, 24*time.Hour)
  57. }
  58. return nil
  59. })
  60. return
  61. }
  62. // Main function for standalone testing
  63. func GetKplerDataByExcel(filePath string) (err error) {
  64. //filePath = "services/kpler/crude.xlsx"
  65. fmt.Println("Starting Kpler data processing...")
  66. // Process the Excel data
  67. indexData, err :=kpler.ProcessKplerData(filePath)
  68. if err != nil {
  69. fmt.Printf("Error processing Excel data: %v\n", err)
  70. return
  71. }
  72. indexList := make([]*models.HandleKplerExcelData, 0)
  73. // Print the processed data
  74. for k, index := range indexData {
  75. // 解析请求参数
  76. if index.Request != "" {
  77. flowsRequestItem, err := kpler.ParseSpecificKplerFormulaV2(index.Request)
  78. if err != nil {
  79. fmt.Printf("Error parsing formula: %v\n", err)
  80. continue
  81. }
  82. indexName := fmt.Sprintf("%s_%s", index.Title, index.Name)
  83. unit := flowsRequestItem.Unit
  84. sort := k
  85. classifyName := ""
  86. productNameSlice := flowsRequestItem.Products
  87. productNames := ""
  88. if len(productNameSlice) > 0 {
  89. for _, productName := range productNameSlice {
  90. if classifyName == "" {
  91. classifyName = productName.Name
  92. }
  93. productNames += productName.Name + ","
  94. }
  95. }
  96. productNames = strings.TrimSuffix(productNames, ",")
  97. fromZoneNameSlice := flowsRequestItem.Origins
  98. fromZoneNames := ""
  99. if len(fromZoneNameSlice) > 0 {
  100. for _, fromZoneName := range fromZoneNameSlice {
  101. fromZoneNames += fromZoneName.Name + ","
  102. }
  103. }
  104. fromZoneNames = strings.TrimSuffix(fromZoneNames, ",")
  105. toZoneNames := ""
  106. toZoneNameSlice := flowsRequestItem.Destinations
  107. if len(toZoneNameSlice) > 0 {
  108. for _, toZoneName := range toZoneNameSlice {
  109. toZoneNames += toZoneName.Name + ","
  110. }
  111. }
  112. toZoneNames = strings.TrimSuffix(toZoneNames, ",")
  113. flowDirection := flowsRequestItem.FlowDirection
  114. granularity := flowsRequestItem.Granularity
  115. split := flowsRequestItem.Split
  116. excelDataMap := make(map[string]string)
  117. if len(index.DataPoints) > 0 {
  118. for _, dataPoint := range index.DataPoints {
  119. excelDataMap[dataPoint.EndDate] = dataPoint.Value
  120. }
  121. }
  122. tmp := models.HandleKplerExcelData{
  123. IndexName: indexName,
  124. Unit: unit,
  125. Sort: sort,
  126. ClassifyName: classifyName,
  127. ProductNames: productNames,
  128. FromZoneNames: fromZoneNames,
  129. ToZoneNames: toZoneNames,
  130. FlowDirection: flowDirection,
  131. Granularity: granularity,
  132. Split: split,
  133. SplitName: index.Name,
  134. ExcelQueryUrl: index.Request,
  135. ExcelDataMap: excelDataMap,
  136. }
  137. indexList = append(indexList, &tmp)
  138. }
  139. }
  140. if len(indexList) > 0 {
  141. params := make(map[string]interface{})
  142. params["List"] = indexList
  143. params["TerminalCode"] = ""
  144. result, e := PostEdbLib(params, utils.LIB_ROUTE_KPLER_DATA)
  145. if e != nil {
  146. b, _ := json.Marshal(params)
  147. utils.FileLog.Info(fmt.Sprintf("sheet :GetKplerDataByExcel PostEdbLib err: %s, params: %s", e.Error(), string(b)))
  148. return
  149. }
  150. resp := new(models.BaseEdbLibResponse)
  151. if e := json.Unmarshal(result, &resp); e != nil {
  152. utils.FileLog.Info(fmt.Sprintf("sheet :GetKplerDataByExcel json.Unmarshal err: %s", e))
  153. return
  154. }
  155. if resp.Ret != 200 {
  156. utils.FileLog.Info(fmt.Sprintf("sheet :GetKplerDataByExcel Msg: %s, ErrMsg: %s", resp.Msg, resp.ErrMsg))
  157. return
  158. }
  159. }
  160. // 传递list给指标服务
  161. fmt.Println("GetKplerDataByExcel completed successfully!")
  162. return
  163. }
  164. // 定时调用python脚本刷新kpler
  165. func RefreshKplerByExcel(cont context.Context) (err error) {
  166. if utils.KplerRefreshUrl == "" {
  167. return
  168. }
  169. //查询utils.KplerExcelFilePath目录下所有excel文件
  170. files, err := filepath.Glob(utils.KplerExcelFilePath + "/*.xlsx")
  171. if err != nil {
  172. utils.FileLog.Info("RefreshKplerByExcel Err:" + err.Error())
  173. return
  174. }
  175. for _, file := range files {
  176. fmt.Println("RefreshKplerByExcel file:" + file)
  177. kplerRefreshUrl := fmt.Sprintf("%s/kpler/refresh?FilePath=%s", utils.KplerRefreshUrl, file)
  178. body, er := HttpGet(kplerRefreshUrl)
  179. if er != nil {
  180. utils.FileLog.Info("RefreshKplerByExcel Err:" + er.Error())
  181. return
  182. }
  183. utils.FileLog.Info("RefreshKplerByExcel Result:" + string(body))
  184. }
  185. return
  186. }