processor_business_logic.go 13 KB


  1. // Package ruizide
  2. // @Author gmy 2024/10/21 10:50:00
  3. package main
  4. import (
  5. "encoding/json"
  6. "eta/eta_data_analysis/models"
  7. "eta/eta_data_analysis/utils"
  8. "fmt"
  9. "github.com/beego/beego/v2/core/logs"
  10. "math"
  11. "strconv"
  12. "strings"
  13. "unicode"
  14. )
  15. var classifyMap = map[string]string{
  16. "Road Index": "analytics library",
  17. "Road Active Fleet": "analytics library",
  18. "Aviation Index": "analytics library",
  19. "Aviation Active Fleet": "analytics library",
  20. "Demand-Gasoline": "analytics library",
  21. "Demand - Diesel": "analytics library",
  22. "Demand - Jet Fuel": "analytics library",
  23. "Demand - Maritime Bunker": "analytics library",
  24. "Oil_Demand_Signals_Weekly_Report": "analytics library",
  25. }
  26. // RoadIndexProcessor
  27. // @Description: AnalyticsLibrary处理器
  28. type RoadIndexProcessor struct{}
  29. func (p *RoadIndexProcessor) Process(tableName string, sheetName string, rowIndex int, rowData []string) ([]models.BaseFromRzdData, error) {
  30. logs.Info("Processing AnalyticsLibrary...")
  31. if rowIndex < 4 {
  32. return nil, nil
  33. }
  34. frequency := "日度"
  35. unit := "%"
  36. indexNameColOne := "Index"
  37. indexNameColTwo := "Index 7DMA"
  38. // step_1: 分类
  39. classifyId, err := dealClassify(tableName, sheetName)
  40. if err != nil {
  41. return nil, err
  42. }
  43. logs.Info("classifyId: %v", classifyId)
  44. // step_2: 指标
  45. // 指标名称
  46. indexNameOne := sheetName + "/" + rowData[len(rowData)-3] + "/" + indexNameColOne
  47. indexNameTwo := sheetName + "/" + rowData[len(rowData)-3] + "/" + indexNameColTwo
  48. // 生成指标编码
  49. indexCodeOne, err := getIndexId(sheetName, strings.ToLower(rowData[len(rowData)-3]), indexNameColOne)
  50. indexCodeTwo, err := getIndexId(sheetName, strings.ToLower(rowData[len(rowData)-3]), indexNameColTwo)
  51. var indexInfoMap map[string]string
  52. indexInfoMap[indexCodeOne] = indexNameOne
  53. indexInfoMap[indexCodeTwo] = indexNameTwo
  54. var indexInfoList []*models.IndexInfo
  55. valueOne, err := strconv.ParseFloat(rowData[len(rowData)-2], 64)
  56. if err != nil {
  57. return nil, err
  58. }
  59. dataTimeOne := rowData[1]
  60. formatOne, err := utils.ConvertDateFormat(dataTimeOne)
  61. if err != nil {
  62. return nil, err
  63. }
  64. indexInfoList = append(indexInfoList, &models.IndexInfo{
  65. IndexName: indexNameOne,
  66. IndexCode: indexCodeOne,
  67. Value: valueOne,
  68. DataTime: formatOne,
  69. })
  70. valueTwo, err := strconv.ParseFloat(rowData[len(rowData)-1], 64)
  71. if err != nil {
  72. return nil, err
  73. }
  74. indexInfoList = append(indexInfoList, &models.IndexInfo{
  75. IndexName: indexNameTwo,
  76. IndexCode: indexCodeTwo,
  77. Value: valueTwo,
  78. DataTime: formatOne,
  79. })
  80. indexInfoList, err = dealIndex(indexInfoList, frequency, unit, classifyId)
  81. if err != nil {
  82. return nil, err
  83. }
  84. logs.Info("indexInfoList: %v", indexInfoList)
  85. // step_3: 指标数据
  86. dataList, err := dealData(indexInfoList)
  87. if err != nil {
  88. return nil, err
  89. }
  90. logs.Info("dataList: %v", dataList)
  91. return dataList, err
  92. }
  93. // ReDashboardExportOneProcessor
  94. // @Description: ReDashboardExportOneProcessor处理器
  95. type ReDashboardExportOneProcessor struct{}
  96. /*
  97. func (p *ReDashboardExportOneProcessor) Process(tableName string, sheetName string, rowIndex int, rowData []string) ([]models.BaseFromRzdData, error) {
  98. logs.Info("Processing ReDashboardExportOne...")
  99. if rowIndex < 4 {
  100. return nil, nil
  101. }
  102. frequency := "季度"
  103. unit := "千桶每天"
  104. indexNameColOne := "Index"
  105. indexNameColTwo := "Index 7DMA"
  106. // step_1: 分类
  107. classifyId, err := dealClassify("cube dashboards", "Supply Revision Analysis")
  108. if err != nil {
  109. return nil, err
  110. }
  111. logs.Info("classifyId: %v", classifyId)
  112. // step_2: 指标
  113. indexOneId, indexTwoId, indexCodeOne, indexCodeTwo, err := dealIndex(sheetName, rowData, indexNameColOne, indexNameColTwo, frequency, unit, classifyId)
  114. if err != nil {
  115. return nil, err
  116. }
  117. logs.Info("indexOneId: %v, indexTwoId: %v, indexCodeOne: %v, indexCodeTwo: %v", indexOneId, indexTwoId, indexCodeOne, indexCodeTwo)
  118. // step_3: 指标数据
  119. dataList, err := dealData(indexOneId, indexTwoId, indexCodeOne, indexCodeTwo, rowData)
  120. if err != nil {
  121. return nil, err
  122. }
  123. logs.Info("dataList: %v", dataList)
  124. return dataList, err
  125. }
  126. */
  127. func dealData(indexInfoList []*models.IndexInfo) ([]models.BaseFromRzdData, error) {
  128. var dataList []models.BaseFromRzdData
  129. for _, indexInfo := range indexInfoList {
  130. paramsLib := make(map[string]interface{})
  131. paramsLib["IndexCode"] = indexInfo.IndexCode
  132. paramsLib["DataTime"] = indexInfo.DataTime
  133. postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_INDEX_DATA_BY_CODE_AND_TIME)
  134. if err != nil {
  135. return nil, err
  136. }
  137. var requestResponse models.RequestResponse[models.BaseFromRzdData]
  138. err = json.Unmarshal(postEdbLib, &requestResponse)
  139. if err != nil {
  140. return nil, err
  141. }
  142. if requestResponse.Data.BaseFromRzdIndexId == 0 {
  143. dataOne := models.BaseFromRzdData{
  144. BaseFromRzdIndexId: indexInfo.IndexInfoId,
  145. CreateTime: utils.GetCurrentTime(),
  146. DataTime: indexInfo.DataTime,
  147. IndexCode: indexInfo.IndexCode,
  148. ModifyTime: utils.GetCurrentTime(),
  149. Value: math.Round(indexInfo.Value*10000) / 10000,
  150. }
  151. dataList = append(dataList, dataOne)
  152. }
  153. }
  154. return dataList, nil
  155. }
  156. func dealIndex(indexInfoList []*models.IndexInfo, frequency string, unit string, classifyId int) ([]*models.IndexInfo, error) {
  157. for _, index := range indexInfoList {
  158. // 处理第一个指标
  159. paramsLib := make(map[string]interface{})
  160. paramsLib["indexCode"] = index.IndexCode
  161. postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_INDEX_BY_CODE)
  162. if err != nil {
  163. return nil, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  164. }
  165. var requestResponse models.RequestResponse[models.BaseFromRzdIndex]
  166. err = json.Unmarshal(postEdbLib, &requestResponse)
  167. if err != nil {
  168. return nil, err
  169. }
  170. if requestResponse.Data.BaseFromRzdIndexId == 0 {
  171. indexOne := models.BaseFromRzdIndex{
  172. CreateTime: utils.GetCurrentTime(),
  173. ModifyTime: utils.GetCurrentTime(),
  174. BaseFromLyClassifyId: classifyId,
  175. IndexCode: index.IndexCode,
  176. IndexName: index.IndexName,
  177. Frequency: frequency,
  178. Unit: unit,
  179. }
  180. // 这里避免服务器宕机 出现唯一索引异常,进行分开保存
  181. postEdbLib, err = httpRequestFill(indexOne, utils.ADD_RZD_INDEX)
  182. if err != nil {
  183. return nil, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  184. }
  185. var requestResponse models.RequestResponse[int]
  186. err = json.Unmarshal(postEdbLib, &requestResponse)
  187. if err != nil {
  188. return nil, err
  189. }
  190. logs.Info("indexOneId: %v", requestResponse.Data)
  191. index.IndexInfoId = requestResponse.Data
  192. } else {
  193. logs.Info("indexOneId: %v", requestResponse.Data.BaseFromRzdIndexId)
  194. index.IndexInfoId = requestResponse.Data.BaseFromRzdIndexId
  195. }
  196. }
  197. return indexInfoList, nil
  198. /*// 处理第一个指标
  199. paramsLib := make(map[string]interface{})
  200. paramsLib["indexCode"] = indexCodeOne
  201. postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_INDEX_BY_CODE)
  202. if err != nil {
  203. return 0, 0, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  204. }
  205. var requestResponse models.RequestResponse[models.BaseFromRzdIndex]
  206. err = json.Unmarshal(postEdbLib, &requestResponse)
  207. if err != nil {
  208. return 0, 0, err
  209. }
  210. if requestResponse.Data.BaseFromRzdIndexId == 0 {
  211. indexOne := models.BaseFromRzdIndex{
  212. CreateTime: utils.GetCurrentTime(),
  213. ModifyTime: utils.GetCurrentTime(),
  214. BaseFromLyClassifyId: classifyId,
  215. IndexCode: indexCodeOne,
  216. IndexName: indexNameOne,
  217. Frequency: frequency,
  218. Unit: unit,
  219. }
  220. // 这里避免服务器宕机 出现唯一索引异常,进行分开保存
  221. postEdbLib, err = httpRequestFill(indexOne, utils.ADD_RZD_INDEX)
  222. if err != nil {
  223. return 0, 0, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  224. }
  225. var requestResponse models.RequestResponse[int]
  226. err = json.Unmarshal(postEdbLib, &requestResponse)
  227. if err != nil {
  228. return 0, 0, err
  229. }
  230. indexOneId = requestResponse.Data
  231. logs.Info("indexOneId: %v", indexOneId)
  232. } else {
  233. indexOneId = requestResponse.Data.BaseFromRzdIndexId
  234. }
  235. // 处理第二个指标
  236. paramsLib = make(map[string]interface{})
  237. paramsLib["indexCode"] = indexCodeTwo
  238. postEdbLib, err = httpRequestFill(paramsLib, utils.GET_RZD_INDEX_BY_CODE)
  239. if err != nil {
  240. return 0, 0, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  241. }
  242. var requestResponseTwo models.RequestResponse[models.BaseFromRzdIndex]
  243. err = json.Unmarshal(postEdbLib, &requestResponseTwo)
  244. if err != nil {
  245. return 0, 0, err
  246. }
  247. if requestResponseTwo.Data.BaseFromRzdIndexId == 0 {
  248. indexTwo := models.BaseFromRzdIndex{
  249. CreateTime: utils.GetCurrentTime(),
  250. ModifyTime: utils.GetCurrentTime(),
  251. BaseFromLyClassifyId: classifyId,
  252. IndexCode: indexCodeTwo,
  253. IndexName: indexNameTwo,
  254. Frequency: frequency,
  255. Unit: unit,
  256. }
  257. // 这里避免服务器宕机 出现唯一索引异常,进行分开保存
  258. var requestResponse models.RequestResponse[int]
  259. postEdbLib, err = httpRequestFill(indexTwo, utils.ADD_RZD_INDEX)
  260. if err != nil {
  261. return 0, 0, fmt.Errorf("getIndexId() : Failed to get rzd index by code: %v", err)
  262. }
  263. err = json.Unmarshal(postEdbLib, &requestResponse)
  264. if err != nil {
  265. return 0, 0, err
  266. }
  267. indexTwoId = requestResponse.Data
  268. logs.Info("indexTwoId: %v", indexTwoId)
  269. } else {
  270. indexTwoId = requestResponseTwo.Data.BaseFromRzdIndexId
  271. }*/
  272. //return indexOneId, indexTwoId, nil
  273. }
  274. func getIndexId(prefix string, area string, suffix string) (string, error) {
  275. prefixWords := strings.Fields(prefix) // 分割字符串为单词
  276. firstLetters := ""
  277. for _, word := range prefixWords {
  278. if len(word) > 0 {
  279. firstLetters += string(unicode.ToLower(rune(word[0]))) // 获取小写的第一个字母
  280. }
  281. }
  282. suffixWords := strings.Fields(suffix) // 分割字符串为单词
  283. SecondLetters := ""
  284. for _, word := range suffixWords {
  285. if len(word) > 0 {
  286. SecondLetters += string(unicode.ToLower(rune(word[0]))) // 获取小写的第一个字母
  287. }
  288. }
  289. indexCode := "rzd" + firstLetters + area + SecondLetters
  290. return indexCode, nil
  291. }
  292. // 处理分类
  293. func dealClassify(tableName, sheetName string) (int, error) {
  294. // 查询一级分类是否存在
  295. paramsLib := make(map[string]interface{})
  296. paramsLib["classifyName"] = classifyMap[tableName]
  297. postEdbLib, err := httpRequestFill(paramsLib, utils.GET_RZD_CLASSIFY_BY_NAME)
  298. if err != nil {
  299. return 0, fmt.Errorf("AnalyticsLibraryProcessor Process() : failed to get classify: %v", err)
  300. }
  301. var requestResponse models.RequestResponse[models.BaseFromRzdClassify]
  302. err = json.Unmarshal(postEdbLib, &requestResponse)
  303. if err != nil {
  304. return 0, err
  305. }
  306. // 处理一级分类
  307. var parentId int
  308. if requestResponse.Data.BaseFromRzdClassifyId == 0 {
  309. // 一级分类不存在,新增一级分类
  310. paramsLib = make(map[string]interface{})
  311. paramsLib["parentId"] = 0
  312. paramsLib["classifyName"] = classifyMap[tableName]
  313. postEdbLib, err = httpRequestFill(paramsLib, utils.ADD_RZD_CLASSIFY)
  314. if err != nil {
  315. return 0, fmt.Errorf("AnalyticsLibraryProcessor Process() : failed to add classify: %v", err)
  316. }
  317. var requestResponse models.RequestResponse[int]
  318. err = json.Unmarshal(postEdbLib, &requestResponse)
  319. if err != nil {
  320. return 0, err
  321. }
  322. parentId = requestResponse.Data
  323. } else {
  324. // 一级分类已存在,使用其 ID
  325. parentId = requestResponse.Data.BaseFromRzdClassifyId
  326. }
  327. // 查询二级分类是否存在
  328. paramsSubLib := make(map[string]interface{})
  329. paramsSubLib["classifyName"] = sheetName // 这里替换成实际的二级分类名称
  330. postSubEdbLib, err := httpRequestFill(paramsSubLib, utils.GET_RZD_CLASSIFY_BY_NAME)
  331. if err != nil {
  332. return 0, fmt.Errorf("AnalyticsLibraryProcessor Process() : failed to get sub classify: %v", err)
  333. }
  334. var subRequestResponse models.RequestResponse[models.BaseFromRzdClassify]
  335. err = json.Unmarshal(postSubEdbLib, &subRequestResponse)
  336. if err != nil {
  337. return 0, err
  338. }
  339. // 新增二级分类
  340. var classifyId int
  341. if subRequestResponse.Data.BaseFromRzdClassifyId == 0 {
  342. paramsLib = make(map[string]interface{})
  343. paramsLib["parentId"] = parentId
  344. paramsLib["classifyName"] = sheetName
  345. postEdbLib, err = httpRequestFill(paramsLib, utils.ADD_RZD_CLASSIFY)
  346. if err != nil {
  347. return 0, fmt.Errorf("AnalyticsLibraryProcessor Process() : failed to add classify: %v", err)
  348. }
  349. var requestResponse models.RequestResponse[int]
  350. err = json.Unmarshal(postEdbLib, &requestResponse)
  351. if err != nil {
  352. return 0, err
  353. }
  354. classifyId = requestResponse.Data
  355. } else {
  356. classifyId = subRequestResponse.Data.BaseFromRzdClassifyId
  357. }
  358. return classifyId, nil
  359. }