processor_business_logic.go 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257
  1. // Package liangyou
  2. // @Author gmy 2024/8/6 10:50:00
  3. package liangyou
  4. import (
  5. "context"
  6. "eta/eta_crawler/models"
  7. "eta/eta_crawler/utils"
  8. "fmt"
  9. "github.com/PuerkitoBio/goquery"
  10. "github.com/beego/beego/v2/core/logs"
  11. "github.com/chromedp/chromedp"
  12. "log"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "unicode"
  17. )
  // lySourceName is the source identifier of the Liangyou grain-and-oil business
  // network; the processors in this file pass it to utils.GenerateIndexCode and
  // getIndexId.
  var lySourceName = "lysww"
  // TableData holds the contents of one extracted table.
  type TableData struct {
  	// Headers are the header cell texts; serialized under the JSON key "headers".
  	Headers []string `json:"headers"`
  	// Rows are the table rows, one slice of cell texts per row; serialized under
  	// the JSON key "rows".
  	Rows [][]string `json:"rows"`
  }
  26. // ImportCostProcessor
  27. // @Description: 进口成本处理器
  28. type ImportCostProcessor struct{}
  29. func (p *ImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  30. logs.Info("Processing import cost...")
  31. // 解析关键字
  32. if len(keywords) < 5 {
  33. return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : keywords must contain at least 5 elements")
  34. }
  35. // 拿到 行关键字和列关键字
  36. columnName := keywords[len(keywords)-4]
  37. rowVariety := keywords[0]
  38. rowPort := keywords[len(keywords)-3]
  39. indexNamePrefix := keywords[:1]
  40. indexNameSuffix := keywords[1:]
  41. // 提取所有表格数据
  42. tableData := getNoHeadTableData(reportContent)
  43. // 提取日期信息
  44. dateText, err := getDateInfo(ctx)
  45. if err != nil {
  46. return []models.BaseFromLyData{}, err
  47. }
  48. // 时间格式转换
  49. format, err := utils.ConvertTimeFormat(dateText)
  50. if err != nil {
  51. return []models.BaseFromLyData{}, err
  52. }
  53. // 解析日期并计算当前月份
  54. var targetMonths []string
  55. if product == "油菜籽" {
  56. targetMonths, err = utils.ParseDateAndMonthColzaOil(format)
  57. } else {
  58. targetMonths, err = utils.ParseDateAndMonth(dateText)
  59. }
  60. if err != nil {
  61. return []models.BaseFromLyData{}, fmt.Errorf("ImportCostProcessor Process() : Failed to parse date: %v", err)
  62. }
  63. fmt.Printf("Target Month: %s\n", targetMonths)
  64. // 处理提取的表格数据
  65. var result []models.BaseFromLyData
  66. for _, data := range tableData {
  67. tableHeaders := data.Headers
  68. tableRows := data.Rows
  69. // 查找目标列
  70. columnIdx := -1
  71. for i, header := range tableHeaders {
  72. if strings.Contains(header, columnName) {
  73. columnIdx = i
  74. break
  75. }
  76. }
  77. if columnIdx == -1 {
  78. log.Printf("ImportCostProcessor Process() : Column '%s' not found in table", columnName)
  79. continue
  80. }
  81. // 处理表格中的每一行
  82. //var flag bool = true
  83. var previousRowVariety string
  84. var previousRowPort string
  85. for rowIndex, row := range tableRows {
  86. if len(row) == len(tableHeaders) {
  87. previousRowVariety = row[0]
  88. previousRowPort = row[1]
  89. } else if len(row) == len(tableHeaders)-1 {
  90. previousRowPort = row[0]
  91. row = append([]string{previousRowVariety}, row...)
  92. tableRows[rowIndex] = row
  93. } else if len(row) == len(tableHeaders)-2 {
  94. row = append([]string{previousRowVariety, previousRowPort}, row...)
  95. tableRows[rowIndex] = row
  96. }
  97. for _, targetMonth := range targetMonths {
  98. if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort {
  99. if columnIdx < len(row) {
  100. // 指标名称
  101. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  102. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  103. // 指标编码
  104. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  105. // 指标id获取
  106. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  107. if err != nil {
  108. logs.Error("ImportCostProcessor Process() : Failed to get index id: %v", err)
  109. continue
  110. }
  111. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  112. if err != nil {
  113. logs.Error("ImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
  114. continue
  115. }
  116. if len(indexData) > 0 {
  117. logs.Info("ImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  118. continue
  119. }
  120. valueStr := row[columnIdx]
  121. value, err := strconv.ParseFloat(valueStr, 64)
  122. if err != nil {
  123. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  124. }
  125. // 创建并添加到结果列表
  126. baseFromLyData := models.BaseFromLyData{
  127. DataTime: format,
  128. Value: value,
  129. BaseFromLyIndexId: indexId,
  130. IndexCode: indexCode,
  131. }
  132. result = append(result, baseFromLyData)
  133. } else {
  134. log.Printf("ImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
  135. }
  136. break
  137. }
  138. }
  139. }
  140. }
  141. return result, nil
  142. }
  143. // ProcessingProfitProcessor
  144. // @Description: 加工利润处理器
  145. type ProcessingProfitProcessor struct{}
  146. func (p *ProcessingProfitProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  147. fmt.Println("Processing processing profit...")
  148. // 解析关键字
  149. if len(keywords) < 4 {
  150. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingProfitProcessor Process() : keywords must contain at least 4 elements")
  151. }
  152. // 拿到 行关键字和列关键字
  153. columnName := keywords[1]
  154. rowVariety := keywords[0]
  155. indexNamePrefix := keywords[:1]
  156. indexNameSuffix := keywords[1:]
  157. // 提取所有表格数据
  158. tableData := getNoHeadTableData(reportContent)
  159. // 提取日期信息
  160. dateText, err := getDateInfo(ctx)
  161. if err != nil {
  162. return []models.BaseFromLyData{}, err
  163. }
  164. // 时间格式转换
  165. format, err := utils.ConvertTimeFormat(dateText)
  166. if err != nil {
  167. return []models.BaseFromLyData{}, err
  168. }
  169. // 解析日期并计算当前月份 和 后两月
  170. yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
  171. if err != nil {
  172. return nil, err
  173. }
  174. fmt.Printf("Target yearMonth: %s\n", yearMonths)
  175. // 处理提取的表格数据
  176. var result []models.BaseFromLyData
  177. for _, data := range tableData {
  178. tableHeaders := data.Headers
  179. tableRows := data.Rows
  180. // 查找目标列
  181. columnIdx := -1
  182. for i, header := range tableHeaders {
  183. if strings.Contains(columnName, header) {
  184. columnIdx = i
  185. break
  186. }
  187. }
  188. if columnIdx == -1 {
  189. log.Printf("ProcessingProfitProcessor Process() : Column '%s' not found in table", columnName)
  190. continue
  191. }
  192. // 处理表格中的每一行
  193. var previousRowVariety string
  194. for rowIndex, row := range tableRows {
  195. if len(row) == len(tableHeaders) {
  196. previousRowVariety = row[0]
  197. } else if len(row) == len(tableHeaders)-1 {
  198. row = append([]string{previousRowVariety}, row...)
  199. tableRows[rowIndex] = row
  200. }
  201. for _, targetMonth := range yearMonths {
  202. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
  203. if columnIdx < len(row) {
  204. // 指标名称
  205. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  206. indexName := strings.Join(indexNameList[:len(keywords)-1], ":")
  207. // 指标编码
  208. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  209. // 指标id获取
  210. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  211. if err != nil {
  212. logs.Error("ProcessingProfitProcessor Process() : Failed to get index id: %v", err)
  213. continue
  214. }
  215. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  216. if err != nil {
  217. logs.Error("ProcessingProfitProcessor Process() : Failed to get data by index id and date: %v", err)
  218. continue
  219. }
  220. if len(indexData) > 0 {
  221. logs.Info("ProcessingProfitProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  222. continue
  223. }
  224. valueStr := row[columnIdx]
  225. value, err := strconv.ParseFloat(valueStr, 64)
  226. if err != nil {
  227. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  228. }
  229. // 创建并添加到结果列表
  230. baseFromLyData := models.BaseFromLyData{
  231. DataTime: format,
  232. Value: value,
  233. BaseFromLyIndexId: indexId,
  234. IndexCode: indexCode,
  235. }
  236. result = append(result, baseFromLyData)
  237. } else {
  238. log.Printf("ProcessingProfitProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  239. }
  240. break
  241. }
  242. }
  243. }
  244. }
  245. return result, nil
  246. }
  247. // ShippingCostProcessor
  248. // @Description: 船运费用处理器
  249. type ShippingCostProcessor struct{}
  250. func (p *ShippingCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  251. fmt.Println("Processing processing profit...")
  252. // 解析关键字
  253. if len(keywords) < 4 {
  254. return []models.BaseFromLyData{}, fmt.Errorf("ShippingCostProcessor Process() : keywords must contain at least 5 elements")
  255. }
  256. // 拿到 行关键字和列关键字
  257. columnName := keywords[len(keywords)-3]
  258. rowVariety := keywords[0]
  259. rowDestination := keywords[1]
  260. rowShipType := keywords[2]
  261. // 提取所有表格数据
  262. tableData := getNoHeadTableData(reportContent)
  263. // 提取日期信息
  264. dateText, err := getDateInfo(ctx)
  265. if err != nil {
  266. return []models.BaseFromLyData{}, err
  267. }
  268. // 时间格式转换
  269. format, err := utils.ConvertTimeFormat(dateText)
  270. if err != nil {
  271. return []models.BaseFromLyData{}, err
  272. }
  273. // 处理提取的表格数据
  274. var result []models.BaseFromLyData
  275. for _, data := range tableData {
  276. tableHeaders := data.Headers
  277. tableRows := data.Rows
  278. // 查找目标列
  279. columnIdx := -1
  280. for i, header := range tableHeaders {
  281. if strings.Contains(header, columnName) {
  282. columnIdx = i
  283. break
  284. }
  285. }
  286. if columnIdx == -1 {
  287. log.Printf("ShippingCostProcessor Process() : Column '%s' not found in table", columnName)
  288. continue
  289. }
  290. // 处理表格中的每一行
  291. for rowIndex, row := range tableRows {
  292. if len(row) == len(tableHeaders)-1 {
  293. row = append([]string{rowVariety}, row...)
  294. tableRows[rowIndex] = row
  295. rowShipType, err = extractValueInParentheses(rowVariety)
  296. if err != nil {
  297. logs.Error("ShippingCostProcessor Process() : Failed to extract value in parentheses: %v", err)
  298. continue
  299. }
  300. }
  301. if len(row) >= len(tableHeaders) && row[0] == rowVariety && (row[1] == rowDestination || strings.Contains(row[0], row[1])) && row[2] == rowShipType {
  302. if columnIdx < len(row) {
  303. // 指标名称
  304. indexName := strings.Join(keywords[:len(keywords)-3], `:`)
  305. // 指标编码
  306. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  307. // 指标id获取
  308. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  309. if err != nil {
  310. logs.Error("ShippingCostProcessor Process() : Failed to get index id: %v", err)
  311. continue
  312. }
  313. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  314. if err != nil {
  315. logs.Error("ShippingCostProcessor Process() : Failed to get data by index id and date: %v", err)
  316. continue
  317. }
  318. if len(indexData) > 0 {
  319. logs.Info("ShippingCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  320. continue
  321. }
  322. valueStr := row[columnIdx]
  323. value, err := strconv.ParseFloat(valueStr, 64)
  324. if err != nil {
  325. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  326. }
  327. // 创建并添加到结果列表
  328. baseFromLyData := models.BaseFromLyData{
  329. DataTime: format,
  330. Value: value,
  331. BaseFromLyIndexId: indexId,
  332. IndexCode: indexCode,
  333. }
  334. result = append(result, baseFromLyData)
  335. } else {
  336. log.Printf("ShippingCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  337. }
  338. break
  339. }
  340. }
  341. }
  342. return result, nil
  343. }
  344. // SupplyDemandBalanceProcessor
  345. // @Description: 供需平衡处理器
  346. type SupplyDemandBalanceProcessor struct{}
  347. func (p *SupplyDemandBalanceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  348. // https://www.fao.com.cn/art/gG7gKTCNDHLJNsq9QRYjoQ==.htm
  349. logs.Info("Processing processing report...")
  350. // 解析关键字
  351. if len(keywords) < 4 {
  352. return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : keywords must contain at least 4 elements")
  353. }
  354. // 拿到 行关键字和列关键字
  355. var columnName string
  356. rowVariety := keywords[1]
  357. // 提取所有表格数据
  358. tableData := getTableData(reportContent, true)
  359. logs.Info("SupplyDemandBalanceProcessor Process() : Table data: %v", tableData)
  360. // 提取日期信息
  361. dateText, err := getDateInfo(ctx)
  362. if err != nil {
  363. return []models.BaseFromLyData{}, err
  364. }
  365. // 时间格式转换
  366. format, err := utils.ConvertTimeFormat(dateText)
  367. if err != nil {
  368. return []models.BaseFromLyData{}, err
  369. }
  370. currentYearAndNextYear, err := utils.GetCurrentYearAndNextYear(format)
  371. if err != nil {
  372. return nil, err
  373. }
  374. month, err := utils.GetCurrentMonth(format)
  375. if err != nil {
  376. return nil, err
  377. }
  378. monthSuffix := "预估"
  379. logs.Info("SupplyDemandBalanceProcessor Process() : Target Year: %s:%s\n", currentYearAndNextYear, month+monthSuffix)
  380. // 处理提取的表格数据
  381. var result []models.BaseFromLyData
  382. headers := tableData.Headers
  383. rows := tableData.Rows
  384. for _, year := range currentYearAndNextYear {
  385. columnName = year + month + monthSuffix
  386. isCurrentYear, err := utils.IsCurrentYear(year)
  387. if err != nil {
  388. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to determine if year is current year: %v", err)
  389. continue
  390. }
  391. if !isCurrentYear {
  392. format, err = utils.GetNextYearLastDay(format)
  393. if err != nil {
  394. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get next year last day: %v", err)
  395. continue
  396. }
  397. }
  398. // 查找目标列
  399. columnIdx := -1
  400. for i, header := range headers {
  401. if strings.Contains(columnName, header) {
  402. columnIdx = i
  403. break
  404. }
  405. }
  406. if columnIdx == -1 {
  407. logs.Error("SupplyDemandBalanceProcessor Process() : Column '%s' not found in table", columnName)
  408. continue
  409. }
  410. // 处理表格中的每一行
  411. for _, row := range rows {
  412. if len(row) >= len(headers) && row[0] == rowVariety {
  413. if columnIdx < len(row) {
  414. // 指标名称
  415. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  416. // 指标编码
  417. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  418. // 指标id获取
  419. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  420. if err != nil {
  421. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get index id: %v", err)
  422. continue
  423. }
  424. valueStr := row[columnIdx]
  425. value, err := strconv.ParseFloat(valueStr, 64)
  426. if err != nil {
  427. return []models.BaseFromLyData{}, fmt.Errorf("SupplyDemandBalanceProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  428. }
  429. yearMonth, err := utils.GetYearMonth(format)
  430. if err != nil {
  431. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get year month: %v", err)
  432. continue
  433. }
  434. indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, yearMonth)
  435. if err != nil {
  436. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to get data by index id and date: %v", err)
  437. continue
  438. }
  439. if len(indexData) > 0 {
  440. logs.Info("SupplyDemandBalanceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  441. // 存在走更新逻辑 主要更新今年在去年的预估值
  442. indexData := indexData[0]
  443. if indexData.Value != value {
  444. time, err := utils.StringToTime(indexData.ModifyTime)
  445. if err != nil {
  446. return nil, err
  447. }
  448. timeZero, err := utils.StringToTimeZero(format)
  449. if err != nil {
  450. return nil, err
  451. }
  452. if time.Before(timeZero) {
  453. // 更新指标数据
  454. err := models.UpdateLyDataById(indexData.BaseFromLyDataId, value)
  455. if err != nil {
  456. logs.Error("SupplyDemandBalanceProcessor Process() : Failed to update data: %v", err)
  457. continue
  458. }
  459. // 更新指标库数据
  460. edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(indexData.IndexCode, yearMonth)
  461. if err != nil {
  462. return nil, err
  463. }
  464. if len(edbIndexData) > 0 {
  465. err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
  466. if err != nil {
  467. return nil, err
  468. }
  469. }
  470. }
  471. }
  472. continue
  473. }
  474. // 创建并添加到结果列表
  475. baseFromLyData := models.BaseFromLyData{
  476. DataTime: format,
  477. Value: value,
  478. BaseFromLyIndexId: indexId,
  479. IndexCode: indexCode,
  480. }
  481. result = append(result, baseFromLyData)
  482. } else {
  483. log.Printf("SupplyDemandBalanceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  484. }
  485. break
  486. }
  487. }
  488. }
  489. return result, nil
  490. }
  491. // PurchaseShippingProcessor
  492. // @Description: 采购装船处理器
  493. type PurchaseShippingProcessor struct{}
  494. func (p *PurchaseShippingProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  495. logs.Info("Processing purchase shipping...")
  496. // 解析关键字
  497. if len(keywords) < 3 {
  498. return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : keywords must contain at least 3 elements")
  499. }
  500. // 拿到 行关键字和列关键字
  501. columnName := keywords[len(keywords)-3]
  502. // 提取所有表格数据
  503. tableData := getPurchaseShippingTableData(reportContent)
  504. logs.Info("PurchaseShippingProcessor Process() : Table data: %v", tableData)
  505. // 提取日期信息
  506. dateText, err := getDateInfo(ctx)
  507. if err != nil {
  508. return []models.BaseFromLyData{}, err
  509. }
  510. // 时间格式转换
  511. format, err := utils.ConvertTimeFormat(dateText)
  512. if err != nil {
  513. return []models.BaseFromLyData{}, err
  514. }
  515. // 处理提取的表格数据
  516. var result []models.BaseFromLyData
  517. headers := tableData.Headers
  518. rows := tableData.Rows
  519. // 查找目标列
  520. columnIdx := -1
  521. for i, header := range headers {
  522. if strings.Contains(columnName, header) {
  523. columnIdx = i
  524. break
  525. }
  526. }
  527. if columnIdx == -1 {
  528. log.Printf("PurchaseShippingProcessor Process() : Column '%s' not found in table", columnName)
  529. } else {
  530. // 处理表格中的每一行
  531. for _, row := range rows {
  532. if len(row) >= len(headers) {
  533. if columnIdx < len(row) {
  534. if !isNumber(row[columnIdx]) {
  535. continue
  536. }
  537. // 指标名称
  538. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  539. // 指标编码
  540. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  541. // 指标id获取
  542. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  543. if err != nil {
  544. logs.Error("PurchaseShippingProcessor Process() : Failed to get index id: %v", err)
  545. continue
  546. }
  547. var yearMonth string
  548. number, err := utils.ConvertMonthToNumber(row[1])
  549. if err != nil {
  550. return nil, err
  551. }
  552. yearMonth = row[0] + "-" + number
  553. isSameMonth, err := utils.IsSameMonth(format, yearMonth)
  554. if err != nil {
  555. return nil, err
  556. }
  557. if isSameMonth {
  558. yearMonth = format
  559. } else {
  560. lastDayOfMonth, err := utils.GetLastDayOfMonth(yearMonth)
  561. if err != nil {
  562. return nil, err
  563. }
  564. yearMonth = lastDayOfMonth
  565. }
  566. valueStr := row[columnIdx]
  567. value, err := strconv.ParseFloat(valueStr, 64)
  568. if err != nil {
  569. return []models.BaseFromLyData{}, fmt.Errorf("PurchaseShippingProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  570. }
  571. month, err := utils.GetYearMonth(yearMonth)
  572. if err != nil {
  573. return nil, err
  574. }
  575. indexData, err := models.GetLyDataByIndexIdAndDataTimeYM(indexId, month)
  576. if err != nil {
  577. logs.Error("PurchaseShippingProcessor Process() : Failed to get data by index id and date: %v", err)
  578. continue
  579. }
  580. if len(indexData) > 0 {
  581. if indexData[0].Value != value {
  582. logs.Info("PurchaseShippingProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  583. lyData := indexData[0]
  584. time, err := utils.StringToTime(lyData.ModifyTime)
  585. if err != nil {
  586. return nil, err
  587. }
  588. timeZero, err := utils.StringToTimeZero(format)
  589. if err != nil {
  590. return nil, err
  591. }
  592. if time.Before(timeZero) {
  593. // 更新指标数据
  594. err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
  595. if err != nil {
  596. return nil, err
  597. }
  598. // 同步更新指标库数据 须根据指标编码和日期更新
  599. edbIndexData, err := models.GetLyEdbDataByIndexCodeAndDataTime(lyData.IndexCode, month)
  600. if err != nil {
  601. return nil, err
  602. }
  603. if len(edbIndexData) > 0 {
  604. err := models.UpdateLyEdbDataById(edbIndexData[0].EdbInfoId, value)
  605. if err != nil {
  606. return nil, err
  607. }
  608. }
  609. }
  610. }
  611. continue
  612. }
  613. // 创建并添加到结果列表
  614. baseFromLyData := models.BaseFromLyData{
  615. DataTime: yearMonth,
  616. Value: value,
  617. BaseFromLyIndexId: indexId,
  618. IndexCode: indexCode,
  619. }
  620. result = append(result, baseFromLyData)
  621. continue
  622. } else {
  623. log.Printf("PurchaseShippingProcessor Process() : Column index out of range for row '%s'", columnName)
  624. }
  625. break
  626. }
  627. }
  628. }
  629. return result, nil
  630. }
  // ProcessingReportProcessor is the processor for processing reports. It has no
  // fields; its behaviour lives in its Process method.
  type ProcessingReportProcessor struct{}
  635. func (p *ProcessingReportProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  636. logs.Info("Processing processing report...")
  637. // 解析关键字
  638. if len(keywords) < 3 {
  639. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : keywords must contain at least 3 elements")
  640. }
  641. // 拿到 行关键字和列关键字
  642. columnName := keywords[0]
  643. rowName := keywords[1]
  644. // 提取所有表格数据
  645. tableData := getAllTableData(reportContent)
  646. // 提取日期信息
  647. dateText, err := getDateInfo(ctx)
  648. if err != nil {
  649. return []models.BaseFromLyData{}, err
  650. }
  651. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  652. // 指标编码
  653. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  654. // 指标id获取
  655. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  656. if err != nil {
  657. return nil, err
  658. }
  659. // 校验指标数据是否存在 根据指标id和日期 存在则跳过,不存在正常往下走
  660. format, err := utils.ConvertTimeFormat(dateText)
  661. if err != nil {
  662. return []models.BaseFromLyData{}, err
  663. }
  664. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  665. if err != nil {
  666. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
  667. }
  668. if len(indexData) > 0 {
  669. logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  670. // 不必做更新处理,报告每周刷新,即使本周和上周数据一致,也需要每周记录
  671. return []models.BaseFromLyData{}, nil
  672. }
  673. // 解析日期并计算当前周数
  674. targetWeek, err := utils.ParseDateAndWeek(dateText)
  675. if err != nil {
  676. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : Failed to parse date: %v", err)
  677. }
  678. fmt.Printf("Target Week: %s\n", targetWeek)
  679. var result []models.BaseFromLyData
  680. // 处理提取的表格数据
  681. for _, data := range tableData {
  682. tableHeaders := data.Headers
  683. tableRows := data.Rows
  684. // 查找目标列
  685. columnIdx := -1
  686. for i, header := range tableHeaders {
  687. headerString := extractChinese(header)
  688. if strings.Contains(columnName, headerString) {
  689. // 这个表格不是很好处理,这里写的有些僵硬,后续需要优化
  690. if columnName == "国内大豆开机率" {
  691. i = i + 2
  692. }
  693. columnIdx = i
  694. break
  695. }
  696. }
  697. if columnIdx == -1 {
  698. logs.Error("ProcessingReportProcessor Process() : Column '%s' not found in table", columnName)
  699. continue
  700. }
  701. // 查找本周的列位置
  702. weekIdx := -1
  703. for i, header := range tableHeaders {
  704. if strings.Contains(header, targetWeek) && i > columnIdx {
  705. weekIdx = i
  706. break
  707. }
  708. }
  709. if weekIdx == -1 {
  710. fmt.Printf("Week column '%s' not found in table\n", targetWeek)
  711. continue
  712. }
  713. // 查找目标行
  714. for _, row := range tableRows {
  715. if strings.Contains(row[0], rowName) {
  716. if columnIdx < len(row) {
  717. // 指标名称
  718. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  719. // 指标编码
  720. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  721. // 指标id获取
  722. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  723. if err != nil {
  724. logs.Error("ProcessingReportProcessor Process() : Failed to get index id: %v", err)
  725. continue
  726. }
  727. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  728. if err != nil {
  729. logs.Error("ProcessingReportProcessor Process() : Failed to get data by index id and date: %v", err)
  730. continue
  731. }
  732. if len(indexData) > 0 {
  733. logs.Info("ProcessingReportProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  734. // 无需走更新逻辑,报告每日更新,即使今天和昨天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  735. continue
  736. }
  737. valueStr := row[columnIdx]
  738. value, err := strconv.ParseFloat(valueStr, 64)
  739. if err != nil {
  740. return []models.BaseFromLyData{}, fmt.Errorf("ProcessingReportProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  741. }
  742. // 创建并添加到结果列表
  743. baseFromLyData := models.BaseFromLyData{
  744. DataTime: format,
  745. Value: value,
  746. BaseFromLyIndexId: indexId,
  747. IndexCode: indexCode,
  748. }
  749. result = append(result, baseFromLyData)
  750. continue
  751. } else {
  752. log.Printf("ProcessingReportProcessor Process() : Column index out of range for row '%s', '%s'", rowName, columnName)
  753. }
  754. break
  755. }
  756. }
  757. }
  758. return result, nil
  759. }
  760. // InventoryAnalysisProcessor
  761. // @Description: 库存分析处理器
  762. type InventoryAnalysisProcessor struct{}
  763. func (p *InventoryAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  764. // https://www.fao.com.cn/art/yg1IKj9FpPEIDv2LefnPhQ==.htm
  765. logs.Info("Processing inventory analysis...")
  766. // 解析关键字
  767. if len(keywords) < 4 {
  768. return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : keywords must contain at least 4 elements")
  769. }
  770. // 拿到 行关键字和列关键字
  771. columnName := keywords[0]
  772. rowVariety := keywords[1]
  773. columnSuffix := "本周"
  774. columnName = columnName + columnSuffix
  775. // 提取所有表格数据
  776. tableData := getTableData(reportContent, true)
  777. logs.Info("InventoryAnalysisProcessor Process() : Table data: %v", tableData)
  778. // 提取日期信息
  779. dateText, err := getDateInfo(ctx)
  780. if err != nil {
  781. return []models.BaseFromLyData{}, err
  782. }
  783. // 时间格式转换
  784. format, err := utils.ConvertTimeFormat(dateText)
  785. if err != nil {
  786. return []models.BaseFromLyData{}, err
  787. }
  788. // 处理提取的表格数据
  789. var result []models.BaseFromLyData
  790. headers := tableData.Headers
  791. rows := tableData.Rows
  792. // 查找目标列
  793. columnIdx := -1
  794. for i, header := range headers {
  795. header := removeParentheses(header)
  796. if strings.Contains(columnName, header) {
  797. columnIdx = i
  798. break
  799. }
  800. }
  801. if columnIdx == -1 {
  802. logs.Error("InventoryAnalysisProcessor Process() : Column '%s' not found in table", columnName)
  803. } else {
  804. // 处理表格中的每一行
  805. for _, row := range rows {
  806. if len(row) >= len(headers) && strings.Contains(row[0], rowVariety) {
  807. if columnIdx < len(row) {
  808. // 指标名称
  809. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  810. indexName = removeParentheses(indexName)
  811. // 指标编码
  812. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  813. // 指标id获取
  814. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  815. if err != nil {
  816. logs.Error("InventoryAnalysisProcessor Process() : Failed to get index id: %v", err)
  817. continue
  818. }
  819. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  820. if err != nil {
  821. logs.Error("InventoryAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
  822. continue
  823. }
  824. if len(indexData) > 0 {
  825. logs.Info("InventoryAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  826. // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  827. continue
  828. }
  829. valueStr := row[columnIdx]
  830. value, err := strconv.ParseFloat(valueStr, 64)
  831. if err != nil {
  832. return []models.BaseFromLyData{}, fmt.Errorf("InventoryAnalysisProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  833. }
  834. // 创建并添加到结果列表
  835. baseFromLyData := models.BaseFromLyData{
  836. DataTime: format,
  837. Value: value,
  838. BaseFromLyIndexId: indexId,
  839. IndexCode: indexCode,
  840. }
  841. result = append(result, baseFromLyData)
  842. continue
  843. } else {
  844. log.Printf("InventoryAnalysisProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  845. }
  846. break
  847. }
  848. }
  849. }
  850. return result, nil
  851. }
  852. // PriceSpreadArbitrageProcessor
  853. // @Description: 价差套利处理器
  854. type PriceSpreadArbitrageProcessor struct{}
  855. func (p *PriceSpreadArbitrageProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  856. fmt.Println("Processing processing profit...")
  857. // 解析关键字
  858. if len(keywords) < 4 {
  859. return []models.BaseFromLyData{}, fmt.Errorf("PriceSpreadArbitrageProcessor Process() : keywords must contain at least 4 elements")
  860. }
  861. // 拿到 行关键字和列关键字
  862. var columnDate string
  863. rowVariety := keywords[0]
  864. rowBourse := keywords[1]
  865. // 提取所有表格数据
  866. tableData := getNoHeadTableData(reportContent)
  867. // 提取日期信息
  868. dateText, err := getDateInfo(ctx)
  869. if err != nil {
  870. return []models.BaseFromLyData{}, err
  871. }
  872. // 时间格式转换
  873. format, err := utils.ConvertTimeFormat(dateText)
  874. if err != nil {
  875. return []models.BaseFromLyData{}, err
  876. }
  877. day, err := utils.ConvertTimeFormatToYearMonthDay(format)
  878. if err != nil {
  879. return nil, err
  880. }
  881. columnDate = day
  882. // 处理提取的表格数据
  883. var result []models.BaseFromLyData
  884. for _, data := range tableData {
  885. tableHeaders := data.Headers
  886. tableRows := data.Rows
  887. // 查找目标列
  888. columnIdx := -1
  889. for i, header := range tableHeaders {
  890. if strings.Contains(header, columnDate) {
  891. columnIdx = i
  892. break
  893. }
  894. }
  895. if columnIdx == -1 {
  896. log.Printf("PriceSpreadArbitrageProcessor Process() : Column '%s' not found in table", columnDate)
  897. continue
  898. }
  899. // 处理表格中的每一行
  900. for _, row := range tableRows {
  901. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == rowBourse {
  902. if columnIdx < len(row) {
  903. // 指标名称
  904. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  905. // 指标编码
  906. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  907. // 指标id获取
  908. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  909. if err != nil {
  910. logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get index id: %v", err)
  911. continue
  912. }
  913. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  914. if err != nil {
  915. logs.Error("PriceSpreadArbitrageProcessor Process() : Failed to get data by index id and date: %v", err)
  916. continue
  917. }
  918. if len(indexData) > 0 {
  919. logs.Info("PriceSpreadArbitrageProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  920. // 无需走更新逻辑,报告每天更新,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  921. continue
  922. }
  923. valueStr := row[columnIdx]
  924. value, err := strconv.ParseFloat(valueStr, 64)
  925. if err != nil {
  926. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  927. }
  928. // 创建并添加到结果列表
  929. baseFromLyData := models.BaseFromLyData{
  930. DataTime: format,
  931. Value: value,
  932. BaseFromLyIndexId: indexId,
  933. IndexCode: indexCode,
  934. }
  935. result = append(result, baseFromLyData)
  936. } else {
  937. log.Printf("PriceSpreadArbitrageProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
  938. }
  939. break
  940. }
  941. }
  942. }
  943. return result, nil
  944. }
  945. // DailyTransactionProcessor
  946. // @Description: 每日成交处理器
  947. type DailyTransactionProcessor struct{}
  948. func (p *DailyTransactionProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  949. fmt.Println("Processing processing profit...")
  950. // 解析关键字
  951. if len(keywords) < 4 {
  952. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : keywords must contain at least 4 elements")
  953. }
  954. // 获取第一个表格
  955. areaTableDataList := getNoHeadTableData(reportContent)
  956. if len(areaTableDataList) == 0 {
  957. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : No table data found")
  958. }
  959. areaTableData := areaTableDataList[0]
  960. // 获取第二个表格
  961. blocTableData := getTableData(reportContent, false)
  962. if blocTableData.Headers == nil {
  963. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : No table data found")
  964. }
  965. logs.Info("SupplyDemandBalanceProcessor Process() : areaTableData data: %v, blocTableData data: %v", areaTableData, blocTableData)
  966. // 提取日期信息
  967. dateText, err := getDateInfo(ctx)
  968. if err != nil {
  969. return []models.BaseFromLyData{}, err
  970. }
  971. // 时间格式转换
  972. format, err := utils.ConvertTimeFormat(dateText)
  973. if err != nil {
  974. return []models.BaseFromLyData{}, err
  975. }
  976. // 处理提取的表格数据
  977. var result []models.BaseFromLyData
  978. areaHeaders := areaTableData.Headers
  979. areaRows := areaTableData.Rows
  980. // 第一个表格
  981. // 拿到 行关键字和列关键字
  982. columnArea := keywords[1]
  983. var rowAreaMonthDays []string
  984. rowWeek := "地区总计"
  985. monthDay, err := utils.GetWeekdaysInSameWeek(format)
  986. if err != nil {
  987. return nil, err
  988. }
  989. rowAreaMonthDays = monthDay
  990. // 查找目标列
  991. areaColumnIdx := -1
  992. for i, header := range areaHeaders {
  993. if strings.Contains(header, columnArea) {
  994. areaColumnIdx = i
  995. break
  996. }
  997. }
  998. if areaColumnIdx == -1 {
  999. log.Printf("DailyTransactionProcessor Process() : One Column '%s' not found in table", columnArea)
  1000. } else if !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "主要集团") {
  1001. for _, row := range areaRows {
  1002. if len(row) >= len(areaHeaders) && row[0] == rowWeek {
  1003. if areaColumnIdx < len(row) {
  1004. // 指标名称
  1005. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  1006. // 指标编码
  1007. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1008. // 指标id获取
  1009. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1010. if err != nil {
  1011. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1012. continue
  1013. }
  1014. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1015. if err != nil {
  1016. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1017. continue
  1018. }
  1019. if len(indexData) > 0 {
  1020. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1021. // 无需走更新逻辑,报告每周更新,一周出来一周中每天得数据,即使本周和上周数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  1022. continue
  1023. }
  1024. valueStr := row[areaColumnIdx]
  1025. isChinese := IsChinese(valueStr)
  1026. if isChinese {
  1027. continue
  1028. }
  1029. value, err := strconv.ParseFloat(valueStr, 64)
  1030. if err != nil {
  1031. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1032. }
  1033. // 创建并添加到结果列表
  1034. var dealDate string
  1035. if row[0] == rowWeek {
  1036. dealDate = format
  1037. } else {
  1038. date, err := utils.ConvertToDate(row[0])
  1039. if err != nil {
  1040. return nil, err
  1041. }
  1042. dealDate = date
  1043. }
  1044. baseFromLyData := models.BaseFromLyData{
  1045. DataTime: dealDate,
  1046. Value: value,
  1047. BaseFromLyIndexId: indexId,
  1048. IndexCode: indexCode,
  1049. }
  1050. result = append(result, baseFromLyData)
  1051. } else {
  1052. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea)
  1053. }
  1054. break
  1055. } else {
  1056. for _, monthDay := range rowAreaMonthDays {
  1057. if len(row) >= len(areaHeaders) && (row[0] == monthDay && !strings.Contains(strings.Join(keywords[:len(keywords)-3], ":"), "周度")) {
  1058. if areaColumnIdx < len(row) {
  1059. // 指标名称
  1060. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  1061. // 指标编码
  1062. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1063. // 指标id获取
  1064. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1065. if err != nil {
  1066. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1067. continue
  1068. }
  1069. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1070. if err != nil {
  1071. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1072. continue
  1073. }
  1074. if len(indexData) > 0 {
  1075. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1076. // 无需走更新逻辑,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  1077. continue
  1078. }
  1079. valueStr := row[areaColumnIdx]
  1080. isChinese := IsChinese(valueStr)
  1081. if isChinese {
  1082. continue
  1083. }
  1084. value, err := strconv.ParseFloat(valueStr, 64)
  1085. if err != nil {
  1086. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1087. }
  1088. // 创建并添加到结果列表
  1089. var dealDate string
  1090. if row[0] == rowWeek {
  1091. dealDate = format
  1092. } else {
  1093. date, err := utils.ConvertToDate(row[0])
  1094. if err != nil {
  1095. return nil, err
  1096. }
  1097. dealDate = date
  1098. }
  1099. baseFromLyData := models.BaseFromLyData{
  1100. DataTime: dealDate,
  1101. Value: value,
  1102. BaseFromLyIndexId: indexId,
  1103. IndexCode: indexCode,
  1104. }
  1105. result = append(result, baseFromLyData)
  1106. } else {
  1107. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", monthDay, columnArea)
  1108. }
  1109. break
  1110. }
  1111. }
  1112. }
  1113. }
  1114. }
  1115. // 第二个表格
  1116. // 拿到 行关键字和列关键字
  1117. columnBloc := keywords[len(keywords)-3]
  1118. rowBloc := keywords[1]
  1119. blocHeaders := blocTableData.Headers
  1120. blocRows := blocTableData.Rows
  1121. // 查找目标列
  1122. blocColumnIdx := -1
  1123. for i, header := range blocHeaders {
  1124. if strings.Contains(header, columnBloc) {
  1125. blocColumnIdx = i
  1126. break
  1127. }
  1128. }
  1129. if blocColumnIdx == -1 {
  1130. log.Printf("DailyTransactionProcessor Process() : Two Column '%s' not found in table", columnBloc)
  1131. } else {
  1132. // 处理表格中的每一行
  1133. for _, row := range blocRows {
  1134. if len(row) >= len(blocHeaders) && strings.Contains(row[0], rowBloc) {
  1135. if blocColumnIdx < len(row) {
  1136. // 指标名称
  1137. indexName := strings.Join(keywords[:len(keywords)-3], ":")
  1138. indexName = removeParentheses(indexName)
  1139. // 指标编码
  1140. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1141. // 指标id获取
  1142. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1143. if err != nil {
  1144. logs.Error("DailyTransactionProcessor Process() : Failed to get index id: %v", err)
  1145. continue
  1146. }
  1147. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1148. if err != nil {
  1149. logs.Error("DailyTransactionProcessor Process() : Failed to get data by index id and date: %v", err)
  1150. continue
  1151. }
  1152. if len(indexData) > 0 {
  1153. logs.Info("DailyTransactionProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1154. continue
  1155. }
  1156. valueStr := row[blocColumnIdx]
  1157. value, err := strconv.ParseFloat(valueStr, 64)
  1158. if err != nil {
  1159. return []models.BaseFromLyData{}, fmt.Errorf("DailyTransactionProcessor Process() : failed to parse value '%s': %v", valueStr, err)
  1160. }
  1161. // 创建并添加到结果列表
  1162. baseFromLyData := models.BaseFromLyData{
  1163. DataTime: format,
  1164. Value: value,
  1165. BaseFromLyIndexId: indexId,
  1166. IndexCode: indexCode,
  1167. }
  1168. result = append(result, baseFromLyData)
  1169. } else {
  1170. log.Printf("DailyTransactionProcessor Process() : Column index out of range for row '%s', '%s'", rowBloc, columnBloc)
  1171. }
  1172. break
  1173. }
  1174. }
  1175. }
  1176. return result, nil
  1177. }
  1178. // PalmOilImportCostProcessor 棕榈油进口成本
  1179. type PalmOilImportCostProcessor struct{}
  1180. func (p *PalmOilImportCostProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1181. logs.Info("Processing palm oil import cost...")
  1182. // 解析关键字
  1183. if len(keywords) < 5 {
  1184. return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : keywords must contain at least 5 elements")
  1185. }
  1186. // 拿到 行关键字和列关键字
  1187. columnName := keywords[len(keywords)-4]
  1188. rowVariety := keywords[0]
  1189. rowPort := keywords[len(keywords)-3]
  1190. indexNamePrefix := keywords[:1]
  1191. indexNameSuffix := keywords[1:]
  1192. // 提取所有表格数据
  1193. tableData := getNoHeadTableData(reportContent)
  1194. // 提取日期信息
  1195. dateText, err := getDateInfo(ctx)
  1196. if err != nil {
  1197. return []models.BaseFromLyData{}, err
  1198. }
  1199. // 时间格式转换
  1200. format, err := utils.ConvertTimeFormat(dateText)
  1201. if err != nil {
  1202. return []models.BaseFromLyData{}, err
  1203. }
  1204. // 解析日期并计算当前月份
  1205. targetMonths, err := utils.GetYearMonthNoYear(format)
  1206. if err != nil {
  1207. return []models.BaseFromLyData{}, fmt.Errorf("PalmOilImportCostProcessor Process() : Failed to parse date: %v", err)
  1208. }
  1209. fmt.Printf("Target Month: %s\n", targetMonths)
  1210. // 处理提取的表格数据
  1211. var result []models.BaseFromLyData
  1212. for _, data := range tableData {
  1213. tableHeaders := data.Headers
  1214. tableRows := data.Rows
  1215. // 查找目标列
  1216. columnIdx := -1
  1217. for i, header := range tableHeaders {
  1218. if strings.Contains(header, columnName) {
  1219. columnIdx = i
  1220. break
  1221. }
  1222. }
  1223. if columnIdx == -1 {
  1224. log.Printf("PalmOilImportCostProcessor Process() : Column '%s' not found in table", columnName)
  1225. continue
  1226. }
  1227. // 处理表格中的每一行
  1228. //var flag bool = true
  1229. var previousRowVariety string
  1230. var previousRowPort string
  1231. var previousRowFob string
  1232. for rowIndex, row := range tableRows {
  1233. if len(row) == len(tableHeaders) {
  1234. previousRowVariety = row[0]
  1235. previousRowPort = row[1]
  1236. previousRowFob = row[2]
  1237. } else if len(row) == len(tableHeaders)-1 {
  1238. previousRowPort = row[0]
  1239. previousRowFob = row[1]
  1240. row = append([]string{previousRowVariety}, row...)
  1241. tableRows[rowIndex] = row
  1242. } else if len(row) == len(tableHeaders)-2 {
  1243. // 这段这里不需要。。。先保留吧
  1244. previousRowFob = row[0]
  1245. row = append([]string{previousRowVariety, previousRowPort}, row...)
  1246. tableRows[rowIndex] = row
  1247. } else if len(row) == len(tableHeaders)-3 {
  1248. row = append([]string{previousRowVariety, previousRowPort, previousRowFob}, row...)
  1249. tableRows[rowIndex] = row
  1250. }
  1251. for _, targetMonth := range targetMonths {
  1252. if len(row) >= len(tableHeaders) && strings.Contains(rowVariety, row[0]) && row[1] == targetMonth && row[len(row)-1] == rowPort {
  1253. if columnIdx < len(row) {
  1254. // 指标名称
  1255. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  1256. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  1257. // 指标编码
  1258. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1259. // 指标id获取
  1260. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1261. if err != nil {
  1262. logs.Error("PalmOilImportCostProcessor Process() : Failed to get index id: %v", err)
  1263. continue
  1264. }
  1265. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1266. if err != nil {
  1267. logs.Error("PalmOilImportCostProcessor Process() : Failed to get data by index id and date: %v", err)
  1268. continue
  1269. }
  1270. if len(indexData) > 0 {
  1271. logs.Info("PalmOilImportCostProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1272. continue
  1273. }
  1274. valueStr := row[columnIdx]
  1275. value, err := strconv.ParseFloat(valueStr, 64)
  1276. if err != nil {
  1277. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1278. }
  1279. // 创建并添加到结果列表
  1280. baseFromLyData := models.BaseFromLyData{
  1281. DataTime: format,
  1282. Value: value,
  1283. BaseFromLyIndexId: indexId,
  1284. IndexCode: indexCode,
  1285. }
  1286. result = append(result, baseFromLyData)
  1287. } else {
  1288. log.Printf("PalmOilImportCostProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, rowPort)
  1289. }
  1290. break
  1291. }
  1292. }
  1293. }
  1294. }
  1295. return result, nil
  1296. }
  1297. // ImportEstimateProcessor
  1298. // @Description: 进口预估处理器
  1299. type ImportEstimateProcessor struct{}
  1300. func (p *ImportEstimateProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1301. logs.Info("Processing import estimate...")
  1302. // 解析关键字
  1303. if len(keywords) < 4 {
  1304. return []models.BaseFromLyData{}, fmt.Errorf("ImportEstimateProcessor Process() : keywords must contain at least 4 elements")
  1305. }
  1306. // 拿到 行关键字和列关键字
  1307. var columnDates []string
  1308. rowVariety := keywords[1]
  1309. // 提取所有表格数据
  1310. tableData := getNoHeadTableData(reportContent)
  1311. // 提取日期信息
  1312. dateText, err := getDateInfo(ctx)
  1313. if err != nil {
  1314. return []models.BaseFromLyData{}, err
  1315. }
  1316. // 时间格式转换
  1317. format, err := utils.ConvertTimeFormat(dateText)
  1318. if err != nil {
  1319. return []models.BaseFromLyData{}, err
  1320. }
  1321. columnDates, err = utils.GetNextThreeMonthsNoYear(format)
  1322. if err != nil {
  1323. return nil, err
  1324. }
  1325. monthsLastDay, err := utils.GetNextThreeMonthsLastDay(format)
  1326. if err != nil {
  1327. return nil, err
  1328. }
  1329. // 处理提取的表格数据
  1330. var result []models.BaseFromLyData
  1331. for _, data := range tableData {
  1332. tableHeaders := data.Headers
  1333. tableRows := data.Rows
  1334. // 查找目标列
  1335. for _, columnDate := range columnDates {
  1336. columnIdx := -1
  1337. for i, tableHeader := range tableHeaders {
  1338. if strings.Contains(tableHeader, columnDate) {
  1339. columnIdx = i
  1340. break
  1341. }
  1342. }
  1343. if columnIdx == -1 {
  1344. log.Printf("ImportEstimateProcessor Process() : Column '%s' not found in table", columnDate)
  1345. continue
  1346. } else {
  1347. // 处理表格中的每一行
  1348. for _, row := range tableRows {
  1349. if len(row) >= len(tableHeaders) && strings.Contains(row[0], rowVariety) && isNumber(row[columnIdx]) {
  1350. if columnIdx < len(row) {
  1351. // 指标名称
  1352. indexName := strings.Join(keywords[:len(keywords)-2], `:`)
  1353. // 指标编码
  1354. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1355. // 指标id获取
  1356. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1357. if err != nil {
  1358. logs.Error("ImportEstimateProcessor Process() : Failed to get index id: %v", err)
  1359. continue
  1360. }
  1361. toNumber, err := utils.ConvertMonthToNumber(columnDate)
  1362. if err != nil {
  1363. logs.Error("ImportEstimateProcessor Process() : Failed to convert month to number: %v", err)
  1364. continue
  1365. }
  1366. slice, err := utils.GetElementInSlice(monthsLastDay, toNumber)
  1367. if err != nil {
  1368. logs.Error("ImportEstimateProcessor Process() : Failed to get element in slice: %v", err)
  1369. continue
  1370. }
  1371. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, slice)
  1372. if err != nil {
  1373. logs.Error("ImportEstimateProcessor Process() : Failed to get data by index id and date: %v", err)
  1374. continue
  1375. }
  1376. valueStr := row[columnIdx]
  1377. value, err := strconv.ParseFloat(valueStr, 64)
  1378. if err != nil {
  1379. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1380. }
  1381. if len(indexData) > 0 {
  1382. if indexData[0].Value != value {
  1383. logs.Info("ImportEstimateProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1384. lyData := indexData[0]
  1385. time, err := utils.StringToTime(lyData.ModifyTime)
  1386. if err != nil {
  1387. return nil, err
  1388. }
  1389. timeZero, err := utils.StringToTimeZero(format)
  1390. if err != nil {
  1391. return nil, err
  1392. }
  1393. if lyData.Value != value && time.Before(timeZero) {
  1394. // 更新指标数据
  1395. err := models.UpdateLyDataById(lyData.BaseFromLyDataId, value)
  1396. if err != nil {
  1397. return nil, err
  1398. }
  1399. // 同步更新指标库数据
  1400. lyEdbIndexData, err := models.GetLyEdbDataByIndexCodeAndExactDataTime(lyData.IndexCode, lyData.DataTime)
  1401. if err != nil {
  1402. return nil, err
  1403. }
  1404. if len(lyEdbIndexData) > 0 {
  1405. err := models.UpdateLyEdbDataById(lyEdbIndexData[0].EdbInfoId, value)
  1406. if err != nil {
  1407. return nil, err
  1408. }
  1409. }
  1410. }
  1411. }
  1412. continue
  1413. }
  1414. // 创建并添加到结果列表
  1415. baseFromLyData := models.BaseFromLyData{
  1416. DataTime: slice,
  1417. Value: value,
  1418. BaseFromLyIndexId: indexId,
  1419. IndexCode: indexCode,
  1420. }
  1421. result = append(result, baseFromLyData)
  1422. } else {
  1423. log.Printf("ImportEstimateProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnDate)
  1424. }
  1425. break
  1426. }
  1427. }
  1428. }
  1429. }
  1430. }
  1431. return result, nil
  1432. }
  1433. // InternationalPriceProcessor
  1434. // @Description: 国际价格处理器
  1435. type InternationalPriceProcessor struct{}
  1436. func (p *InternationalPriceProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1437. logs.Info("Processing international price...")
  1438. // 解析关键字
  1439. if len(keywords) < 4 {
  1440. return []models.BaseFromLyData{}, fmt.Errorf("InternationalPriceProcessor Process() : keywords must contain at least 4 elements")
  1441. }
  1442. // 拿到 行关键字和列关键字
  1443. columnName := keywords[1]
  1444. rowVariety := keywords[0]
  1445. indexNamePrefix := keywords[:1]
  1446. indexNameSuffix := keywords[1:]
  1447. // 提取所有表格数据
  1448. tableData := getNoHeadTableData(reportContent)
  1449. // 提取日期信息
  1450. dateText, err := getDateInfo(ctx)
  1451. if err != nil {
  1452. return []models.BaseFromLyData{}, err
  1453. }
  1454. // 时间格式转换
  1455. format, err := utils.ConvertTimeFormat(dateText)
  1456. if err != nil {
  1457. return []models.BaseFromLyData{}, err
  1458. }
  1459. // 解析日期并计算当前月份 和 后两月
  1460. yearMonths, err := utils.ConvertTimeFormatToYearMonth(format)
  1461. if err != nil {
  1462. return nil, err
  1463. }
  1464. fmt.Printf("Target yearMonth: %s\n", yearMonths)
  1465. // 处理提取的表格数据
  1466. var result []models.BaseFromLyData
  1467. for _, data := range tableData {
  1468. tableHeaders := data.Headers
  1469. tableRows := data.Rows
  1470. // 查找目标列
  1471. columnIdx := -1
  1472. for i, header := range tableHeaders {
  1473. if strings.Contains(header, columnName) {
  1474. columnIdx = i
  1475. break
  1476. }
  1477. }
  1478. if columnIdx == -1 {
  1479. log.Printf("InternationalPriceProcessor Process() : Column '%s' not found in table", columnName)
  1480. continue
  1481. }
  1482. // 处理表格中的每一行
  1483. var previousRowVariety string
  1484. for rowIndex, row := range tableRows {
  1485. if len(row) == len(tableHeaders) {
  1486. previousRowVariety = row[0]
  1487. } else if len(row) == len(tableHeaders)-1 {
  1488. row = append([]string{previousRowVariety}, row...)
  1489. tableRows[rowIndex] = row
  1490. }
  1491. for _, targetMonth := range yearMonths {
  1492. if len(row) >= len(tableHeaders) && row[0] == rowVariety && row[1] == targetMonth {
  1493. if columnIdx < len(row) {
  1494. // 指标名称
  1495. indexNameList := append(indexNamePrefix, append([]string{targetMonth}, indexNameSuffix...)...)
  1496. indexName := strings.Join(indexNameList[:len(keywords)-2], ":")
  1497. // 指标编码
  1498. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1499. // 指标id获取
  1500. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1501. if err != nil {
  1502. logs.Error("InternationalPriceProcessor Process() : Failed to get index id: %v", err)
  1503. continue
  1504. }
  1505. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1506. if err != nil {
  1507. logs.Error("InternationalPriceProcessor Process() : Failed to get data by index id and date: %v", err)
  1508. continue
  1509. }
  1510. if len(indexData) > 0 {
  1511. logs.Info("InternationalPriceProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1512. // 无需更新 指标展示本月和后两月的数据,报告每天更新,每天的值可能会改变,即使今天和每天数据一致,也需要每天记录,如果到这里也只是说,今天这个报告被读取了两次
  1513. continue
  1514. }
  1515. valueStr := row[columnIdx]
  1516. value, err := strconv.ParseFloat(valueStr, 64)
  1517. if err != nil {
  1518. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1519. }
  1520. // 创建并添加到结果列表
  1521. baseFromLyData := models.BaseFromLyData{
  1522. DataTime: format,
  1523. Value: value,
  1524. BaseFromLyIndexId: indexId,
  1525. IndexCode: indexCode,
  1526. }
  1527. result = append(result, baseFromLyData)
  1528. } else {
  1529. log.Printf("InternationalPriceProcessor Process() : Column index out of range for row '%s', '%s'", rowVariety, columnName)
  1530. }
  1531. break
  1532. }
  1533. }
  1534. }
  1535. }
  1536. return result, nil
  1537. }
// CanadaStatisticsBureauProcessor
// @Description: Processor for Canada statistics bureau reports.
type CanadaStatisticsBureauProcessor struct{}
  1541. func (p *CanadaStatisticsBureauProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1542. logs.Info("Processing Canada statistics bureau...")
  1543. // 解析关键字
  1544. if len(keywords) < 4 {
  1545. return []models.BaseFromLyData{}, fmt.Errorf("CanadaStatisticsBureauProcessor Process() : keywords must contain at least 4 elements")
  1546. }
  1547. // 拿到 行关键字和列关键字
  1548. columnDate := "本周"
  1549. rowVariety := keywords[1]
  1550. // 提取所有表格数据
  1551. tableData := getNoHeadTableData(reportContent)
  1552. // 提取日期信息
  1553. dateText, err := getDateInfo(ctx)
  1554. if err != nil {
  1555. return []models.BaseFromLyData{}, err
  1556. }
  1557. // 时间格式转换
  1558. format, err := utils.ConvertTimeFormat(dateText)
  1559. if err != nil {
  1560. return []models.BaseFromLyData{}, err
  1561. }
  1562. // 处理提取的表格数据
  1563. var result []models.BaseFromLyData
  1564. for _, data := range tableData {
  1565. tableHeaders := data.Headers
  1566. tableRows := data.Rows
  1567. // 查找目标列
  1568. columnIdx := -1
  1569. for i, header := range tableHeaders {
  1570. if strings.Contains(header, columnDate) {
  1571. columnIdx = i
  1572. break
  1573. }
  1574. }
  1575. if columnIdx == -1 {
  1576. log.Printf("CanadaStatisticsBureauProcessor Process() : Column '%s' not found in table", columnDate)
  1577. continue
  1578. }
  1579. // 处理表格中的每一行
  1580. for _, row := range tableRows {
  1581. if len(row) >= len(tableHeaders) {
  1582. if columnIdx < len(row) {
  1583. if row[0] != rowVariety {
  1584. continue
  1585. }
  1586. // 指标名称
  1587. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  1588. // 指标编码
  1589. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1590. // 指标id获取
  1591. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1592. if err != nil {
  1593. logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get index id: %v", err)
  1594. continue
  1595. }
  1596. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, format)
  1597. if err != nil {
  1598. logs.Error("CanadaStatisticsBureauProcessor Process() : Failed to get data by index id and date: %v", err)
  1599. continue
  1600. }
  1601. if len(indexData) > 0 {
  1602. logs.Info("CanadaStatisticsBureauProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1603. // 无需更新 指标展示本周的数据,报告每周更新,即使本周和上周数据一致,也需要每周记录,如果到这里也只是说,今天这个报告被读取了两次
  1604. continue
  1605. }
  1606. valueStr := row[columnIdx]
  1607. value, err := strconv.ParseFloat(valueStr, 64)
  1608. if err != nil {
  1609. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1610. }
  1611. // 创建并添加到结果列表
  1612. baseFromLyData := models.BaseFromLyData{
  1613. DataTime: format,
  1614. Value: value,
  1615. BaseFromLyIndexId: indexId,
  1616. IndexCode: indexCode,
  1617. }
  1618. result = append(result, baseFromLyData)
  1619. } else {
  1620. log.Printf("CanadaStatisticsBureauProcessor Process() : Column index out of range for row '%s'", columnDate)
  1621. }
  1622. break
  1623. }
  1624. }
  1625. }
  1626. return result, nil
  1627. }
// ImportExportAnalysisProcessor
// @Description: Processor for import/export analysis reports.
type ImportExportAnalysisProcessor struct{}
  1631. func (p *ImportExportAnalysisProcessor) Process(ctx context.Context, product string, reportContent string, keywords []string, classifyId int) ([]models.BaseFromLyData, error) {
  1632. fmt.Println("Processing processing profit...")
  1633. // 解析关键字
  1634. if len(keywords) < 3 {
  1635. return []models.BaseFromLyData{}, fmt.Errorf("ImportExportAnalysisProcessor Process() : keywords must contain at least 3 elements")
  1636. }
  1637. // 拿到 行关键字和列关键字
  1638. var columnDates []string
  1639. // 提取所有表格数据
  1640. tableData := getNoHeadTableData(reportContent)
  1641. // 提取日期信息
  1642. dateText, err := getDateInfo(ctx)
  1643. if err != nil {
  1644. return []models.BaseFromLyData{}, err
  1645. }
  1646. // 时间格式转换
  1647. format, err := utils.ConvertTimeFormat(dateText)
  1648. if err != nil {
  1649. return []models.BaseFromLyData{}, err
  1650. }
  1651. // 2025年1月可能才出2024年12月的数据,所以往前取一年
  1652. columnDates, err = utils.GetCurrentYearAndLastYear(format)
  1653. if err != nil {
  1654. return nil, err
  1655. }
  1656. // 处理提取的表格数据
  1657. var result []models.BaseFromLyData
  1658. for _, data := range tableData {
  1659. tableHeaders := data.Headers
  1660. tableRows := data.Rows
  1661. for _, columnDate := range columnDates {
  1662. // 查找目标列
  1663. columnIdx := -1
  1664. for i, header := range tableHeaders {
  1665. if strings.Contains(header, columnDate) {
  1666. columnIdx = i
  1667. break
  1668. }
  1669. }
  1670. if columnIdx == -1 {
  1671. log.Printf("ImportExportAnalysisProcessor Process() : Column '%s' not found in table", columnDate)
  1672. continue
  1673. }
  1674. // 处理表格中的每一行
  1675. for _, row := range tableRows {
  1676. if len(row) >= len(tableHeaders) {
  1677. if columnIdx < len(row) && isNumber(row[columnIdx]) && isNumber(row[0]) {
  1678. // 指标名称
  1679. indexName := strings.Join(keywords[:len(keywords)-2], ":")
  1680. // 指标编码
  1681. indexCode := utils.GenerateIndexCode(lySourceName, indexName)
  1682. // 指标id获取
  1683. indexId, err := getIndexId(indexCode, indexName, classifyId, lySourceName, keywords[len(keywords)-2], keywords[len(keywords)-1])
  1684. if err != nil {
  1685. logs.Error("ImportExportAnalysisProcessor Process() : Failed to get index id: %v", err)
  1686. continue
  1687. }
  1688. atoi, err := strconv.Atoi(row[0])
  1689. if err != nil {
  1690. return nil, err
  1691. }
  1692. date := columnDate[:4] + "-" + fmt.Sprintf("%02d", atoi)
  1693. lastDayOfMonth, err := utils.GetLastDayOfMonth(date)
  1694. if err != nil {
  1695. return nil, err
  1696. }
  1697. indexData, err := models.GetLyDataByIndexIdAndDataTime(indexId, lastDayOfMonth)
  1698. if err != nil {
  1699. logs.Error("ImportExportAnalysisProcessor Process() : Failed to get data by index id and date: %v", err)
  1700. continue
  1701. }
  1702. if len(indexData) > 0 {
  1703. logs.Info("ImportExportAnalysisProcessor Process() : Data already exists for index %d and date %s", indexId, dateText)
  1704. continue
  1705. }
  1706. valueStr := row[columnIdx]
  1707. value, err := strconv.ParseFloat(valueStr, 64)
  1708. if err != nil {
  1709. return []models.BaseFromLyData{}, fmt.Errorf("failed to parse value '%s': %v", valueStr, err)
  1710. }
  1711. // 创建并添加到结果列表
  1712. baseFromLyData := models.BaseFromLyData{
  1713. DataTime: lastDayOfMonth,
  1714. Value: value,
  1715. BaseFromLyIndexId: indexId,
  1716. IndexCode: indexCode,
  1717. }
  1718. result = append(result, baseFromLyData)
  1719. continue
  1720. } else {
  1721. log.Printf("ImportExportAnalysisProcessor Process() : Column index out of range for row '%s'", columnDate)
  1722. }
  1723. break
  1724. }
  1725. }
  1726. }
  1727. }
  1728. return result, nil
  1729. }
  1730. // ExtractValueInParentheses 从字符串中提取括号中的值
  1731. func extractValueInParentheses(input string) (string, error) {
  1732. re := regexp.MustCompile(`(([^)]+))`)
  1733. matches := re.FindStringSubmatch(input)
  1734. if len(matches) > 1 {
  1735. return matches[1], nil
  1736. }
  1737. return "", fmt.Errorf("no value found in parentheses")
  1738. }
  1739. // 获取指标id,根据指标名称判断,没有插入指标生成返回
  1740. func getIndexId(indexCode string, indexName string, classifyId int, sourceName string, frequency string, unit string) (int, error) {
  1741. if indexCode == "lysww" {
  1742. return 0, fmt.Errorf("indexCode is error")
  1743. }
  1744. // 判断指标是否存在
  1745. var indexId int
  1746. indexInfo, err := models.GetLyIndexByCode(indexCode)
  1747. if err != nil {
  1748. return indexId, err
  1749. }
  1750. if indexInfo == nil {
  1751. // 新增指标
  1752. index, err := addLyIndex(classifyId, indexCode, indexName, frequency, unit)
  1753. if err != nil {
  1754. return 0, err
  1755. }
  1756. indexId = index
  1757. } else {
  1758. indexId = indexInfo.BaseFromLyIndexId
  1759. }
  1760. return indexId, nil
  1761. }
  1762. // 获取页面时间信息
  1763. func getDateInfo(ctx context.Context) (string, error) {
  1764. var dateText string
  1765. err := chromedp.Run(ctx,
  1766. chromedp.Evaluate(`document.querySelector('div.a_date span').innerText`, &dateText),
  1767. )
  1768. if err != nil {
  1769. return "", fmt.Errorf("processing Process() : Failed to extract report date: %v", err)
  1770. }
  1771. logs.Info("Processing Process() : Report Extracted Date: %s", dateText)
  1772. return dateText, nil
  1773. }
  1774. // 获取所有表格数据 获取表格中有thead标签的数据
  1775. func getAllTableData(reportContent string) []TableData {
  1776. var tableData []TableData
  1777. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1778. if err != nil {
  1779. log.Fatal(err)
  1780. }
  1781. // 选择 id 为 "a_content" 的 div
  1782. doc.Find("#a_content").Each(func(index int, item *goquery.Selection) {
  1783. item.Find("table").Each(func(index int, table *goquery.Selection) {
  1784. var headers []string
  1785. var rows [][]string
  1786. // 提取表头
  1787. table.Find("thead th").Each(func(index int, th *goquery.Selection) {
  1788. headers = append(headers, th.Text())
  1789. })
  1790. // 提取表格行数据
  1791. table.Find("tbody tr").Each(func(index int, row *goquery.Selection) {
  1792. var rowData []string
  1793. row.Find("td").Each(func(index int, td *goquery.Selection) {
  1794. rowData = append(rowData, td.Text())
  1795. })
  1796. rows = append(rows, rowData)
  1797. })
  1798. // 仅在表头存在时添加到结果中
  1799. if len(headers) > 0 {
  1800. tableData = append(tableData, TableData{
  1801. Headers: headers,
  1802. Rows: rows,
  1803. })
  1804. }
  1805. })
  1806. })
  1807. return tableData
  1808. }
  1809. // 获取无头表格数据
  1810. func getNoHeadTableData(reportContent string) []TableData {
  1811. var tableData []TableData
  1812. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1813. if err != nil {
  1814. log.Fatal(err)
  1815. }
  1816. // Find the div with id "a_content"
  1817. doc.Find("#a_content").Each(func(index int, div *goquery.Selection) {
  1818. // Find all tables within the div
  1819. div.Find("table").Each(func(index int, table *goquery.Selection) {
  1820. var headers []string
  1821. var rows [][]string
  1822. // Extract table headers if any
  1823. table.Find("tr").Each(func(index int, tr *goquery.Selection) {
  1824. var rowData []string
  1825. tr.Find("td, th").Each(func(index int, cell *goquery.Selection) {
  1826. cellText := cell.Text()
  1827. rowData = append(rowData, cellText)
  1828. })
  1829. if index == 0 && len(rowData) > 0 {
  1830. // The first row is treated as the header row
  1831. headers = rowData
  1832. } else if len(rowData) > 0 {
  1833. // Add the row data to the rows slice
  1834. rows = append(rows, rowData)
  1835. }
  1836. })
  1837. // Only add table data if headers are present
  1838. if len(headers) > 0 {
  1839. tableData = append(tableData, TableData{
  1840. Headers: headers,
  1841. Rows: rows,
  1842. })
  1843. }
  1844. })
  1845. })
  1846. return tableData
  1847. }
  1848. // 获取表格数据 获取id 为 a_content 的 div 中的第一个表格 左上角那个单元格会拼在第一个,会拼上列上的合并单元格
  1849. func getTableData(reportContent string, isFirst bool) TableData {
  1850. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1851. if err != nil {
  1852. log.Fatal(err)
  1853. }
  1854. tableData := &TableData{}
  1855. // 只提取 id 为 a_content 的 div 中的第一个表格
  1856. var firstTable *goquery.Selection
  1857. if isFirst {
  1858. firstTable = doc.Find("div#a_content table").First()
  1859. } else {
  1860. firstTable = doc.Find("div#a_content table").Last()
  1861. }
  1862. var combinedHeaders []string
  1863. // 提取表头
  1864. firstTable.Find("tr").Each(func(i int, row *goquery.Selection) {
  1865. if i == 0 {
  1866. // 第一行处理合并单元格,保存到 combinedHeaders
  1867. row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
  1868. if j == 0 {
  1869. // 把左上角的“年度(10/9月)”放入 Headers 第一个元素
  1870. tableData.Headers = append(tableData.Headers, strings.TrimSpace(cell.Text()))
  1871. } else {
  1872. // 处理其他单元格
  1873. colspan, exists := cell.Attr("colspan")
  1874. if exists {
  1875. spanCount := 0
  1876. fmt.Sscanf(colspan, "%d", &spanCount)
  1877. for k := 0; k < spanCount; k++ {
  1878. combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text()))
  1879. }
  1880. } else {
  1881. combinedHeaders = append(combinedHeaders, strings.TrimSpace(cell.Text()))
  1882. }
  1883. }
  1884. })
  1885. } else if i == 1 {
  1886. // 第二行处理具体标题,组合后保存到 Headers
  1887. row.Find("td,th").Each(func(j int, cell *goquery.Selection) {
  1888. if j < len(combinedHeaders) {
  1889. fullHeader := combinedHeaders[j] + strings.TrimSpace(cell.Text())
  1890. tableData.Headers = append(tableData.Headers, fullHeader)
  1891. }
  1892. })
  1893. } else {
  1894. // 处理数据行
  1895. var rowData []string
  1896. row.Find("td").Each(func(j int, cell *goquery.Selection) {
  1897. rowData = append(rowData, strings.TrimSpace(cell.Text()))
  1898. })
  1899. if len(rowData) > 0 {
  1900. tableData.Rows = append(tableData.Rows, rowData)
  1901. }
  1902. }
  1903. })
  1904. return *tableData
  1905. }
  1906. // 获取采购装船表格数据
  1907. func getPurchaseShippingTableData(reportContent string) TableData {
  1908. doc, err := goquery.NewDocumentFromReader(strings.NewReader(reportContent))
  1909. if err != nil {
  1910. log.Fatal(err)
  1911. }
  1912. var tableData TableData
  1913. // 只提取 id 为 a_content 的 div 中的第一个表格
  1914. firstTable := doc.Find("div#a_content table").First()
  1915. // 提取表头
  1916. var headers []string
  1917. var subHeaders []string
  1918. firstTable.Find("thead tr").Each(func(i int, row *goquery.Selection) {
  1919. row.Find("th").Each(func(j int, cell *goquery.Selection) {
  1920. headerText := strings.TrimSpace(cell.Text())
  1921. if i == 0 {
  1922. // 处理第一行表头
  1923. colspan, exists := cell.Attr("colspan")
  1924. if exists {
  1925. spanCount := 0
  1926. fmt.Sscanf(colspan, "%d", &spanCount)
  1927. for k := 0; k < spanCount; k++ {
  1928. headers = append(headers, headerText)
  1929. }
  1930. } else {
  1931. headers = append(headers, headerText)
  1932. }
  1933. } else if i == 1 {
  1934. // 处理第二行表头
  1935. subHeaders = append(subHeaders, headerText)
  1936. }
  1937. })
  1938. })
  1939. // 合并第一行和第二行表头信息
  1940. if len(subHeaders) > 0 {
  1941. for i := 0; i < len(subHeaders); i++ {
  1942. // 从第四个单元格开始拼接
  1943. headers[3+i] = headers[3+i] + subHeaders[i]
  1944. }
  1945. }
  1946. tableData.Headers = headers
  1947. // 处理数据行
  1948. firstTable.Find("tbody tr").Each(func(i int, row *goquery.Selection) {
  1949. var rowData []string
  1950. row.Find("td").Each(func(j int, cell *goquery.Selection) {
  1951. rowData = append(rowData, strings.TrimSpace(cell.Text()))
  1952. })
  1953. if len(rowData) > 0 {
  1954. tableData.Rows = append(tableData.Rows, rowData)
  1955. }
  1956. })
  1957. return tableData
  1958. }
  1959. // 判断字符串是否是数字
  1960. func isNumeric(value string) bool {
  1961. // 正则表达式匹配整数和浮点数
  1962. re := regexp.MustCompile(`^[+-]?(\d+(\.\d*)?|\.\d+)$`)
  1963. return re.MatchString(value)
  1964. }
  1965. // 只保留汉字
  1966. func extractChinese(text string) string {
  1967. re := regexp.MustCompile(`[^\p{Han}]`) // 匹配非汉字字符
  1968. return re.ReplaceAllString(text, "")
  1969. }
  1970. // 去除括号中的内容 包含括号 ()
  1971. func removeParentheses(text string) string {
  1972. re := regexp.MustCompile(`\([^)]*\)`)
  1973. return re.ReplaceAllString(text, "")
  1974. }
  1975. // IsChinese 判断传入的是否是汉字
  1976. func IsChinese(str string) bool {
  1977. for _, r := range str {
  1978. if unicode.Is(unicode.Han, r) {
  1979. return true
  1980. }
  1981. }
  1982. return false
  1983. }
  1984. // 判断是否是数字
  1985. func isNumber(str string) bool {
  1986. _, err := strconv.ParseFloat(str, 64)
  1987. return err == nil
  1988. }