processor_business_logic.go 70 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242
  1. // Package liangyou
  2. // @Author gmy 2024/8/6 10:50:00
  3. package liangyou
  4. import (
  5. "context"
  6. "eta/eta_crawler/models"
  7. "eta/eta_crawler/utils"
  8. "fmt"
  9. "github.com/PuerkitoBio/goquery"
  10. "github.com/beego/beego/v2/core/logs"
  11. "github.com/chromedp/chromedp"
  12. "log"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "unicode"
  17. )
  18. const (
  19. lySourceName = "lysww" // 粮油商务网
  20. )
  21. // TableData 用于存储表格的数据
  22. type TableData struct {
  23. Headers []string `json:"headers"`
  24. Rows [][]string `json:"rows"`
  25. }
  26. // ImportCostProcessor
  27. // @Description: 进口成本处理器
  28. type ImportCostProcessor struct{}
  29. func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  30. logs.Info("Processing import cost...")
  31. // 解析关键字
  32. if len(keywords) < 5 {
  33. return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : keywords must contain at least 5 elements")
  34. }
  35. // 拿到 行关键字和列关键字
  36. columnName := keywords[len(keywords)-4]
  37. rowVariety := keywords[0]
  38. rowPort := keywords[len(keywords)-3]
  39. indexNamePrefix := keywords[:1]
  40. indexNameSuffix := keywords[1:]
  41. // 提取所有表格数据
  42. tableData := getNoHeadTableData(reportContent)
  43. // 提取日期信息
  44. dateText, err := getDateInfo(ctx)
  45. if err != nil {
  46. return []models.BaseFromLyData{}, err
  47. }
  48. // 时间格式转换
  49. format, err := utils.ConvertTimeFormat(dateText)
  50. if err != nil {
  51. return []models.BaseFromLyData{}, err
  52. }
  53. // 解析日期并计算当前月份
  54. var targetMonths []string
  55. if product == "油菜籽" {
  56. targetMonths, err = utils.ParseDateAndMonthColzaOil(format)
  57. } else {
  58. targetMonths, err = utils.ParseDateAndMonth(dateText)
  59. }
  60. if err != nil {
  61. return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : Failed to parse date: %v", err)
  62. }
  63. fmt.Printf("Target Month: %s\n", targetMonths)
  64. // 处理提取的表格数据
  65. var result []models.BaseFromLyData
  66. for _, data := range tableData {
  67. tableHeaders := data.Headers
  68. tableRows := data.Rows
  69. // 查找目标列
  70. columnIdx := -1
  71. for i, header := range tableHeaders {
  72. if strings.Contains(header, columnName) {
  73. columnIdx = i
  74. break
  75. }
  76. }
  77. if columnIdx == -1 {
  78. log.Printf("ImportCostProcessor Process() : Column '%s' not found in table", columnName)
  79. continue
  80. }
  81. // 处理表格中的每一行
  82. //var flag bool = true
  83. var previousRowVariety string
  84. var previousRowPort string
  85. for rowIndex, row := range tableRows {
  86. if len(row) == len(tableHeaders) {
  87. previousRowVariety = row[0]
  88. previousRowPort = row[1]
  89. } else if len(row) == len(tableHeaders)-1 {
  90. previousRowPort = row[0]
  91. row = append([]string{previousRowVariety}, row...)
  92. tableRows[rowIndex] = row
  93. } else if len(row) == len(tableHeaders)-2 {
  94. row = append([]string{previousRowVariety, previousRowPort}, row...)
  95. tableRows[rowIndex] = row
  96. }
  97. for _, targetMonth := range targetMonths {
  98. if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort {
  99. if columnIdx < len(row) {
  100. // 指标名称
  101. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  102. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  103. // 指标编码
  104. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  105. // 指标id获取
  106. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  107. if err != nil {
  108. logs.Error("ImportCostProcessor Process() : Failed to get index id: %v", err)
  109. continue
  110. }
  111. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  112. if err != nil {
  113. logs.Error("ImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
  114. continue
  115. }
  116. if len(indexData) > 0 {
  117. logs.Info("ImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  118. continue
  119. }
  120. valueStr := row[columnIdx]
  121. value, err := strconv.ParseFloat(valueStr, 64)
  122. if err != nil {
  123. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  124. }
  125. // 创建并添加到结果列表
  126. baseFromLyData := models.BaseFromLyData{
  127. DataTime: format,
  128. Value: value,
  129. BaseFromLyIndexId: indexId,
  130. IndexCode: indexCode,
  131. }
  132. result = append(result, baseFromLyData)
  133. } else {
  134. log.Printf("ImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
  135. }
  136. break
  137. }
  138. }
  139. }
  140. }
  141. return result, nil
  142. }
  143. // ProcessingProfitProcessor
  144. // @Description: 加工利润处理器
  145. type ProcessingProfitProcessor struct{}
  146. func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  147. fmt.Println("Processing processing profit...")
  148. // 解析关键字
  149. if len(keywords) < 4 {
  150. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingProfitProcessor Process() : keywords must contain at least 4 elements")
  151. }
  152. // 拿到 行关键字和列关键字
  153. columnName := keywords[1]
  154. rowVariety := keywords[0]
  155. indexNamePrefix := keywords[:1]
  156. indexNameSuffix := keywords[1:]
  157. // 提取所有表格数据
  158. tableData := getNoHeadTableData(reportContent)
  159. // 提取日期信息
  160. dateText, err := getDateInfo(ctx)
  161. if err != nil {
  162. return []models.BaseFromLyData{}, err
  163. }
  164. // 时间格式转换
  165. format, err := utils.ConvertTimeFormat(dateText)
  166. if err != nil {
  167. return []models.BaseFromLyData{}, err
  168. }
  169. // 解析日期并计算当前月份 和 后两月
  170. yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
  171. if err != nil {
  172. return nil, err
  173. }
  174. fmt.Printf("Target yearMonth: %s\n", yearMonths)
  175. // 处理提取的表格数据
  176. var result []models.BaseFromLyData
  177. for _, data := range tableData {
  178. tableHeaders := data.Headers
  179. tableRows := data.Rows
  180. // 查找目标列
  181. columnIdx := -1
  182. for i, header := range tableHeaders {
  183. if strings.Contains(columnName, header) {
  184. columnIdx = i
  185. break
  186. }
  187. }
  188. if columnIdx == -1 {
  189. log.Printf("ProcessingProfitProcessor Process() : Column '%s' not found in table", columnName)
  190. continue
  191. }
  192. // 处理表格中的每一行
  193. var previousRowVariety string
  194. for rowIndex, row := range tableRows {
  195. if len(row) == len(tableHeaders) {
  196. previousRowVariety = row[0]
  197. } else if len(row) == len(tableHeaders)-1 {
  198. row = append([]string{previousRowVariety}, row...)
  199. tableRows[rowIndex] = row
  200. }
  201. for _, targetMonth := range yearMonths {
  202. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
  203. if columnIdx < len(row) {
  204. // 指标名称
  205. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  206. indexName := strings.Join(indexNameList[:len(keywords)-1], ":")
  207. // 指标编码
  208. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  209. // 指标id获取
  210. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  211. if err != nil {
  212. logs.Error("ProcessingProfitProcessor Process() : Failed to get index id: %v", err)
  213. continue
  214. }
  215. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  216. if err != nil {
  217. logs.Error("ProcessingProfitProcessor Process() : Failed to get data by index id and date: %v", err)
  218. continue
  219. }
  220. if len(indexData) > 0 {
  221. logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  222. continue
  223. }
  224. valueStr := row[columnIdx]
  225. value, err := strconv.ParseFloat(valueStr, 64)
  226. if err != nil {
  227. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  228. }
  229. // 创建并添加到结果列表
  230. baseFromLyData := models.BaseFromLyData{
  231. DataTime: format,
  232. Value: value,
  233. BaseFromLyIndexId: indexId,
  234. IndexCode: indexCode,
  235. }
  236. result = append(result, baseFromLyData)
  237. } else {
  238. log.Printf("ProcessingProfitProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  239. }
  240. break
  241. }
  242. }
  243. }
  244. }
  245. return result, nil
  246. }
  247. // ShippingCostProcessor
  248. // @Description: 船运费用处理器
  249. type ShippingCostProcessor struct{}
  250. func (p *ShippingCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  251. fmt.Println("Processing processing profit...")
  252. // 解析关键字
  253. if len(keywords) < 4 {
  254. return []models.BaseFromLyData{}, fmt.Errorf("ShippingCostProcessor Process() : keywords must contain at least 5 elements")
  255. }
  256. // 拿到 行关键字和列关键字
  257. columnName := keywords[len(keywords)-3]
  258. rowVariety := keywords[0]
  259. rowDestination := keywords[1]
  260. rowShipType := keywords[2]
  261. // 提取所有表格数据
  262. tableData := getNoHeadTableData(reportContent)
  263. // 提取日期信息
  264. dateText, err := getDateInfo(ctx)
  265. if err != nil {
  266. return []models.BaseFromLyData{}, err
  267. }
  268. // 时间格式转换
  269. format, err := utils.ConvertTimeFormat(dateText)
  270. if err != nil {
  271. return []models.BaseFromLyData{}, err
  272. }
  273. // 处理提取的表格数据
  274. var result []models.BaseFromLyData
  275. for _, data := range tableData {
  276. tableHeaders := data.Headers
  277. tableRows := data.Rows
  278. // 查找目标列
  279. columnIdx := -1
  280. for i, header := range tableHeaders {
  281. if strings.Contains(header, columnName) {
  282. columnIdx = i
  283. break
  284. }
  285. }
  286. if columnIdx == -1 {
  287. log.Printf("ShippingCostProcessor Process() : Column '%s' not found in table", columnName)
  288. continue
  289. }
  290. // 处理表格中的每一行
  291. for rowIndex, row := range tableRows {
  292. if len(row) == len(tableHeaders)-1 {
  293. row = append([]string{rowVariety}, row...)
  294. tableRows[rowIndex] = row
  295. rowShipType, err = extractValueInParentheses(rowVariety)
  296. if err != nil {
  297. logs.Error("ShippingCostProcessor Process() : Failed to extract value in parentheses: %v", err)
  298. continue
  299. }
  300. }
  301. if len(row) >= len(tableHeaders) && row[0] == rowVariety && (row[1] == rowDestination || strings.Contains(row[0], row[1])) && row[2] == rowShipType {
  302. if columnIdx < len(row) {
  303. // 指标名称
  304. indexName := strings.Join(keywords[:len(keywords)-3], `:`)
  305. // 指标编码
  306. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  307. // 指标id获取
  308. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  309. if err != nil {
  310. logs.Error("ShippingCostProcessor Process() : Failed to get index id: %v", err)
  311. continue
  312. }
  313. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  314. if err != nil {
  315. logs.Error("ShippingCostProcessor Process() : Failed to get data by index id and date: %v", err)
  316. continue
  317. }
  318. if len(indexData) > 0 {
  319. logs.Info("ShippingCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  320. continue
  321. }
  322. valueStr := row[columnIdx]
  323. value, err := strconv.ParseFloat(valueStr, 64)
  324. if err != nil {
  325. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  326. }
  327. // 创建并添加到结果列表
  328. baseFromLyData := models.BaseFromLyData{
  329. DataTime: format,
  330. Value: value,
  331. BaseFromLyIndexId: indexId,
  332. IndexCode: indexCode,
  333. }
  334. result = append(result, baseFromLyData)
  335. } else {
  336. log.Printf("ShippingCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  337. }
  338. break
  339. }
  340. }
  341. }
  342. return result, nil
  343. }
  344. // SupplyDemandBalanceProcessor
  345. // @Description: 供需平衡处理器
  346. type SupplyDemandBalanceProcessor struct{}
  347. func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  348. // https://www.fao.com.cn/art/gG7gKTCNDHLJNsq9QRYjoQ==.htm
  349. logs.Info("Processing processing report...")
  350. // 解析关键字
  351. if len(keywords) < 4 {
  352. return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 4 elements")
  353. }
  354. // 拿到 行关键字和列关键字
  355. var columnName string
  356. rowVariety := keywords[1]
  357. // 提取所有表格数据
  358. tableData := getTableData(reportContent, true)
  359. logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
  360. // 提取日期信息
  361. dateText, err := getDateInfo(ctx)
  362. if err != nil {
  363. return []models.BaseFromLyData{}, err
  364. }
  365. // 时间格式转换
  366. format, err := utils.ConvertTimeFormat(dateText)
  367. if err != nil {
  368. return []models.BaseFromLyData{}, err
  369. }
  370. currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
  371. if err != nil {
  372. return nil, err
  373. }
  374. month, err := utils.GetCurrentMonth(format)
  375. if err != nil {
  376. return nil, err
  377. }
  378. monthSuffix := "预估"
  379. logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
  380. // 处理提取的表格数据
  381. var result []models.BaseFromLyData
  382. headers := tableData.Headers
  383. rows := tableData.Rows
  384. for _, year := range currentYearAndNextYear {
  385. columnName = year + month + monthSuffix
  386. isCurrentYear, err := utils.IsCurrentYear(year)
  387. if err != nil {
  388. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to determine if year is current year: %v", err)
  389. continue
  390. }
  391. if !isCurrentYear {
  392. format, err = utils.GetNextYearLastDay(format)
  393. if err != nil {
  394. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get next year last day: %v", err)
  395. continue
  396. }
  397. }
  398. // 查找目标列
  399. columnIdx := -1
  400. for i, header := range headers {
  401. if strings.Contains(columnName, header) {
  402. columnIdx = i
  403. break
  404. }
  405. }
  406. if columnIdx == -1 {
  407. logs.Error("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
  408. continue
  409. }
  410. // 处理表格中的每一行
  411. for _, row := range rows {
  412. if len(row) >= len(headers) && row[0] == rowVariety {
  413. if columnIdx < len(row) {
  414. // 指标名称
  415. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  416. // 指标编码
  417. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  418. // 指标id获取
  419. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  420. if err != nil {
  421. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
  422. continue
  423. }
  424. valueStr := row[columnIdx]
  425. value, err := strconv.ParseFloat(valueStr, 64)
  426. if err != nil {
  427. return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  428. }
  429. yearMonth, err := utils.GetYearMonth(format)
  430. if err != nil {
  431. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get year month: %v", err)
  432. continue
  433. }
  434. indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, yearMonth)
  435. if err != nil {
  436. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err)
  437. continue
  438. }
  439. if len(indexData) > 0 {
  440. logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  441. // 存在走更新逻辑 主要更新今年在去年的预估值
  442. indexData := indexData[0]
  443. if indexData.Value != value {
  444. time, err := utils.StringToTime(indexData.ModifyTime)
  445. if err != nil {
  446. return nil, err
  447. }
  448. timeZero, err := utils.StringToTimeZero(format)
  449. if err != nil {
  450. return nil, err
  451. }
  452. if time.Before(timeZero) {
  453. // 更新指标数据
  454. err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value)
  455. if err != nil {
  456. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err)
  457. continue
  458. }
  459. // 更新指标库数据
  460. edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(indexData.IndexCode, yearMonth)
  461. if err != nil {
  462. return nil, err
  463. }
  464. if len(edbIndexData) > 0 {
  465. err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
  466. if err != nil {
  467. return nil, err
  468. }
  469. }
  470. }
  471. }
  472. continue
  473. }
  474. // 创建并添加到结果列表
  475. baseFromLyData := models.BaseFromLyData{
  476. DataTime: format,
  477. Value: value,
  478. BaseFromLyIndexId: indexId,
  479. IndexCode: indexCode,
  480. }
  481. result = append(result, baseFromLyData)
  482. } else {
  483. log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  484. }
  485. break
  486. }
  487. }
  488. }
  489. return result, nil
  490. }
  491. // PurchaseShippingProcessor
  492. // @Description: 采购装船处理器
  493. type PurchaseShippingProcessor struct{}
  494. func (p *PurchaseShippingProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  495. logs.Info("Processing purchase shipping...")
  496. // 解析关键字
  497. if len(keywords) < 3 {
  498. return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : keywords must contain at least 3 elements")
  499. }
  500. // 拿到 行关键字和列关键字
  501. columnName := keywords[len(keywords)-3]
  502. // 提取所有表格数据
  503. tableData := getPurchaseShippingTableData(reportContent)
  504. logs.Info("PurchaseShippingProcessor Process() : Table data: %v", tableData)
  505. // 提取日期信息
  506. dateText, err := getDateInfo(ctx)
  507. if err != nil {
  508. return []models.BaseFromLyData{}, err
  509. }
  510. // 时间格式转换
  511. format, err := utils.ConvertTimeFormat(dateText)
  512. if err != nil {
  513. return []models.BaseFromLyData{}, err
  514. }
  515. // 处理提取的表格数据
  516. var result []models.BaseFromLyData
  517. headers := tableData.Headers
  518. rows := tableData.Rows
  519. // 查找目标列
  520. columnIdx := -1
  521. for i, header := range headers {
  522. if strings.Contains(columnName, header) {
  523. columnIdx = i
  524. break
  525. }
  526. }
  527. if columnIdx == -1 {
  528. log.Printf("PurchaseShippingProcessor Process() : Column '%s' not found in table", columnName)
  529. } else {
  530. // 处理表格中的每一行
  531. for _, row := range rows {
  532. if len(row) >= len(headers) {
  533. if columnIdx < len(row) {
  534. if !isNumber(row[columnIdx]) {
  535. continue
  536. }
  537. // 指标名称
  538. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  539. // 指标编码
  540. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  541. // 指标id获取
  542. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  543. if err != nil {
  544. logs.Error("PurchaseShippingProcessor Process() : Failed to get index id: %v", err)
  545. continue
  546. }
  547. var yearMonth string
  548. number, err := utils.ConvertMonthToNumber(row[1])
  549. if err != nil {
  550. return nil, err
  551. }
  552. yearMonth = row[0] + "-" + number
  553. isSameMonth, err := utils.IsSameMonth(format, yearMonth)
  554. if err != nil {
  555. return nil, err
  556. }
  557. if isSameMonth {
  558. yearMonth = format
  559. } else {
  560. lastDayOfMonth, err := utils.GetLastDayOfMonth(yearMonth)
  561. if err != nil {
  562. return nil, err
  563. }
  564. yearMonth = lastDayOfMonth
  565. }
  566. valueStr := row[columnIdx]
  567. value, err := strconv.ParseFloat(valueStr, 64)
  568. if err != nil {
  569. return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  570. }
  571. month, err := utils.GetYearMonth(yearMonth)
  572. if err != nil {
  573. return nil, err
  574. }
  575. indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, month)
  576. if err != nil {
  577. logs.Error("PurchaseShippingProcessor Process() : Failed to get data by index id and date: %v", err)
  578. continue
  579. }
  580. if len(indexData) > 0 {
  581. if indexData[0].Value != value {
  582. logs.Info("PurchaseShippingProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  583. lyData := indexData[0]
  584. time, err := utils.StringToTime(lyData.ModifyTime)
  585. if err != nil {
  586. return nil, err
  587. }
  588. timeZero, err := utils.StringToTimeZero(format)
  589. if err != nil {
  590. return nil, err
  591. }
  592. if time.Before(timeZero) {
  593. // 更新指标数据
  594. err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
  595. if err != nil {
  596. return nil, err
  597. }
  598. // 同步更新指标库数据 须根据指标编码和日期更新
  599. edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(lyData.IndexCode, month)
  600. if err != nil {
  601. return nil, err
  602. }
  603. if len(edbIndexData) > 0 {
  604. err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
  605. if err != nil {
  606. return nil, err
  607. }
  608. }
  609. }
  610. }
  611. continue
  612. }
  613. // 创建并添加到结果列表
  614. baseFromLyData := models.BaseFromLyData{
  615. DataTime: yearMonth,
  616. Value: value,
  617. BaseFromLyIndexId: indexId,
  618. IndexCode: indexCode,
  619. }
  620. result = append(result, baseFromLyData)
  621. continue
  622. } else {
  623. log.Printf("PurchaseShippingProcessor Process() : Column index out of range for row '%s'", columnName)
  624. }
  625. break
  626. }
  627. }
  628. }
  629. return result, nil
  630. }
  631. // ProcessingReportProcessor
  632. // @Description: 加工报告处理器
  633. type ProcessingReportProcessor struct {
  634. }
  635. func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  636. logs.Info("Processing processing report...")
  637. // 解析关键字
  638. if len(keywords) < 3 {
  639. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
  640. }
  641. // 拿到 行关键字和列关键字
  642. columnName := keywords[0]
  643. rowName := keywords[1]
  644. // 提取所有表格数据
  645. tableData := getAllTableData(reportContent)
  646. // 提取日期信息
  647. dateText, err := getDateInfo(ctx)
  648. if err != nil {
  649. return []models.BaseFromLyData{}, err
  650. }
  651. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  652. // 指标编码
  653. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  654. // 指标id获取
  655. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  656. if err != nil {
  657. return nil, err
  658. }
  659. // 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走
  660. format, err := utils.ConvertTimeFormat(dateText)
  661. if err != nil {
  662. return []models.BaseFromLyData{}, err
  663. }
  664. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  665. if err != nil {
  666. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
  667. }
  668. if len(indexData) > 0 {
  669. logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  670. // 不必做更新处理,报告每周刷新,即使本周和上周数据一致,也需要每周记录
  671. return []models.BaseFromLyData{}, nil
  672. }
  673. // 解析日期并计算当前周数
  674. targetWeek, err := utils.ParseDateAndWeek(dateText)
  675. if err != nil {
  676. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
  677. }
  678. fmt.Printf("Target Week: %s\n", targetWeek)
  679. var result []models.BaseFromLyData
  680. // 处理提取的表格数据
  681. for _, data := range tableData {
  682. tableHeaders := data.Headers
  683. tableRows := data.Rows
  684. // 查找目标列
  685. columnIdx := -1
  686. for i, header := range tableHeaders {
  687. headerString := extractChinese(header)
  688. if strings.Contains(columnName, headerString) {
  689. // 这个表格不是很好处理,这里写的有些僵硬,后续需要优化
  690. if columnName == "国内大豆开机率" {
  691. i = i + 2
  692. }
  693. columnIdx = i
  694. break
  695. }
  696. }
  697. if columnIdx == -1 {
  698. logs.Error("ProcessingReportProcessor Process() : Column '%s' not found in table", columnName)
  699. continue
  700. }
  701. // 查找本周的列位置
  702. weekIdx := -1
  703. for i, header := range tableHeaders {
  704. if strings.Contains(header, targetWeek) && i > columnIdx {
  705. weekIdx = i
  706. break
  707. }
  708. }
  709. if weekIdx == -1 {
  710. fmt.Printf("Week column '%s' not found in table\n", targetWeek)
  711. continue
  712. }
  713. // 查找目标行
  714. for _, row := range tableRows {
  715. if strings.Contains(row[0], rowName) {
  716. if columnIdx < len(row) {
  717. // 指标名称
  718. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  719. // 指标编码
  720. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  721. // 指标id获取
  722. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  723. if err != nil {
  724. logs.Error("ProcessingReportProcessor Process() : Failed to get index id: %v", err)
  725. continue
  726. }
  727. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  728. if err != nil {
  729. logs.Error("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
  730. continue
  731. }
  732. if len(indexData) > 0 {
  733. logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  734. // 无需走更新逻辑,报告每日更新,即使今天和昨天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  735. continue
  736. }
  737. valueStr := row[columnIdx]
  738. value, err := strconv.ParseFloat(valueStr, 64)
  739. if err != nil {
  740. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  741. }
  742. // 创建并添加到结果列表
  743. baseFromLyData := models.BaseFromLyData{
  744. DataTime: format,
  745. Value: value,
  746. BaseFromLyIndexId: indexId,
  747. IndexCode: indexCode,
  748. }
  749. result = append(result, baseFromLyData)
  750. continue
  751. } else {
  752. log.Printf("ProcessingReportProcessor Process() : Column index out of range for row '%s', '%s'", rowName, columnName)
  753. }
  754. break
  755. }
  756. }
  757. }
  758. return result, nil
  759. }
  760. // InventoryAnalysisProcessor
  761. // @Description: 库存分析处理器
  762. type InventoryAnalysisProcessor struct{}
  763. func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  764. // https://www.fao.com.cn/art/yg1IKj9FpPEIDv2LefnPhQ==.htm
  765. logs.Info("Processing inventory analysis...")
  766. // 解析关键字
  767. if len(keywords) < 4 {
  768. return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : keywords must contain at least 4 elements")
  769. }
  770. // 拿到 行关键字和列关键字
  771. columnName := keywords[0]
  772. rowVariety := keywords[1]
  773. columnSuffix := "本周"
  774. columnName = columnName + columnSuffix
  775. // 提取所有表格数据
  776. tableData := getTableData(reportContent, true)
  777. logs.Info("InventoryAnalysisProcessor Process() : Table data: %v", tableData)
  778. // 提取日期信息
  779. dateText, err := getDateInfo(ctx)
  780. if err != nil {
  781. return []models.BaseFromLyData{}, err
  782. }
  783. // 时间格式转换
  784. format, err := utils.ConvertTimeFormat(dateText)
  785. if err != nil {
  786. return []models.BaseFromLyData{}, err
  787. }
  788. // 处理提取的表格数据
  789. var result []models.BaseFromLyData
  790. headers := tableData.Headers
  791. rows := tableData.Rows
  792. // 查找目标列
  793. columnIdx := -1
  794. for i, header := range headers {
  795. header := removeParentheses(header)
  796. if strings.Contains(columnName, header) {
  797. columnIdx = i
  798. break
  799. }
  800. }
  801. if columnIdx == -1 {
  802. logs.Error("InventoryAnalysisProcessor Process() : Column '%s' not found in table", columnName)
  803. } else {
  804. // 处理表格中的每一行
  805. for _, row := range rows {
  806. if len(row) >= len(headers) && strings.Contains(row[0], rowVariety) {
  807. if columnIdx < len(row) {
  808. // 指标名称
  809. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  810. indexName = removeParentheses(indexName)
  811. // 指标编码
  812. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  813. // 指标id获取
  814. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  815. if err != nil {
  816. logs.Error("InventoryAnalysisProcessor Process() : Failed to get index id: %v", err)
  817. continue
  818. }
  819. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  820. if err != nil {
  821. logs.Error("InventoryAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
  822. continue
  823. }
  824. if len(indexData) > 0 {
  825. logs.Info("InventoryAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  826. // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  827. continue
  828. }
  829. valueStr := row[columnIdx]
  830. value, err := strconv.ParseFloat(valueStr, 64)
  831. if err != nil {
  832. return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  833. }
  834. // 创建并添加到结果列表
  835. baseFromLyData := models.BaseFromLyData{
  836. DataTime: format,
  837. Value: value,
  838. BaseFromLyIndexId: indexId,
  839. IndexCode: indexCode,
  840. }
  841. result = append(result, baseFromLyData)
  842. continue
  843. } else {
  844. log.Printf("InventoryAnalysisProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  845. }
  846. break
  847. }
  848. }
  849. }
  850. return result, nil
  851. }
  852. // PriceSpreadArbitrageProcessor
  853. // @Description: 价差套利处理器
  854. type PriceSpreadArbitrageProcessor struct{}
  855. func (p *PriceSpreadArbitrageProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  856. fmt.Println("Processing processing profit...")
  857. // 解析关键字
  858. if len(keywords) < 4 {
  859. return []models.BaseFromLyData{}, fmt.Errorf("PriceSpreadArbitrageProcessor Process() : keywords must contain at least 4 elements")
  860. }
  861. // 拿到 行关键字和列关键字
  862. var columnDate string
  863. rowVariety := keywords[0]
  864. rowBourse := keywords[1]
  865. // 提取所有表格数据
  866. tableData := getNoHeadTableData(reportContent)
  867. // 提取日期信息
  868. dateText, err := getDateInfo(ctx)
  869. if err != nil {
  870. return []models.BaseFromLyData{}, err
  871. }
  872. // 时间格式转换
  873. format, err := utils.ConvertTimeFormat(dateText)
  874. if err != nil {
  875. return []models.BaseFromLyData{}, err
  876. }
  877. day, err := utils.ConvertTimeFormatToYearMonthDay(format)
  878. if err != nil {
  879. return nil, err
  880. }
  881. columnDate = day
  882. // 处理提取的表格数据
  883. var result []models.BaseFromLyData
  884. for _, data := range tableData {
  885. tableHeaders := data.Headers
  886. tableRows := data.Rows
  887. // 查找目标列
  888. columnIdx := -1
  889. for i, header := range tableHeaders {
  890. if strings.Contains(header, columnDate) {
  891. columnIdx = i
  892. break
  893. }
  894. }
  895. if columnIdx == -1 {
  896. log.Printf("PriceSpreadArbitrageProcessor Process() : Column '%s' not found in table", columnDate)
  897. continue
  898. }
  899. // 处理表格中的每一行
  900. for _, row := range tableRows {
  901. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == rowBourse {
  902. if columnIdx < len(row) {
  903. // 指标名称
  904. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  905. // 指标编码
  906. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  907. // 指标id获取
  908. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  909. if err != nil {
  910. logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get index id: %v", err)
  911. continue
  912. }
  913. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  914. if err != nil {
  915. logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get data by index id and date: %v", err)
  916. continue
  917. }
  918. if len(indexData) > 0 {
  919. logs.Info("PriceSpreadArbitrageProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  920. // 无需走更新逻辑,报告每天更新,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  921. continue
  922. }
  923. valueStr := row[columnIdx]
  924. value, err := strconv.ParseFloat(valueStr, 64)
  925. if err != nil {
  926. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  927. }
  928. // 创建并添加到结果列表
  929. baseFromLyData := models.BaseFromLyData{
  930. DataTime: format,
  931. Value: value,
  932. BaseFromLyIndexId: indexId,
  933. IndexCode: indexCode,
  934. }
  935. result = append(result, baseFromLyData)
  936. } else {
  937. log.Printf("PriceSpreadArbitrageProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
  938. }
  939. break
  940. }
  941. }
  942. }
  943. return result, nil
  944. }
  945. // DailyTransactionProcessor
  946. // @Description: 每日成交处理器
  947. type DailyTransactionProcessor struct{}
  948. func (p *DailyTransactionProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  949. fmt.Println("Processing processing profit...")
  950. // 解析关键字
  951. if len(keywords) < 4 {
  952. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : keywords must contain at least 4 elements")
  953. }
  954. // 获取第一个表格
  955. areaTableData := getNoHeadTableData(reportContent)[0]
  956. // 获取第二个表格
  957. blocTableData := getTableData(reportContent, false)
  958. logs.Info("DailyTransactionProcessor Process() : areaTableData data: %v, blocTableData data: %v", areaTableData, blocTableData)
  959. // 提取日期信息
  960. dateText, err := getDateInfo(ctx)
  961. if err != nil {
  962. return []models.BaseFromLyData{}, err
  963. }
  964. // 时间格式转换
  965. format, err := utils.ConvertTimeFormat(dateText)
  966. if err != nil {
  967. return []models.BaseFromLyData{}, err
  968. }
  969. // 处理提取的表格数据
  970. var result []models.BaseFromLyData
  971. areaHeaders := areaTableData.Headers
  972. areaRows := areaTableData.Rows
  973. // 第一个表格
  974. // 拿到 行关键字和列关键字
  975. columnArea := keywords[1]
  976. var rowAreaMonthDays []string
  977. rowWeek := "地区总计"
  978. monthDay, err := utils.GetWeekdaysInSameWeek(format)
  979. if err != nil {
  980. return nil, err
  981. }
  982. rowAreaMonthDays = monthDay
  983. // 查找目标列
  984. areaColumnIdx := -1
  985. for i, header := range areaHeaders {
  986. if strings.Contains(header, columnArea) {
  987. areaColumnIdx = i
  988. break
  989. }
  990. }
  991. if areaColumnIdx == -1 {
  992. log.Printf("DailyTransactionProcessor Process() : One Column '%s' not found in table", columnArea)
  993. } else if !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "主要集团") {
  994. for _, row := range areaRows {
  995. if len(row) >= len(areaHeaders) && row[0] == rowWeek {
  996. if areaColumnIdx < len(row) {
  997. // 指标名称
  998. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  999. // 指标编码
  1000. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1001. // 指标id获取
  1002. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1003. if err != nil {
  1004. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1005. continue
  1006. }
  1007. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1008. if err != nil {
  1009. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1010. continue
  1011. }
  1012. if len(indexData) > 0 {
  1013. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1014. // 无需走更新逻辑,报告每周更新,一周出来一周中每天得数据,即使本周和上周数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  1015. continue
  1016. }
  1017. valueStr := row[areaColumnIdx]
  1018. isChinese := IsChinese(valueStr)
  1019. if isChinese {
  1020. continue
  1021. }
  1022. value, err := strconv.ParseFloat(valueStr, 64)
  1023. if err != nil {
  1024. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1025. }
  1026. // 创建并添加到结果列表
  1027. var dealDate string
  1028. if row[0] == rowWeek {
  1029. dealDate = format
  1030. } else {
  1031. date, err := utils.ConvertToDate(row[0])
  1032. if err != nil {
  1033. return nil, err
  1034. }
  1035. dealDate = date
  1036. }
  1037. baseFromLyData := models.BaseFromLyData{
  1038. DataTime: dealDate,
  1039. Value: value,
  1040. BaseFromLyIndexId: indexId,
  1041. IndexCode: indexCode,
  1042. }
  1043. result = append(result, baseFromLyData)
  1044. } else {
  1045. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea)
  1046. }
  1047. break
  1048. } else {
  1049. for _, monthDay := range rowAreaMonthDays {
  1050. if len(row) >= len(areaHeaders) && (row[0] == monthDay && !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "周度")) {
  1051. if areaColumnIdx < len(row) {
  1052. // 指标名称
  1053. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  1054. // 指标编码
  1055. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1056. // 指标id获取
  1057. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1058. if err != nil {
  1059. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1060. continue
  1061. }
  1062. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1063. if err != nil {
  1064. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1065. continue
  1066. }
  1067. if len(indexData) > 0 {
  1068. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1069. // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  1070. continue
  1071. }
  1072. valueStr := row[areaColumnIdx]
  1073. isChinese := IsChinese(valueStr)
  1074. if isChinese {
  1075. continue
  1076. }
  1077. value, err := strconv.ParseFloat(valueStr, 64)
  1078. if err != nil {
  1079. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1080. }
  1081. // 创建并添加到结果列表
  1082. var dealDate string
  1083. if row[0] == rowWeek {
  1084. dealDate = format
  1085. } else {
  1086. date, err := utils.ConvertToDate(row[0])
  1087. if err != nil {
  1088. return nil, err
  1089. }
  1090. dealDate = date
  1091. }
  1092. baseFromLyData := models.BaseFromLyData{
  1093. DataTime: dealDate,
  1094. Value: value,
  1095. BaseFromLyIndexId: indexId,
  1096. IndexCode: indexCode,
  1097. }
  1098. result = append(result, baseFromLyData)
  1099. } else {
  1100. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea)
  1101. }
  1102. break
  1103. }
  1104. }
  1105. }
  1106. }
  1107. }
  1108. // 第二个表格
  1109. // 拿到 行关键字和列关键字
  1110. columnBloc := keywords[len(keywords)-3]
  1111. rowBloc := keywords[1]
  1112. blocHeaders := blocTableData.Headers
  1113. blocRows := blocTableData.Rows
  1114. // 查找目标列
  1115. blocColumnIdx := -1
  1116. for i, header := range blocHeaders {
  1117. if strings.Contains(header, columnBloc) {
  1118. blocColumnIdx = i
  1119. break
  1120. }
  1121. }
  1122. if blocColumnIdx == -1 {
  1123. log.Printf("DailyTransactionProcessor Process() : Two Column '%s' not found in table", columnBloc)
  1124. } else {
  1125. // 处理表格中的每一行
  1126. for _, row := range blocRows {
  1127. if len(row) >= len(blocHeaders) && strings.Contains(row[0], rowBloc) {
  1128. if blocColumnIdx < len(row) {
  1129. // 指标名称
  1130. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  1131. indexName = removeParentheses(indexName)
  1132. // 指标编码
  1133. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1134. // 指标id获取
  1135. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1136. if err != nil {
  1137. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1138. continue
  1139. }
  1140. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1141. if err != nil {
  1142. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1143. continue
  1144. }
  1145. if len(indexData) > 0 {
  1146. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1147. continue
  1148. }
  1149. valueStr := row[blocColumnIdx]
  1150. value, err := strconv.ParseFloat(valueStr, 64)
  1151. if err != nil {
  1152. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  1153. }
  1154. // 创建并添加到结果列表
  1155. baseFromLyData := models.BaseFromLyData{
  1156. DataTime: format,
  1157. Value: value,
  1158. BaseFromLyIndexId: indexId,
  1159. IndexCode: indexCode,
  1160. }
  1161. result = append(result, baseFromLyData)
  1162. } else {
  1163. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", rowBloc, columnBloc)
  1164. }
  1165. break
  1166. }
  1167. }
  1168. }
  1169. return result, nil
  1170. }
  1171. // PalmOilImportCostProcessor 棕榈油进口成本
  1172. type PalmOilImportCostProcessor struct{}
  1173. func (p *PalmOilImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1174. logs.Info("Processing palm oil import cost...")
  1175. // 解析关键字
  1176. if len(keywords) < 5 {
  1177. return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : keywords must contain at least 5 elements")
  1178. }
  1179. // 拿到 行关键字和列关键字
  1180. columnName := keywords[len(keywords)-4]
  1181. rowVariety := keywords[0]
  1182. rowPort := keywords[len(keywords)-3]
  1183. indexNamePrefix := keywords[:1]
  1184. indexNameSuffix := keywords[1:]
  1185. // 提取所有表格数据
  1186. tableData := getNoHeadTableData(reportContent)
  1187. // 提取日期信息
  1188. dateText, err := getDateInfo(ctx)
  1189. if err != nil {
  1190. return []models.BaseFromLyData{}, err
  1191. }
  1192. // 时间格式转换
  1193. format, err := utils.ConvertTimeFormat(dateText)
  1194. if err != nil {
  1195. return []models.BaseFromLyData{}, err
  1196. }
  1197. // 解析日期并计算当前月份
  1198. targetMonths, err := utils.GetYearMonthNoYear(format)
  1199. if err != nil {
  1200. return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : Failed to parse date: %v", err)
  1201. }
  1202. fmt.Printf("Target Month: %s\n", targetMonths)
  1203. // 处理提取的表格数据
  1204. var result []models.BaseFromLyData
  1205. for _, data := range tableData {
  1206. tableHeaders := data.Headers
  1207. tableRows := data.Rows
  1208. // 查找目标列
  1209. columnIdx := -1
  1210. for i, header := range tableHeaders {
  1211. if strings.Contains(header, columnName) {
  1212. columnIdx = i
  1213. break
  1214. }
  1215. }
  1216. if columnIdx == -1 {
  1217. log.Printf("PalmOilImportCostProcessor Process() : Column '%s' not found in table", columnName)
  1218. continue
  1219. }
  1220. // 处理表格中的每一行
  1221. //var flag bool = true
  1222. var previousRowVariety string
  1223. var previousRowPort string
  1224. var previousRowFob string
  1225. for rowIndex, row := range tableRows {
  1226. if len(row) == len(tableHeaders) {
  1227. previousRowVariety = row[0]
  1228. previousRowPort = row[1]
  1229. previousRowFob = row[2]
  1230. } else if len(row) == len(tableHeaders)-1 {
  1231. previousRowPort = row[0]
  1232. previousRowFob = row[1]
  1233. row = append([]string{previousRowVariety}, row...)
  1234. tableRows[rowIndex] = row
  1235. } else if len(row) == len(tableHeaders)-2 {
  1236. // 这段这里不需要。。。先保留吧
  1237. previousRowFob = row[0]
  1238. row = append([]string{previousRowVariety, previousRowPort}, row...)
  1239. tableRows[rowIndex] = row
  1240. } else if len(row) == len(tableHeaders)-3 {
  1241. row = append([]string{previousRowVariety, previousRowPort, previousRowFob}, row...)
  1242. tableRows[rowIndex] = row
  1243. }
  1244. for _, targetMonth := range targetMonths {
  1245. if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort {
  1246. if columnIdx < len(row) {
  1247. // 指标名称
  1248. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  1249. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  1250. // 指标编码
  1251. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1252. // 指标id获取
  1253. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1254. if err != nil {
  1255. logs.Error("PalmOilImportCostProcessor Process() : Failed to get index id: %v", err)
  1256. continue
  1257. }
  1258. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1259. if err != nil {
  1260. logs.Error("PalmOilImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
  1261. continue
  1262. }
  1263. if len(indexData) > 0 {
  1264. logs.Info("PalmOilImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1265. continue
  1266. }
  1267. valueStr := row[columnIdx]
  1268. value, err := strconv.ParseFloat(valueStr, 64)
  1269. if err != nil {
  1270. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1271. }
  1272. // 创建并添加到结果列表
  1273. baseFromLyData := models.BaseFromLyData{
  1274. DataTime: format,
  1275. Value: value,
  1276. BaseFromLyIndexId: indexId,
  1277. IndexCode: indexCode,
  1278. }
  1279. result = append(result, baseFromLyData)
  1280. } else {
  1281. log.Printf("PalmOilImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
  1282. }
  1283. break
  1284. }
  1285. }
  1286. }
  1287. }
  1288. return result, nil
  1289. }
  1290. // ImportEstimateProcessor
  1291. // @Description: 进口预估处理器
  1292. type ImportEstimateProcessor struct{}
  1293. func (p *ImportEstimateProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1294. logs.Info("Processing import estimate...")
  1295. // 解析关键字
  1296. if len(keywords) < 4 {
  1297. return []models.BaseFromLyData{}, fmt.Errorf("ImportEstimateProcessor Process() : keywords must contain at least 4 elements")
  1298. }
  1299. // 拿到 行关键字和列关键字
  1300. var columnDates []string
  1301. rowVariety := keywords[1]
  1302. // 提取所有表格数据
  1303. tableData := getNoHeadTableData(reportContent)
  1304. // 提取日期信息
  1305. dateText, err := getDateInfo(ctx)
  1306. if err != nil {
  1307. return []models.BaseFromLyData{}, err
  1308. }
  1309. // 时间格式转换
  1310. format, err := utils.ConvertTimeFormat(dateText)
  1311. if err != nil {
  1312. return []models.BaseFromLyData{}, err
  1313. }
  1314. columnDates, err = utils.GetNextThreeMonthsNoYear(format)
  1315. if err != nil {
  1316. return nil, err
  1317. }
  1318. monthsLastDay, err := utils.GetNextThreeMonthsLastDay(format)
  1319. if err != nil {
  1320. return nil, err
  1321. }
  1322. // 处理提取的表格数据
  1323. var result []models.BaseFromLyData
  1324. for _, data := range tableData {
  1325. tableHeaders := data.Headers
  1326. tableRows := data.Rows
  1327. // 查找目标列
  1328. for _, columnDate := range columnDates {
  1329. columnIdx := -1
  1330. for i, tableHeader := range tableHeaders {
  1331. if strings.Contains(tableHeader, columnDate) {
  1332. columnIdx = i
  1333. break
  1334. }
  1335. }
  1336. if columnIdx == -1 {
  1337. log.Printf("ImportEstimateProcessor Process() : Column '%s' not found in table", columnDate)
  1338. continue
  1339. } else {
  1340. // 处理表格中的每一行
  1341. for _, row := range tableRows {
  1342. if len(row) >= len(tableHeaders) && strings.Contains(row[0], rowVariety) && isNumber(row[columnIdx]) {
  1343. if columnIdx < len(row) {
  1344. // 指标名称
  1345. indexName := strings.Join(keywords[:len(keywords)-2], `:`)
  1346. // 指标编码
  1347. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1348. // 指标id获取
  1349. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1350. if err != nil {
  1351. logs.Error("ImportEstimateProcessor Process() : Failed to get index id: %v", err)
  1352. continue
  1353. }
  1354. toNumber, err := utils.ConvertMonthToNumber(columnDate)
  1355. if err != nil {
  1356. logs.Error("ImportEstimateProcessor Process() : Failed to convert month to number: %v", err)
  1357. continue
  1358. }
  1359. slice, err := utils.GetElementInSlice(monthsLastDay, toNumber)
  1360. if err != nil {
  1361. logs.Error("ImportEstimateProcessor Process() : Failed to get element in slice: %v", err)
  1362. continue
  1363. }
  1364. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, slice)
  1365. if err != nil {
  1366. logs.Error("ImportEstimateProcessor Process() : Failed to get data by index id and date: %v", err)
  1367. continue
  1368. }
  1369. valueStr := row[columnIdx]
  1370. value, err := strconv.ParseFloat(valueStr, 64)
  1371. if err != nil {
  1372. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1373. }
  1374. if len(indexData) > 0 {
  1375. if indexData[0].Value != value {
  1376. logs.Info("ImportEstimateProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1377. lyData := indexData[0]
  1378. time, err := utils.StringToTime(lyData.ModifyTime)
  1379. if err != nil {
  1380. return nil, err
  1381. }
  1382. timeZero, err := utils.StringToTimeZero(format)
  1383. if err != nil {
  1384. return nil, err
  1385. }
  1386. if lyData.Value != value && time.Before(timeZero) {
  1387. // 更新指标数据
  1388. err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
  1389. if err != nil {
  1390. return nil, err
  1391. }
  1392. // 同步更新指标库数据
  1393. lyEdbIndexData, err := models.GetLyEdbDataByIndexCodeAndExactDataTime(lyData.IndexCode, lyData.DataTime)
  1394. if err != nil {
  1395. return nil, err
  1396. }
  1397. if len(lyEdbIndexData) > 0 {
  1398. err := models.UpdateLyEdbDataById(lyEdbIndexData[0].EdbInfoId, value)
  1399. if err != nil {
  1400. return nil, err
  1401. }
  1402. }
  1403. }
  1404. }
  1405. continue
  1406. }
  1407. // 创建并添加到结果列表
  1408. baseFromLyData := models.BaseFromLyData{
  1409. DataTime: slice,
  1410. Value: value,
  1411. BaseFromLyIndexId: indexId,
  1412. IndexCode: indexCode,
  1413. }
  1414. result = append(result, baseFromLyData)
  1415. } else {
  1416. log.Printf("ImportEstimateProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
  1417. }
  1418. break
  1419. }
  1420. }
  1421. }
  1422. }
  1423. }
  1424. return result, nil
  1425. }
  1426. // InternationalPriceProcessor
  1427. // @Description: 国际价格处理器
  1428. type InternationalPriceProcessor struct{}
  1429. func (p *InternationalPriceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1430. logs.Info("Processing international price...")
  1431. // 解析关键字
  1432. if len(keywords) < 4 {
  1433. return []models.BaseFromLyData{}, fmt.Errorf("InternationalPriceProcessor Process() : keywords must contain at least 4 elements")
  1434. }
  1435. // 拿到 行关键字和列关键字
  1436. columnName := keywords[1]
  1437. rowVariety := keywords[0]
  1438. indexNamePrefix := keywords[:1]
  1439. indexNameSuffix := keywords[1:]
  1440. // 提取所有表格数据
  1441. tableData := getNoHeadTableData(reportContent)
  1442. // 提取日期信息
  1443. dateText, err := getDateInfo(ctx)
  1444. if err != nil {
  1445. return []models.BaseFromLyData{}, err
  1446. }
  1447. // 时间格式转换
  1448. format, err := utils.ConvertTimeFormat(dateText)
  1449. if err != nil {
  1450. return []models.BaseFromLyData{}, err
  1451. }
  1452. // 解析日期并计算当前月份 和 后两月
  1453. yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
  1454. if err != nil {
  1455. return nil, err
  1456. }
  1457. fmt.Printf("Target yearMonth: %s\n", yearMonths)
  1458. // 处理提取的表格数据
  1459. var result []models.BaseFromLyData
  1460. for _, data := range tableData {
  1461. tableHeaders := data.Headers
  1462. tableRows := data.Rows
  1463. // 查找目标列
  1464. columnIdx := -1
  1465. for i, header := range tableHeaders {
  1466. if strings.Contains(header, columnName) {
  1467. columnIdx = i
  1468. break
  1469. }
  1470. }
  1471. if columnIdx == -1 {
  1472. log.Printf("InternationalPriceProcessor Process() : Column '%s' not found in table", columnName)
  1473. continue
  1474. }
  1475. // 处理表格中的每一行
  1476. var previousRowVariety string
  1477. for rowIndex, row := range tableRows {
  1478. if len(row) == len(tableHeaders) {
  1479. previousRowVariety = row[0]
  1480. } else if len(row) == len(tableHeaders)-1 {
  1481. row = append([]string{previousRowVariety}, row...)
  1482. tableRows[rowIndex] = row
  1483. }
  1484. for _, targetMonth := range yearMonths {
  1485. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
  1486. if columnIdx < len(row) {
  1487. // 指标名称
  1488. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  1489. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  1490. // 指标编码
  1491. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1492. // 指标id获取
  1493. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1494. if err != nil {
  1495. logs.Error("InternationalPriceProcessor Process() : Failed to get index id: %v", err)
  1496. continue
  1497. }
  1498. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1499. if err != nil {
  1500. logs.Error("InternationalPriceProcessor Process() : Failed to get data by index id and date: %v", err)
  1501. continue
  1502. }
  1503. if len(indexData) > 0 {
  1504. logs.Info("InternationalPriceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1505. // 无需更新 指标展示本月和后两月的数据,报告每天更新,每天的值可能会改变,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  1506. continue
  1507. }
  1508. valueStr := row[columnIdx]
  1509. value, err := strconv.ParseFloat(valueStr, 64)
  1510. if err != nil {
  1511. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1512. }
  1513. // 创建并添加到结果列表
  1514. baseFromLyData := models.BaseFromLyData{
  1515. DataTime: format,
  1516. Value: value,
  1517. BaseFromLyIndexId: indexId,
  1518. IndexCode: indexCode,
  1519. }
  1520. result = append(result, baseFromLyData)
  1521. } else {
  1522. log.Printf("InternationalPriceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  1523. }
  1524. break
  1525. }
  1526. }
  1527. }
  1528. }
  1529. return result, nil
  1530. }
  1531. // CanadaStatisticsBureauProcessor
  1532. // @Description: 加拿大统计局处理器
  1533. type CanadaStatisticsBureauProcessor struct{}
  1534. func (p *CanadaStatisticsBureauProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1535. logs.Info("Processing Canada statistics bureau...")
  1536. // 解析关键字
  1537. if len(keywords) < 4 {
  1538. return []models.BaseFromLyData{}, fmt.Errorf("CanadaStatisticsBureauProcessor Process() : keywords must contain at least 4 elements")
  1539. }
  1540. // 拿到 行关键字和列关键字
  1541. columnDate := "本周"
  1542. rowVariety := keywords[1]
  1543. // 提取所有表格数据
  1544. tableData := getNoHeadTableData(reportContent)
  1545. // 提取日期信息
  1546. dateText, err := getDateInfo(ctx)
  1547. if err != nil {
  1548. return []models.BaseFromLyData{}, err
  1549. }
  1550. // 时间格式转换
  1551. format, err := utils.ConvertTimeFormat(dateText)
  1552. if err != nil {
  1553. return []models.BaseFromLyData{}, err
  1554. }
  1555. // 处理提取的表格数据
  1556. var result []models.BaseFromLyData
  1557. for _, data := range tableData {
  1558. tableHeaders := data.Headers
  1559. tableRows := data.Rows
  1560. // 查找目标列
  1561. columnIdx := -1
  1562. for i, header := range tableHeaders {
  1563. if strings.Contains(header, columnDate) {
  1564. columnIdx = i
  1565. break
  1566. }
  1567. }
  1568. if columnIdx == -1 {
  1569. log.Printf("CanadaStatisticsBureauProcessor Process() : Column '%s' not found in table", columnDate)
  1570. continue
  1571. }
  1572. // 处理表格中的每一行
  1573. for _, row := range tableRows {
  1574. if len(row) >= len(tableHeaders) {
  1575. if columnIdx < len(row) {
  1576. if row[0] != rowVariety {
  1577. continue
  1578. }
  1579. // 指标名称
  1580. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  1581. // 指标编码
  1582. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1583. // 指标id获取
  1584. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1585. if err != nil {
  1586. logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get index id: %v", err)
  1587. continue
  1588. }
  1589. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1590. if err != nil {
  1591. logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get data by index id and date: %v", err)
  1592. continue
  1593. }
  1594. if len(indexData) > 0 {
  1595. logs.Info("CanadaStatisticsBureauProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1596. // 无需更新 指标展示本周的数据,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  1597. continue
  1598. }
  1599. valueStr := row[columnIdx]
  1600. value, err := strconv.ParseFloat(valueStr, 64)
  1601. if err != nil {
  1602. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1603. }
  1604. // 创建并添加到结果列表
  1605. baseFromLyData := models.BaseFromLyData{
  1606. DataTime: format,
  1607. Value: value,
  1608. BaseFromLyIndexId: indexId,
  1609. IndexCode: indexCode,
  1610. }
  1611. result = append(result, baseFromLyData)
  1612. } else {
  1613. log.Printf("CanadaStatisticsBureauProcessor Process() : Column index out of range for row '%s'", columnDate)
  1614. }
  1615. break
  1616. }
  1617. }
  1618. }
  1619. return result, nil
  1620. }
  1621. // ImportExportAnalysisProcessor
  1622. // @Description: 进出口分析处理器
  1623. type ImportExportAnalysisProcessor struct{}
  1624. func (p *ImportExportAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1625. fmt.Println("Processing processing profit...")
  1626. // 解析关键字
  1627. if len(keywords) < 3 {
  1628. return []models.BaseFromLyData{}, fmt.Errorf("ImportExportAnalysisProcessor Process() : keywords must contain at least 3 elements")
  1629. }
  1630. // 拿到 行关键字和列关键字
  1631. var columnDates []string
  1632. // 提取所有表格数据
  1633. tableData := getNoHeadTableData(reportContent)
  1634. // 提取日期信息
  1635. dateText, err := getDateInfo(ctx)
  1636. if err != nil {
  1637. return []models.BaseFromLyData{}, err
  1638. }
  1639. // 时间格式转换
  1640. format, err := utils.ConvertTimeFormat(dateText)
  1641. if err != nil {
  1642. return []models.BaseFromLyData{}, err
  1643. }
  1644. // 2025年1月可能才出2024年12月的数据,所以往前取一年
  1645. columnDates, err = utils.GetCurrentYearAndLastYear(format)
  1646. if err != nil {
  1647. return nil, err
  1648. }
  1649. // 处理提取的表格数据
  1650. var result []models.BaseFromLyData
  1651. for _, data := range tableData {
  1652. tableHeaders := data.Headers
  1653. tableRows := data.Rows
  1654. for _, columnDate := range columnDates {
  1655. // 查找目标列
  1656. columnIdx := -1
  1657. for i, header := range tableHeaders {
  1658. if strings.Contains(header, columnDate) {
  1659. columnIdx = i
  1660. break
  1661. }
  1662. }
  1663. if columnIdx == -1 {
  1664. log.Printf("ImportExportAnalysisProcessor Process() : Column '%s' not found in table", columnDate)
  1665. continue
  1666. }
  1667. // 处理表格中的每一行
  1668. for _, row := range tableRows {
  1669. if len(row) >= len(tableHeaders) {
  1670. if columnIdx < len(row) && isNumber(row[columnIdx]) && isNumber(row[0]) {
  1671. // 指标名称
  1672. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  1673. // 指标编码
  1674. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1675. // 指标id获取
  1676. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1677. if err != nil {
  1678. logs.Error("ImportExportAnalysisProcessor Process() : Failed to get index id: %v", err)
  1679. continue
  1680. }
  1681. atoi, err := strconv.Atoi(row[0])
  1682. if err != nil {
  1683. return nil, err
  1684. }
  1685. date := columnDate[:4] + "-" + fmt.Sprintf("%02d", atoi)
  1686. lastDayOfMonth, err := utils.GetLastDayOfMonth(date)
  1687. if err != nil {
  1688. return nil, err
  1689. }
  1690. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, lastDayOfMonth)
  1691. if err != nil {
  1692. logs.Error("ImportExportAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
  1693. continue
  1694. }
  1695. if len(indexData) > 0 {
  1696. logs.Info("ImportExportAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1697. continue
  1698. }
  1699. valueStr := row[columnIdx]
  1700. value, err := strconv.ParseFloat(valueStr, 64)
  1701. if err != nil {
  1702. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1703. }
  1704. // 创建并添加到结果列表
  1705. baseFromLyData := models.BaseFromLyData{
  1706. DataTime: lastDayOfMonth,
  1707. Value: value,
  1708. BaseFromLyIndexId: indexId,
  1709. IndexCode: indexCode,
  1710. }
  1711. result = append(result, baseFromLyData)
  1712. continue
  1713. } else {
  1714. log.Printf("ImportExportAnalysisProcessor Process() : Column index out of range for row '%s'", columnDate)
  1715. }
  1716. break
  1717. }
  1718. }
  1719. }
  1720. }
  1721. return result, nil
  1722. }
  1723. // ExtractValueInParentheses 从字符串中提取括号中的值
  1724. func extractValueInParentheses(input string) (string, error) {
  1725. re := regexp.MustCompile(`(([^)]+))`)
  1726. matches := re.FindStringSubmatch(input)
  1727. if len(matches) > 1 {
  1728. return matches[1], nil
  1729. }
  1730. return "", fmt.Errorf("no value found in parentheses")
  1731. }
  1732. // 获取指标id,根据指标名称判断,没有插入指标生成返回
  1733. func getIndexId(indexCode string, indexName string, classifyId int, lySourceName string, frequency string, unit string) (int, error) {
  1734. // 判断指标是否存在
  1735. var indexId int
  1736. indexInfo, err := models.GetLyIndexByCode(indexCode)
  1737. if err != nil {
  1738. // 新增指标
  1739. index, err := addLyIndex(classifyId, indexCode, indexName, frequency, unit)
  1740. if err != nil {
  1741. return 0, err
  1742. }
  1743. indexId = index
  1744. } else {
  1745. indexId = indexInfo.BaseFromLyIndexId
  1746. }
  1747. return indexId, nil
  1748. }
  1749. // 获取页面时间信息
  1750. func getDateInfo(ctx context.Context) (string, error) {
  1751. var dateText string
  1752. err := chromedp.Run(ctx,
  1753. chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
  1754. )
  1755. if err != nil {
  1756. return "", fmt.Errorf("processing Process() : Failed to extract report date: %v", err)
  1757. }
  1758. logs.Info("Processing Process() : Report Extracted Date: %s", dateText)
  1759. return dateText, nil
  1760. }
  1761. // 获取所有表格数据 获取表格中有thead标签的数据
  1762. func getAllTableData(reportContent string) []TableData {
  1763. var tableData []TableData
  1764. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1765. if err != nil {
  1766. log.Fatal(err)
  1767. }
  1768. // 选择 id 为 "a_content" 的 div
  1769. doc.Find("#a_content").Each(func(index int, item *goquery.Selection) {
  1770. item.Find("table").Each(func(index int, table *goquery.Selection) {
  1771. var headers []string
  1772. var rows [][]string
  1773. // 提取表头
  1774. table.Find("thead th").Each(func(index int, th *goquery.Selection) {
  1775. headers = append(headers, th.Text())
  1776. })
  1777. // 提取表格行数据
  1778. table.Find("tbody tr").Each(func(index int, row *goquery.Selection) {
  1779. var rowData []string
  1780. row.Find("td").Each(func(index int, td *goquery.Selection) {
  1781. rowData = append(rowData, td.Text())
  1782. })
  1783. rows = append(rows, rowData)
  1784. })
  1785. // 仅在表头存在时添加到结果中
  1786. if len(headers) > 0 {
  1787. tableData = append(tableData, TableData{
  1788. Headers: headers,
  1789. Rows: rows,
  1790. })
  1791. }
  1792. })
  1793. })
  1794. return tableData
  1795. }
  1796. // 获取无头表格数据
  1797. func getNoHeadTableData(reportContent string) []TableData {
  1798. var tableData []TableData
  1799. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1800. if err != nil {
  1801. log.Fatal(err)
  1802. }
  1803. // Find the div with id "a_content"
  1804. doc.Find("#a_content").Each(func(index int, div *goquery.Selection) {
  1805. // Find all tables within the div
  1806. div.Find("table").Each(func(index int, table *goquery.Selection) {
  1807. var headers []string
  1808. var rows [][]string
  1809. // Extract table headers if any
  1810. table.Find("tr").Each(func(index int, tr *goquery.Selection) {
  1811. var rowData []string
  1812. tr.Find("td, th").Each(func(index int, cell *goquery.Selection) {
  1813. cellText := cell.Text()
  1814. rowData = append(rowData, cellText)
  1815. })
  1816. if index == 0 && len(rowData) > 0 {
  1817. // The first row is treated as the header row
  1818. headers = rowData
  1819. } else if len(rowData) > 0 {
  1820. // Add the row data to the rows slice
  1821. rows = append(rows, rowData)
  1822. }
  1823. })
  1824. // Only add table data if headers are present
  1825. if len(headers) > 0 {
  1826. tableData = append(tableData, TableData{
  1827. Headers: headers,
  1828. Rows: rows,
  1829. })
  1830. }
  1831. })
  1832. })
  1833. return tableData
  1834. }
  1835. // 获取表格数据 获取id 为 a_content 的 div 中的第一个表格 左上角那个单元格会拼在第一个,会拼上列上的合并单元格
  1836. func getTableData(reportContent string, isFirst bool) TableData {
  1837. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1838. if err != nil {
  1839. log.Fatal(err)
  1840. }
  1841. tableData := &TableData{}
  1842. // 只提取 id 为 a_content 的 div 中的第一个表格
  1843. var firstTable *goquery.Selection
  1844. if isFirst {
  1845. firstTable = doc.Find("div#a_content table").First()
  1846. } else {
  1847. firstTable = doc.Find("div#a_content table").Last()
  1848. }
  1849. var combinedHeaders []string
  1850. // 提取表头
  1851. firstTable.Find("tr").Each(func(i int, row *goquery.Selection) {
  1852. if i == 0 {
  1853. // 第一行处理合并单元格,保存到 combinedHeaders
  1854. row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
  1855. if j == 0 {
  1856. // 把左上角的“年度(10/9月)”放入 Headers 第一个元素
  1857. tableData.Headers = append(tableData.Headers, strings.TrimSpace(cell.Text()))
  1858. } else {
  1859. // 处理其他单元格
  1860. colspan, exists := cell.Attr("colspan")
  1861. if exists {
  1862. spanCount := 0
  1863. fmt.Sscanf(colspan, "%d", &spanCount)
  1864. for k := 0; k < spanCount; k++ {
  1865. combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text()))
  1866. }
  1867. } else {
  1868. combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text()))
  1869. }
  1870. }
  1871. })
  1872. } else if i == 1 {
  1873. // 第二行处理具体标题,组合后保存到 Headers
  1874. row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
  1875. if j < len(combinedHeaders) {
  1876. fullHeader := combinedHeaders[j] + strings.TrimSpace(cell.Text())
  1877. tableData.Headers = append(tableData.Headers, fullHeader)
  1878. }
  1879. })
  1880. } else {
  1881. // 处理数据行
  1882. var rowData []string
  1883. row.Find("td").Each(func(j int, cell *goquery.Selection) {
  1884. rowData = append(rowData, strings.TrimSpace(cell.Text()))
  1885. })
  1886. if len(rowData) > 0 {
  1887. tableData.Rows = append(tableData.Rows, rowData)
  1888. }
  1889. }
  1890. })
  1891. return *tableData
  1892. }
  1893. // 获取采购装船表格数据
  1894. func getPurchaseShippingTableData(reportContent string) TableData {
  1895. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1896. if err != nil {
  1897. log.Fatal(err)
  1898. }
  1899. var tableData TableData
  1900. // 只提取 id 为 a_content 的 div 中的第一个表格
  1901. firstTable := doc.Find("div#a_content table").First()
  1902. // 提取表头
  1903. var headers []string
  1904. var subHeaders []string
  1905. firstTable.Find("thead tr").Each(func(i int, row *goquery.Selection) {
  1906. row.Find("th").Each(func(j int, cell *goquery.Selection) {
  1907. headerText := strings.TrimSpace(cell.Text())
  1908. if i == 0 {
  1909. // 处理第一行表头
  1910. colspan, exists := cell.Attr("colspan")
  1911. if exists {
  1912. spanCount := 0
  1913. fmt.Sscanf(colspan, "%d", &spanCount)
  1914. for k := 0; k < spanCount; k++ {
  1915. headers = append(headers, headerText)
  1916. }
  1917. } else {
  1918. headers = append(headers, headerText)
  1919. }
  1920. } else if i == 1 {
  1921. // 处理第二行表头
  1922. subHeaders = append(subHeaders, headerText)
  1923. }
  1924. })
  1925. })
  1926. // 合并第一行和第二行表头信息
  1927. if len(subHeaders) > 0 {
  1928. for i := 0; i < len(subHeaders); i++ {
  1929. // 从第四个单元格开始拼接
  1930. headers[3+i] = headers[3+i] + subHeaders[i]
  1931. }
  1932. }
  1933. tableData.Headers = headers
  1934. // 处理数据行
  1935. firstTable.Find("tbody tr").Each(func(i int, row *goquery.Selection) {
  1936. var rowData []string
  1937. row.Find("td").Each(func(j int, cell *goquery.Selection) {
  1938. rowData = append(rowData, strings.TrimSpace(cell.Text()))
  1939. })
  1940. if len(rowData) > 0 {
  1941. tableData.Rows = append(tableData.Rows, rowData)
  1942. }
  1943. })
  1944. return tableData
  1945. }
  1946. // 判断字符串是否是数字
  1947. func isNumeric(value string) bool {
  1948. // 正则表达式匹配整数和浮点数
  1949. re := regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`)
  1950. return re.MatchString(value)
  1951. }
  1952. // 只保留汉字
  1953. func extractChinese(text string) string {
  1954. re := regexp.MustCompile(`[^\p{Han}]`) // 匹配非汉字字符
  1955. return re.ReplaceAllString(text, "")
  1956. }
  1957. // 去除括号中的内容 包含括号 ()
  1958. func removeParentheses(text string) string {
  1959. re := regexp.MustCompile(`\([^)]*\)`)
  1960. return re.ReplaceAllString(text, "")
  1961. }
  1962. // IsChinese 判断传入的是否是汉字
  1963. func IsChinese(str string) bool {
  1964. for _, r := range str {
  1965. if unicode.Is(unicode.Han, r) {
  1966. return true
  1967. }
  1968. }
  1969. return false
  1970. }
  1971. // 判断是否是数字
  1972. func isNumber(str string) bool {
  1973. _, err := strconv.ParseFloat(str, 64)
  1974. return err == nil
  1975. }