// Package base_from_ccf saves CCF report pages as HTML files, parses the
// indicator tables they contain and posts the extracted data to the EDB lib.
package base_from_ccf

import (
	"context"
	"encoding/json"
	"eta/eta_data_analysis/models"
	"eta/eta_data_analysis/utils"
	"fmt"
	"github.com/PuerkitoBio/goquery"
	"io/ioutil"
	"strconv"
	"strings"
	"time"
)

// TaskAnalysisHandlers maps a report name to the function that parses its tables.
var TaskAnalysisHandlers = map[string]func(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error){
	"原油石化早报": AnalysisOilReportEdb,
	"PTA周报":  AnalysisPTAWeekEdb,
	"MEG周报":  AnalysisMEGWeekEdb,
	"长丝周报":   AnalysisChangSiWeekEdb,
	"短纤周报":   AnalysisDuanXianWeekEdb,
	"瓶片周报":   AnalysisPingPianWeekEdb,
	"切片周报":   AnalysisQiePianWeekEdb,
	"PX周报":   AnalysisPXWeekEdb,
}

// HandleIndexData is one indicator together with its dated values.
type HandleIndexData struct {
	IndexName    string            `description:"指标名称"`
	IndexCode    string            `description:"指标编码"`
	ClassifyId   int               `description:"分类ID"`
	Unit         string            `description:"单位"`
	Sort         int               `description:"排序"`
	Frequency    string            `description:"频度"`
	TerminalCode string            `description:"终端编码"`
	DateData     map[string]string `description:"日期数据"`
}

// logReportTipf formats a message, writes it to the file log and, when echo
// is true, also prints it to stdout followed by a newline.
func logReportTipf(echo bool, format string, args ...interface{}) {
	tip := fmt.Sprintf(format, args...)
	if echo {
		fmt.Println(tip)
	}
	utils.FileLog.Info(tip)
}

// pushReportFile reads one saved report page, parses it with handler and posts
// the resulting indexes to the EDB lib. Every failure is logged (and echoed to
// stdout when echo is true) and the file is skipped; nothing is returned so a
// bad file never aborts the surrounding batch.
func pushReportFile(filePath string, fetchRule *DataRule, handler func(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error), echo bool) {
	htm, e := ioutil.ReadFile(filePath)
	if e != nil {
		logReportTipf(echo, "file: %s, ReadFile err: %v", filePath, e)
		return
	}
	indexes, e := handler(htm, fetchRule)
	if e != nil {
		logReportTipf(echo, "file: %s, handler err: %v", filePath, e)
		return
	}

	// Write to the database through the EDB lib.
	params := make(map[string]interface{})
	params["List"] = indexes
	params["TerminalCode"] = utils.TerminalCode
	result, e := postEdbLib(params, utils.LIB_ROUTE_CCF_EDB_HANDLE)
	if e != nil {
		// Marshal is only used to enrich the log line; its error is irrelevant here.
		b, _ := json.Marshal(params)
		logReportTipf(echo, "file: %s, postEdbLib err: %v, params: %s", filePath, e, string(b))
		return
	}
	resp := new(models.BaseEdbLibResponse)
	if e = json.Unmarshal(result, resp); e != nil {
		logReportTipf(echo, "file: %s, json.Unmarshal err: %v", filePath, e)
		return
	}
	if resp.Ret != 200 {
		logReportTipf(echo, "file: %s, Msg: %s, ErrMsg: %s", filePath, resp.Msg, resp.ErrMsg)
	}
}

// TaskOilDailyEdb fetches the latest "原油石化早报" reports and pushes their indicators.
// Only failures of loadDataRule and savePageHtml are returned; per-file
// failures are logged and skipped.
func TaskOilDailyEdb(context.Context) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("TaskOilEdbDaily ErrMsg: %s", err.Error())
			utils.FileLog.Info(tips)
			fmt.Println(tips)
		}
	}()
	nameKey := "原油石化早报"
	fetchRule, e := loadDataRule(nameKey)
	if e != nil {
		err = fmt.Errorf("loadDataRule, err: %v", e)
		return
	}

	// Only the first readLimit reports are parsed.
	readLimit := utils.CCFDailyFetchNum
	filePaths, e := savePageHtml(nameKey, fetchRule.PageDir, false, readLimit)
	if e != nil {
		err = fmt.Errorf("savePageHtml, err: %v", e)
		return
	}
	handler, ok := TaskAnalysisHandlers[nameKey]
	if !ok {
		utils.FileLog.Info(fmt.Sprintf("%s无解析函数\n", nameKey))
		return
	}
	for i, v := range filePaths {
		if i >= readLimit {
			break
		}
		pushReportFile(v, fetchRule, handler, true)
	}
	return
}

// TaskWeeklyEdb fetches the latest weekly reports of every configured product
// and pushes their indicators. All failures are logged and skipped, so the
// returned error is always nil.
func TaskWeeklyEdb(context.Context) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("TaskWeeklyEdb ErrMsg: %s", err.Error())
			utils.FileLog.Info(tips)
			fmt.Println(tips)
		}
	}()
	taskNames := []string{"PTA周报", "MEG周报", "长丝周报", "短纤周报", "瓶片周报", "切片周报", "PX周报"}
	readLimit := utils.CCFWeeklyFetchNum
	for _, nameKey := range taskNames {
		fmt.Printf("开始获取: %s\n", nameKey)
		fetchRule, e := loadDataRule(nameKey)
		if e != nil {
			utils.FileLog.Info(fmt.Sprintf("%s无解析规则, err: %v\n", nameKey, e))
			continue
		}
		handler, ok := TaskAnalysisHandlers[nameKey]
		if !ok {
			utils.FileLog.Info(fmt.Sprintf("%s无解析函数\n", nameKey))
			continue
		}

		// Only the first readLimit reports are parsed.
		files, e := savePageHtml(nameKey, fetchRule.PageDir, false, readLimit)
		if e != nil {
			utils.FileLog.Info(fmt.Sprintf("%s保存首页失败, err: %v\n", nameKey, e))
			continue
		}
		for i, v := range files {
			if i >= readLimit {
				break
			}
			pushReportFile(v, fetchRule, handler, false)
		}
		fmt.Printf("结束获取: %s\n", nameKey)
	}
	return
}

// openReportDoc parses htm into a goquery document. It returns (nil, nil)
// after logging "htm empty" when htm is empty or fetchRule is nil, so callers
// must check the document for nil as well as the error.
func openReportDoc(htm []byte, fetchRule *DataRule) (*goquery.Document, error) {
	if len(htm) == 0 || fetchRule == nil {
		utils.FileLog.Info("htm empty")
		return nil, nil
	}
	doc, e := goquery.NewDocumentFromReader(strings.NewReader(string(htm)))
	if e != nil {
		return nil, fmt.Errorf("NewDocumentFromReader err: %v", e)
	}
	return doc, nil
}

// openWeeklyReport parses htm and extracts the report publish time. The
// publish time text is taken from the first cell of the table that encloses
// the "#savenews" (bookmark) element. A nil document with a nil error means
// the input was empty (see openReportDoc).
func openWeeklyReport(htm []byte, fetchRule *DataRule) (*goquery.Document, time.Time, error) {
	var publishTime time.Time
	doc, err := openReportDoc(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, publishTime, err
	}
	publishTxt := doc.Find("#savenews").ParentsFiltered("table").First().Find("td:first-child").Text()
	publishTime, e := extractReportPublishTime(publishTxt)
	if e != nil {
		return nil, publishTime, fmt.Errorf("extractReportPublishTime err: %v", e)
	}
	return doc, publishTime, nil
}

// tableAfterHeading returns the first table following the last h2 whose text
// contains title, or an empty selection when no such h2 exists.
func tableAfterHeading(doc *goquery.Document, title string) *goquery.Selection {
	table := new(goquery.Selection)
	doc.Find("h2").Each(func(_ int, h2 *goquery.Selection) {
		if strings.Contains(h2.Text(), title) {
			table = h2.NextAllFiltered("table").First()
		}
	})
	return table
}

// tableAfterKeyword returns the first table following the tag elements whose
// text contains keyword; it fails when either the element or the table is missing.
func tableAfterKeyword(doc *goquery.Document, tag, keyword string) (*goquery.Selection, error) {
	anchor := doc.Find(fmt.Sprintf("%s:contains('%s')", tag, keyword))
	if anchor.Length() <= 0 {
		return nil, fmt.Errorf("未找到关键词标签, keyword: %s", keyword)
	}
	table := anchor.NextAllFiltered("table").First()
	if table.Length() <= 0 {
		return nil, fmt.Errorf("未找到关键词标签后的table, keyword: %s", keyword)
	}
	return table, nil
}

// adjustCrossYear moves colTime back one year when it lies more than six
// months after publishTime. Column dates carry no year and are parsed with the
// publish year, so a report published in January that lists December dates
// would otherwise be dated almost a year in the future.
func adjustCrossYear(colTime, publishTime time.Time, label string) time.Time {
	if colTime.AddDate(0, -6, 0).After(publishTime) {
		utils.FileLog.Info(fmt.Sprintf("跨年判断-%s: ColTime-%v; PublishTime-%v", label, colTime, publishTime))
		return colTime.AddDate(-1, 0, 0)
	}
	return colTime
}

// rowspanTracker remembers the product cell that spans several table rows so
// that the following rows, which lack that cell, can inherit the product.
type rowspanTracker struct {
	begin   int    // index of the row holding the rowspan cell
	rows    int    // rowspan value of that cell
	product string // product name read from that cell
}

// firstCell interprets the first td of data row rowIdx and fills row.
// It returns true when the row lies inside a rowspan: the first cell is then
// the market and every following cell sits one column left of the header.
func (m *rowspanTracker) firstCell(rowIdx int, cell *goquery.Selection, txt string, row *TableRow) bool {
	if span, _ := cell.Attr("rowspan"); span != "" {
		// A rowspan cell opens a new merged group; a malformed value counts as 0.
		m.rows, _ = strconv.Atoi(span)
		m.begin = rowIdx
		m.product = txt
		row.Product = txt
		return false
	}
	// First row after a merged group: reset the tracker.
	if rowIdx >= m.begin+m.rows {
		m.begin, m.rows = 0, 0
	}
	if m.begin > 0 && m.rows > 0 && rowIdx < m.begin+m.rows {
		row.Product = m.product
		row.Market = txt
		return true
	}
	if m.begin == 0 && m.rows == 0 {
		row.Product = txt
	}
	return false
}

// setMergedTableCell stores the text of cell col (col >= 1) into row. In a
// regular row column 1 is the market and other columns map directly onto the
// header date columns; in a merged row every cell is shifted one column left,
// so its date is looked up at col+1. Columns without a header date are ignored.
func setMergedTableCell(row *TableRow, colDate map[int]string, col int, txt string, merged bool) {
	if col == 1 && !merged {
		row.Market = txt
		return
	}
	dateCol := col
	if merged {
		dateCol++
	}
	if d, ok := colDate[dateCol]; ok {
		row.DateData[d] = txt
	}
}

// AnalysisOilReportEdb parses the indicator table of the "原油石化早报" report.
// It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisOilReportEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, err := openReportDoc(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}

	// Locate the table body.
	tableBody := doc.Find("#newscontent").ChildrenFiltered("table").First().ChildrenFiltered("tbody")
	if tableBody.Length() <= 0 {
		return nil, fmt.Errorf("表格未找到")
	}

	colDates := make(map[int]string)
	colLen := tableBody.Children().First().Find("td").Length()
	attemptDates := []string{"2006/1/2", "2006/01/02", "2006-01-02", "2006-1-2", "2006.01.02", "2006.1.2"}

	var rows []TableRow
	var span rowspanTracker
	tableBody.Children().Each(func(i int, tr *goquery.Selection) {
		cells := tr.Find("td")

		// Header layout: [product | market | date columns (variable count) | change | unit].
		if i == 0 {
			cells.Each(func(col int, cell *goquery.Selection) {
				if col <= 1 || col >= colLen-2 {
					return
				}
				txt := strings.TrimSpace(cell.Text())
				for _, layout := range attemptDates {
					t, e := time.ParseInLocation(layout, txt, time.Local)
					if e != nil {
						continue
					}
					if !t.IsZero() {
						colDates[col] = t.Format(utils.FormatDate)
					}
					break
				}
			})
			return
		}

		// Data rows.
		row := TableRow{
			DateData: make(map[string]string),
		}
		merged := false
		cellsLen := cells.Length()
		setDate := func(col int, val string) {
			if d, ok := colDates[col]; ok {
				row.DateData[d] = formatIntervalData(val, "")
			}
		}
		cells.Each(func(col int, cell *goquery.Selection) {
			txt := filterInvalidVal(cell.Text())
			if txt == "" {
				return
			}
			switch col {
			case 0:
				merged = span.firstCell(i, cell, txt, &row)
			case 1:
				// Merged rows hold a date value here, regular rows the market.
				if merged {
					setDate(col+1, txt)
				} else {
					row.Market = txt
				}
			case cellsLen - 2:
				// The change column is ignored.
			case cellsLen - 1:
				row.Unit = txt
			default:
				if merged {
					setDate(col+1, txt)
				} else {
					setDate(col, txt)
				}
			}
		})
		rows = append(rows, row)
	})

	// Keep only the indicators that are configured in the fetch rule.
	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisPTAWeekEdb parses the load table and the PTA stock table of the PTA
// weekly report. It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisPTAWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	burdenTable := tableAfterHeading(doc, "负荷")
	ptaTable := tableAfterHeading(doc, "PTA库存")

	// Load table.
	var rows []TableRow
	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = burdenTable
	analysisPars.MarketCol.HasCol = true
	analysisPars.MarketCol.ColIndex = 1
	analysisPars.DateCol.StartIndex = 2
	analysisPars.DateCol.EndIndex = 3
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = "%d年%s"
	analysisPars.DateCol.TimeFormat = []string{"2006年01月02日", "2006年1月2日"}
	rows = append(rows, analysisNoneMergeTable(analysisPars)...)

	// PTA stock table; it appears in two layouts.
	ptaRows := make(map[int]TableRow)
	var ptaCols []int // header columns in document order, for deterministic output
	addColumn := func(col int, cell *goquery.Selection) {
		ptaRows[col] = TableRow{
			Product:  strings.TrimSpace(cell.Text()),
			DateData: make(map[string]string),
		}
		ptaCols = append(ptaCols, col)
	}
	setValue := func(col int, date, val string) {
		// A data cell without a matching header column has no row to write to;
		// indexing the map would yield a zero TableRow whose nil DateData panics.
		row, ok := ptaRows[col]
		if !ok {
			return
		}
		row.DateData[date] = val
	}
	ptaTable.Find("tbody").Children().Each(func(i int, tr *goquery.Selection) {
		cells := tr.Find("td")
		cellLen := cells.Length()

		// Two cells per row: the data date is the publish date.
		if cellLen == 2 {
			if i == 0 {
				cells.Each(func(col int, cell *goquery.Selection) {
					addColumn(col, cell)
				})
				return
			}
			cells.Each(func(col int, cell *goquery.Selection) {
				txt := filterInvalidVal(cell.Text())
				if txt == "" {
					return
				}
				val, e := calculateDataHalfVal(txt)
				if e != nil {
					utils.FileLog.Info(fmt.Sprintf("PTA周报-calculateDataHalfVal: cellTxt-%s, err: %v", txt, e))
					return
				}
				setValue(col, publishTime.Format(utils.FormatDate), val)
			})
			return
		}

		// More than two cells per row: the first column holds the date.
		if cellLen > 2 {
			if i == 0 {
				cells.Each(func(col int, cell *goquery.Selection) {
					if col == 0 {
						return
					}
					addColumn(col, cell)
				})
				return
			}
			var dataTime string
			cells.Each(func(col int, cell *goquery.Selection) {
				txt := filterInvalidVal(cell.Text())
				if txt == "" {
					return
				}
				if col == 0 {
					strTime := fmt.Sprintf("%d/%s", publishTime.Year(), txt)
					t, e := time.ParseInLocation("2006/1/2", strTime, time.Local)
					if e != nil {
						fmt.Printf("time parse err: %v\n", e)
						return
					}
					dataTime = adjustCrossYear(t, publishTime, "2").Format(utils.FormatDate)
					return
				}
				val, e := calculateDataHalfVal(txt)
				if e != nil {
					fmt.Printf("calculateDataHalfVal err: %v\n", e)
					return
				}
				if dataTime != "" && val != "" {
					setValue(col, dataTime, val)
				}
			})
		}
	})
	for _, col := range ptaCols {
		rows = append(rows, ptaRows[col])
	}

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisMEGWeekEdb parses the CCF index table (which contains merged rows)
// and the East China port stock table of the MEG weekly report.
// It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisMEGWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	burdenTable := tableAfterHeading(doc, "CCF指数")
	stockTable := tableAfterHeading(doc, "MEG华东港口库存情况")

	// Index table with merged rows.
	var rows []TableRow
	var span rowspanTracker
	burdenColDate := make(map[int]string) // date column index -> date
	burdenTable.Find("tbody").Children().Each(func(i int, tr *goquery.Selection) {
		cells := tr.Find("td")

		// Header row: columns 2 and 3 hold the dates (without year).
		if i == 0 {
			cells.Each(func(col int, cell *goquery.Selection) {
				txt := strings.TrimSpace(cell.Text())
				if txt == "" || (col != 2 && col != 3) {
					return
				}
				strTime := fmt.Sprintf("%d年%s", publishTime.Year(), txt)
				t, e := time.ParseInLocation("2006年1月2日", strTime, time.Local)
				if e != nil {
					utils.FileLog.Info(fmt.Sprintf("MEG周报-日期解析: cellTxt-%s, err: %v", txt, e))
					return
				}
				t = adjustCrossYear(t, publishTime, "MEG")
				if !t.IsZero() {
					burdenColDate[col] = t.Format(utils.FormatDate)
				}
			})
			return
		}

		// Data rows.
		row := TableRow{
			DateData: make(map[string]string),
		}
		merged := false
		cells.Each(func(col int, cell *goquery.Selection) {
			txt := filterInvalidVal(cell.Text())
			if txt == "" {
				return
			}
			if col == 0 {
				merged = span.firstCell(i, cell, txt, &row)
				return
			}
			setMergedTableCell(&row, burdenColDate, col, txt, merged)
		})
		rows = append(rows, row)
	})

	// Stock table.
	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = stockTable
	analysisPars.DateCol.StartIndex = 1
	analysisPars.DateCol.EndIndex = 2
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = ""
	analysisPars.DateCol.TimeFormat = []string{"2006/1/2"}
	rows = append(rows, analysisNoneMergeTable(analysisPars)...)

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// parseChangSiStockTable parses the stock index table of the filament weekly
// report. The table contains merged rows and its dates sit in columns 2 to 4.
// Unlike the other merged-row parsers, empty cell values are stored as-is.
func parseChangSiStockTable(docTable *goquery.Selection, publishTime time.Time) (items []TableRow) {
	var span rowspanTracker
	colDate := make(map[int]string) // date column index -> date
	attemptDates := []string{"2006.01.02", "2006.1.02", "2006.01.2"}
	docTable.Find("tbody").Children().Each(func(i int, tr *goquery.Selection) {
		cells := tr.Find("td")

		// Header row: extract the dates (without year).
		if i == 0 {
			cells.Each(func(col int, cell *goquery.Selection) {
				if col < 2 || col > 4 {
					return
				}
				strTime := fmt.Sprintf("%d.%s", publishTime.Year(), strings.TrimSpace(cell.Text()))
				var colTime time.Time
				for _, layout := range attemptDates {
					t, e := time.ParseInLocation(layout, strTime, time.Local)
					if e != nil {
						continue
					}
					colTime = t
					break
				}
				// No layout matched: the column has no usable date.
				if colTime.IsZero() {
					return
				}
				colDate[col] = adjustCrossYear(colTime, publishTime, "长丝").Format(utils.FormatDate)
			})
			return
		}

		// Data rows.
		row := TableRow{
			DateData: make(map[string]string),
		}
		merged := false
		cells.Each(func(col int, cell *goquery.Selection) {
			txt := filterInvalidVal(cell.Text())
			if col == 0 {
				merged = span.firstCell(i, cell, txt, &row)
				return
			}
			setMergedTableCell(&row, colDate, col, txt, merged)
		})
		items = append(items, row)
	})
	return
}

// AnalysisChangSiWeekEdb parses the load index, downstream observation and
// stock index tables of the filament weekly report.
// It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisChangSiWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	burdenTable := tableAfterHeading(doc, "负荷指数")
	stockTable := tableAfterHeading(doc, "库存指数")
	observeTable := tableAfterHeading(doc, "下游观察")

	// The load and downstream observation tables share one layout.
	plainRows := func(table *goquery.Selection) []TableRow {
		var analysisPars AnalysisNoneMergeTablePars
		analysisPars.DocTable = table
		analysisPars.DateCol.StartIndex = 1
		analysisPars.DateCol.EndIndex = 3
		analysisPars.DateCol.PublishTime = publishTime
		analysisPars.DateCol.StrTimeFormat = "%d.%s"
		analysisPars.DateCol.TimeFormat = []string{"2006.01.02"}
		return analysisNoneMergeTable(analysisPars)
	}

	var rows []TableRow
	// Load index.
	if burdenTable.Length() > 0 {
		rows = append(rows, plainRows(burdenTable)...)
	}
	// Downstream observation.
	if observeTable.Length() > 0 {
		rows = append(rows, plainRows(observeTable)...)
	}
	// Stock index.
	if stockTable.Length() > 0 {
		rows = append(rows, parseChangSiStockTable(stockTable, publishTime)...)
	}

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisDuanXianWeekEdb parses the main operating index table of the staple
// fiber weekly report. It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisDuanXianWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	// The table follows the p element that contains the keyword.
	table, err := tableAfterKeyword(doc, "p", "主要运行指数")
	if err != nil {
		return nil, err
	}

	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = table
	analysisPars.DateCol.StartIndex = 1
	analysisPars.DateCol.EndIndex = 2
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = "%d年%s"
	analysisPars.DateCol.TimeFormat = []string{"2006年1月2日"}
	rows := analysisNoneMergeTable(analysisPars)

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisPingPianWeekEdb parses the weekly average load index table of the
// bottle chip weekly report. It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisPingPianWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	// The table follows the h2 element that contains the keyword.
	table, err := tableAfterKeyword(doc, "h2", "周均负荷指数")
	if err != nil {
		return nil, err
	}

	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = table
	analysisPars.DateCol.StartIndex = 1
	analysisPars.DateCol.EndIndex = 3
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = "%d.%s"
	analysisPars.DateCol.TimeFormat = []string{"2006.1.2"}
	analysisPars.DateCol.SplitLast = true
	analysisPars.DateCol.SplitFlag = "-"
	rows := analysisNoneMergeTable(analysisPars)

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisQiePianWeekEdb parses the chip spinning table of the chip weekly
// report. It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisQiePianWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	// The table follows the h2 element that contains the keyword.
	table, err := tableAfterKeyword(doc, "h2", "切片纺方面")
	if err != nil {
		return nil, err
	}

	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = table
	analysisPars.DateCol.StartIndex = 1
	analysisPars.DateCol.EndIndex = 3
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = ""
	analysisPars.DateCol.TimeFormat = []string{"2006-1-2", "2006/1/2"}
	analysisPars.ValCol.SplitHalfVal = true
	rows := analysisNoneMergeTable(analysisPars)

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// AnalysisPXWeekEdb parses the load index table of the PX weekly report.
// It returns (nil, nil) when htm is empty or fetchRule is nil.
func AnalysisPXWeekEdb(htm []byte, fetchRule *DataRule) (indexes []*HandleIndexData, err error) {
	doc, publishTime, err := openWeeklyReport(htm, fetchRule)
	if err != nil || doc == nil {
		return nil, err
	}
	// The table follows the h2 element that contains the keyword.
	table, err := tableAfterKeyword(doc, "h2", "负荷指数")
	if err != nil {
		return nil, err
	}

	var analysisPars AnalysisNoneMergeTablePars
	analysisPars.DocTable = table
	analysisPars.DateCol.StartIndex = 1
	analysisPars.DateCol.EndIndex = 3
	analysisPars.DateCol.PublishTime = publishTime
	analysisPars.DateCol.StrTimeFormat = "%d年%s"
	analysisPars.DateCol.TimeFormat = []string{"2006年1月2日"}
	rows := analysisNoneMergeTable(analysisPars)

	indexes = formatTableRow2ValidEdb(rows, fetchRule.EdbMatch)
	return
}

// FetchHistoryFiles saves the historical pages of every report type. It stops
// at the first failure, which is logged and printed by the deferred handler.
func FetchHistoryFiles(context.Context) {
	var err error
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("FetchEdbHistoryFiles ErrMsg: %s", err.Error())
			utils.FileLog.Info(tips)
			fmt.Println(tips)
		}
	}()
	taskNames := []string{"原油石化早报", "PTA周报", "MEG周报", "长丝周报", "短纤周报", "瓶片周报", "切片周报", "PX周报", "PTA装置", "MEG装置", "PX装置"}
	for _, nameKey := range taskNames {
		fmt.Println("开始获取: ", nameKey)
		fetchRule, e := loadDataRule(nameKey)
		if e != nil {
			err = fmt.Errorf("loadDataRule, err: %v", e)
			return
		}
		if _, e = savePageHtml(nameKey, fetchRule.PageDir, true, 0); e != nil {
			err = fmt.Errorf("savePageHtml, err: %v", e)
			return
		}
		fmt.Println("结束获取: ", nameKey)
	}
}

// ReadEdbHistoryFiles parses every saved page of the indicator reports and
// pushes the extracted indicators. Failures are logged and skipped.
func ReadEdbHistoryFiles(context.Context) {
	taskNames := []string{"原油石化早报", "PTA周报", "MEG周报", "长丝周报", "短纤周报", "瓶片周报", "切片周报", "PX周报"}
	for _, nameKey := range taskNames {
		fetchRule, e := loadDataRule(nameKey)
		if e != nil {
			utils.FileLog.Info(fmt.Sprintf("%s无解析规则, err: %v\n", nameKey, e))
			continue
		}
		filePaths, e := listFiles(fetchRule.PageDir)
		if e != nil {
			utils.FileLog.Info(fmt.Sprintf("%s读取文件目录失败, err: %v\n", nameKey, e))
			continue
		}
		handler, ok := TaskAnalysisHandlers[nameKey]
		if !ok {
			utils.FileLog.Info(fmt.Sprintf("%s无解析函数\n", nameKey))
			continue
		}
		for _, name := range filePaths {
			filePath := fmt.Sprintf("%s/%s", fetchRule.PageDir, name)
			fmt.Printf("开始解析: %s\n", filePath)
			pushReportFile(filePath, fetchRule, handler, true)
		}
	}
}