processor_business_logic.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // @Author gmy 2024/8/6 10:50:00
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "eta/eta_crawler/models"
  7. "eta/eta_crawler/utils"
  8. "fmt"
  9. "github.com/PuerkitoBio/goquery"
  10. "github.com/beego/beego/v2/core/logs"
  11. "github.com/chromedp/chromedp"
  12. "log"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. )
  // sourceName identifies the crawled data source, 粮油商务网 (a grain-and-oil
  // trade news site).
  const sourceName = "lysww"
  20. // ImportCostProcessor
  21. // @Description: 进口成本处理器
  22. type ImportCostProcessor struct{}
  23. func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
  24. fmt.Println("Processing import cost...")
  25. // 实现具体的处理逻辑
  26. return models.BaseFromLyData{}, nil
  27. }
  28. // ProcessingProfitProcessor
  29. // @Description: 加工利润处理器
  30. type ProcessingProfitProcessor struct{}
  31. func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
  32. fmt.Println("Processing processing profit...")
  33. // 实现具体的处理逻辑
  34. return models.BaseFromLyData{}, nil
  35. }
  // ProcessingReportProcessor extracts values from processing-report pages.
  // It carries no state; the zero value is ready to use.
  type ProcessingReportProcessor struct{}
  // TableData holds one scraped HTML table: the header cell texts, and for
  // each body row its cell texts in column order.
  type TableData struct {
  	Headers []string   `json:"headers"` // header cell texts
  	Rows    [][]string `json:"rows"`    // one slice of cell texts per body row
  }
  45. func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, indexId int) (models.BaseFromLyData, error) {
  46. logs.Info("Processing processing report...")
  47. // 解析关键字
  48. if len(keywords) < 3 {
  49. return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
  50. }
  51. // 拿到 行关键字和列关键字
  52. columnName := keywords[0]
  53. rowName := keywords[1]
  54. // 提取所有表格数据
  55. var tableData []TableData
  56. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  57. if err != nil {
  58. log.Fatal(err)
  59. }
  60. // 选择 id 为 "a_content" 的 div
  61. doc.Find("#a_content").Each(func(index int, item *goquery.Selection) {
  62. item.Find("table").Each(func(index int, table *goquery.Selection) {
  63. var headers []string
  64. var rows [][]string
  65. // 提取表头
  66. table.Find("thead th").Each(func(index int, th *goquery.Selection) {
  67. headers = append(headers, th.Text())
  68. })
  69. // 提取表格行数据
  70. table.Find("tbody tr").Each(func(index int, row *goquery.Selection) {
  71. var rowData []string
  72. row.Find("td").Each(func(index int, td *goquery.Selection) {
  73. rowData = append(rowData, td.Text())
  74. })
  75. rows = append(rows, rowData)
  76. })
  77. // 仅在表头存在时添加到结果中
  78. if len(headers) > 0 {
  79. tableData = append(tableData, TableData{
  80. Headers: headers,
  81. Rows: rows,
  82. })
  83. }
  84. })
  85. })
  86. // 打印提取的数据以进行调试
  87. dataJSON, _ := json.MarshalIndent(tableData, "", " ")
  88. fmt.Printf("Extracted Table Data: %s\n", dataJSON)
  89. // 提取日期信息
  90. var dateText string
  91. err = chromedp.Run(ctx,
  92. chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
  93. )
  94. if err != nil {
  95. return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to extract date: %v", err)
  96. }
  97. logs.Info("ProcessingReportProcessor Process() : Extracted Date: %s", dateText)
  98. // 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走
  99. format, err := utils.ConvertTimeFormat(dateText)
  100. if err != nil {
  101. return models.BaseFromLyData{}, err
  102. }
  103. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  104. if err != nil {
  105. return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
  106. }
  107. if len(indexData) > 0 {
  108. logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  109. return models.BaseFromLyData{}, nil
  110. }
  111. // 解析日期并计算当前周数
  112. targetWeek, err := utils.ParseDateAndWeek(dateText)
  113. if err != nil {
  114. return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
  115. }
  116. fmt.Printf("Target Week: %s\n", targetWeek)
  117. // 处理提取的表格数据
  118. for _, data := range tableData {
  119. tableHeaders := data.Headers
  120. tableRows := data.Rows
  121. // 查找目标列
  122. columnIdx := -1
  123. for i, header := range tableHeaders {
  124. headerString := extractChinese(header)
  125. if strings.Contains(columnName, headerString) {
  126. columnIdx = i
  127. break
  128. }
  129. }
  130. if columnIdx == -1 {
  131. logs.Error("ProcessingReportProcessor Process() : Column '%s' not found in table", columnName)
  132. continue
  133. }
  134. // 查找本周的列位置
  135. weekIdx := -1
  136. for i, header := range tableHeaders {
  137. if strings.Contains(header, targetWeek) && i > columnIdx {
  138. weekIdx = i
  139. break
  140. }
  141. }
  142. if weekIdx == -1 {
  143. fmt.Printf("Week column '%s' not found in table\n", targetWeek)
  144. continue
  145. }
  146. // 查找目标行
  147. for _, row := range tableRows {
  148. if len(row) > 0 && strings.Contains(row[0], rowName) {
  149. if weekIdx < len(row) {
  150. logs.Info("Value in column '%s' - '%s': %s", columnName, rowName, row[columnIdx])
  151. numFlag := isNumeric(row[columnIdx])
  152. if numFlag {
  153. value, err := strconv.ParseFloat(row[columnIdx], 64)
  154. if err != nil {
  155. logs.Error("ProcessingReportProcessor Process() : Error converting value to float64: %v", err)
  156. return models.BaseFromLyData{}, err
  157. }
  158. // 返回BaseFromLyData对象的数据
  159. baseFromLyData := models.BaseFromLyData{
  160. DataTime: dateText,
  161. Value: value,
  162. }
  163. return baseFromLyData, nil
  164. }
  165. } else {
  166. logs.Error("ProcessingReportProcessor Process() : Column index out of range")
  167. }
  168. }
  169. }
  170. }
  171. // TODO 后面把这个日志打印,不做返回错误处理,一个指标找不到会导致后续指标无法处理
  172. return models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : No matching row found for '%s'", rowName)
  173. }
  // numericRegexp matches an optionally signed plain decimal number such as
  // "12", "-3.5", "+.5" or "7." — no exponent, thousands separators or
  // surrounding whitespace. Compiled once here instead of on every call,
  // since isNumeric runs inside row loops.
  var numericRegexp = regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`)

  // isNumeric reports whether value is a plain decimal integer or float as
  // described by numericRegexp.
  func isNumeric(value string) bool {
  	return numericRegexp.MatchString(value)
  }
  // nonHanRegexp matches any character outside the Unicode Han script.
  // Compiled once here instead of on every call, since extractChinese runs per
  // table header.
  var nonHanRegexp = regexp.MustCompile(`[^\p{Han}]`)

  // extractChinese returns text with every non-Han character removed, e.g.
  // "大豆(元/吨)2024" becomes "大豆元吨". Text with no Han characters yields "".
  func extractChinese(text string) string {
  	return nonHanRegexp.ReplaceAllString(text, "")
  }