|
- package services
- import (
- "bytes"
- "context"
- "encoding/json"
- "eta/eta_crawler/services/alarm_msg"
- "eta/eta_crawler/utils"
- "fmt"
- "github.com/PuerkitoBio/goquery"
- "github.com/xuri/excelize/v2"
- "io"
- "mime/multipart"
- "net/http"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "time"
- )
- type UsdaPsdDataQueryParams struct {
- QueryID int `json:"queryId"`
- CommodityGroupCode string `json:"commodityGroupCode"`
- Commodities []string `json:"commodities"`
- Attributes []int `json:"attributes"`
- Countries []string `json:"countries"`
- MarketYears []int `json:"marketYears"`
- ChkCommoditySummary bool `json:"chkCommoditySummary"`
- ChkAttribSummary bool `json:"chkAttribSummary"`
- ChkCountrySummary bool `json:"chkCountrySummary"`
- CommoditySummaryText string `json:"commoditySummaryText"`
- AttribSummaryText string `json:"attribSummaryText"`
- CountrySummaryText string `json:"countrySummaryText"`
- OptionColumn string `json:"optionColumn"`
- ChkTopCountry bool `json:"chkTopCountry"`
- TopCountryCount string `json:"topCountryCount"`
- ChkFileFormat bool `json:"chkfileFormat"`
- ChkPrevMonth bool `json:"chkPrevMonth"`
- ChkMonthChange bool `json:"chkMonthChange"`
- ChkCodes bool `json:"chkCodes"`
- ChkYearChange bool `json:"chkYearChange"`
- QueryName string `json:"queryName"`
- SortOrder string `json:"sortOrder"`
- TopCountryState bool `json:"topCountryState"`
- }
- type UsdaPsdData struct {
- TableHeaders []string `json:"tableHeaders"`
- QueryResult []map[string]interface{} `json:"queryResult"`
- }
- type UsdaPsdDataAttribute struct {
- AttributeId int `json:"attributeId"`
- }
- type UsdaFasIndex struct {
- ClassifyName string `description:"指标目录"`
- ParentClassifyName string `description:"父级指标目录"`
- ClassifySort int `description:"指标目录排序号"`
- IndexName string `description:"指标名称"`
- IndexCode string `description:"指标编码"`
- Unit string `description:"单位"`
- Sort int `description:"排序号"`
- Frequency string `description:"频度"`
- TerminalCode string `description:"编码"`
- Country string `description:"国家"`
- Commodity string `description:"属性"`
- ExcelDataMap map[string]string
- }
- func DownloadUsdaPsdDataTask(cont context.Context) (err error) {
-
-
-
- years := []int{time.Now().Year() + 1, time.Now().Year()}
- var commodities []string
- commodities = append(commodities, "0813800", "0813200", "0813600", "0813100", "0813500", "4242000", "4233000", "4235000", "4243000", "4244000", "4234000", "4239100", "4232000", "4236000", "2223000", "2232000", "2221000", "2226000", "2222000", "2224000")
- for _, commodity := range commodities {
- err = DownloadUsdaPsdData(commodity, years)
- if err != nil {
- utils.FileLog.Info("DownloadUsdaPsdData " + commodity + "ErrMsg:" + err.Error())
- }
-
- utils.FileLog.Info("DownloadUsdaPsdData " + commodity + " 爬取成功")
- }
- return
- }
- func DownloadUsdaFmsDataTask(cont context.Context) (err error) {
-
- startDate := time.Now().AddDate(0, -1, 0).Format("01/02/2006")
- endDate := time.Now().Format("01/02/2006")
- err = DownloadUsdaFmsData(startDate, endDate)
- return
- }
- func DownloadUsdaPsdData(commodityCode string, years []int) (err error) {
- defer func() {
- if err != nil {
- msg := "失败提醒" + "downloadUsdaPsdData ErrMsg:" + err.Error()
- fmt.Println("msg:", msg)
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
-
- attributeUrl := "https://apps.fas.usda.gov/PSDOnlineApi/api/query/GetMultiCommodityAttributes?"
- dataUrl := "https://apps.fas.usda.gov/PSDOnlineApi/api/query/RunQuery"
- var commodities []string
-
- commodities = append(commodities, commodityCode)
- commodityCodes := strings.Join(commodities, ",")
- attributeUrl = attributeUrl + "commodityCodes=" + commodityCodes
- fmt.Println("attributeUrl", attributeUrl)
-
-
- attributeBody, e := utils.HttpGetNoCookie(attributeUrl)
- if e != nil {
- err = e
- return
- }
- attrList := make([]UsdaPsdDataAttribute, 0)
- err = json.Unmarshal(attributeBody, &attrList)
- if err != nil {
- fmt.Println("json.Unmarshal err:" + err.Error())
- return
- }
-
- var attributes []int
- for _, v := range attrList {
-
- attributes = append(attributes, v.AttributeId)
- }
-
-
- var countries []string
- countries = append(countries, "R00", "ALL")
- var marketYears []int
-
- marketYears = append(marketYears, years...)
-
- var req UsdaPsdDataQueryParams
- req.Commodities = commodities
- req.Attributes = attributes
- req.Countries = countries
- req.MarketYears = marketYears
- req.OptionColumn = "year"
- req.ChkPrevMonth = true
- req.ChkYearChange = true
- req.SortOrder = "Commodity/Country/Attribute"
-
- reqBody, _ := json.Marshal(req)
-
- fmt.Println("reqBody", string(reqBody))
- headerParams := make(map[string]string)
-
-
- headerParams["Content-Type"] = "application/json"
- body, e := utils.HttpPostNoCookie(dataUrl, string(reqBody), headerParams)
- if e != nil {
- err = e
- return
- }
- item := new(UsdaPsdData)
- err = json.Unmarshal(body, &item)
- if err != nil {
- fmt.Println("json.Unmarshal err:" + err.Error())
- return
- }
-
- done := make(chan error)
- go func() {
- done <- handleUsdaFasPsd(item)
- }()
-
- select {
- case err = <-done:
- if err != nil {
- err = fmt.Errorf("handleUsdaFasPsd, Err:%w", err)
- return
- }
-
- }
- utils.FileLog.Info("月度供需 " + commodityCode + "结束")
- return
- }
- func DownloadUsdaFmsData(startDate, endDate string) (err error) {
-
- defer func() {
- if err != nil {
- msg := "失败提醒" + "DownloadUsdaFmsData ErrMsg:" + err.Error()
- fmt.Println("msg:", msg)
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- downloadFile := fmt.Sprintf("./static/usda_fms_excel_%s.xls", time.Now().Format(utils.FormatDateTimeUnSpace))
-
- dataUrl := "https://apps.fas.usda.gov/esrquery/esrq.aspx"
- body1, err := utils.HttpGetNoCookie(dataUrl)
- if err != nil {
- return
- }
- htmlString := string(body1)
-
-
- doc, err := goquery.NewDocumentFromReader(strings.NewReader(htmlString))
- if err != nil {
- return
- }
- stateValue := doc.Find("input#__VIEWSTATE").AttrOr("value", "")
- stateEneratorValue := doc.Find("input#__VIEWSTATEGENERATOR").AttrOr("value", "")
-
- validValue := doc.Find("input#__EVENTVALIDATION").AttrOr("value", "")
- var body bytes.Buffer
- multipartWriter := multipart.NewWriter(&body)
-
- if err = multipartWriter.WriteField("__EVENTTARGET", ""); err != nil {
- err = fmt.Errorf("set __EVENTTARGET, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("__EVENTARGUMENT", ""); err != nil {
- err = fmt.Errorf("set __EVENTARGUMENT, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("__LASTFOCUS", ""); err != nil {
- err = fmt.Errorf("set __LASTFOCUS, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("__VIEWSTATE", stateValue); err != nil {
- err = fmt.Errorf("set __VIEWSTATE, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("__VIEWSTATEGENERATOR", stateEneratorValue); err != nil {
- err = fmt.Errorf("set __VIEWSTATEGENERATOR, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("__EVENTVALIDATION", validValue); err != nil {
- err = fmt.Errorf("set __EVENTVALIDATION, Err:%s", err)
- return
- }
-
-
- CommodityIds := []string{"801", "901", "902"}
- for _, v := range CommodityIds {
- if err = multipartWriter.WriteField("ctl00$MainContent$lbCommodity", v); err != nil {
- err = fmt.Errorf("set ctl00$MainContent$lbCommodity, Err:%s", err)
- return
- }
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$lbCountry", "0:0"); err != nil {
- err = fmt.Errorf("set ctl00$MainContent$lbCountry, Err:%s", err)
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$ddlReportFormat", "10"); err != nil {
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$rblOutputType", "2"); err != nil {
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$tbStartDate", startDate); err != nil {
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$tbEndDate", endDate); err != nil {
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$rblColumnSelection", "regular"); err != nil {
- return
- }
- if err = multipartWriter.WriteField("ctl00$MainContent$btnSubmit", "Submit"); err != nil {
- return
- }
-
-
- if err = multipartWriter.Close(); err != nil {
- err = fmt.Errorf("close multipart writer, Err:%s", err)
- return
- }
-
- req, err := http.NewRequest("POST", dataUrl, &body)
- if err != nil {
- err = fmt.Errorf("create request, Err:%s", err)
- return
- }
-
- req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
-
- client := &http.Client{}
- resp, err := client.Do(req)
- if err != nil {
- err = fmt.Errorf("send request, Err:%s", err)
- return
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- err = fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- return
- }
-
- out, err := os.Create(downloadFile)
- if err != nil {
- return
- }
-
- _, err = io.Copy(out, resp.Body)
- if err != nil {
- return
- }
-
- err = out.Close()
- if err != nil {
- err = fmt.Errorf("Failed to close temporary file: %v", err)
- return
- }
-
- downloadFileXlsx := downloadFile + "x"
- err = ConvertXlsToXlsx(downloadFile, downloadFileXlsx)
- if err != nil {
- err = fmt.Errorf("文件格式转换失败 convert excel, Err:%w", err)
- return
- }
-
- done := make(chan error)
- go func() {
- done <- ParseUsdaFmsExcel(downloadFileXlsx)
- }()
-
- select {
- case err = <-done:
- if err != nil {
- err = fmt.Errorf("parse excel, Err:%w", err)
- return
- }
-
- }
-
- defer func() {
- os.Remove(downloadFile)
- }()
- fmt.Println("Excel file downloaded successfully")
- return
- }
- func ParseUsdaFmsExcel(path string) (err error) {
- defer func() {
- if err != nil {
- msg := "失败提醒" + "DownloadUsdaFmsData_ParseUsdaFmsExcel ErrMsg:" + err.Error()
- fmt.Println("msg:", msg)
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
-
- exist, err := PathExists(path)
- if err != nil {
- fmt.Println(err)
- err = fmt.Errorf("文件地址不存在 err:%s", err.Error())
- return
- }
- if !exist {
- err = fmt.Errorf("文件地址不存在")
- return
- }
-
- xlFile, err := excelize.OpenFile(path)
- if err != nil {
- fmt.Println("OpenFile err:", err)
- err = fmt.Errorf("打开文件失败 err:%s", err.Error())
- return
- }
- defer func() {
-
- if err = xlFile.Close(); err != nil {
- fmt.Println(err)
- }
- os.Remove(path)
- }()
- sheetName := xlFile.GetSheetName(0)
- fmt.Println("Sheet Name:", sheetName)
-
-
-
- indexMap := make(map[string]*UsdaFasIndex)
- indexList := make([]*UsdaFasIndex, 0)
- sort := 0
- rows, err := xlFile.GetRows(sheetName)
-
-
- for i, row := range rows {
- if i > 6 {
- commodity := ""
- dateStr := ""
- country := ""
- dataVal := ""
- unit := "Metric Tons"
- for k, text := range row {
-
- kind := ""
- indexName := ""
- if k == 1 {
- commodity = text
- } else if k == 2 {
- dateStr = text
- } else if k == 4 {
- country = text
- } else if k == 5 {
- kind = "Weekly Exports"
- } else if k == 6 {
- kind = "Accum Exports"
- } else if k == 7 {
- kind = "Outstanding Sale:CMY"
- } else if k == 8 {
- kind = "Gross Sale:CMY"
- } else if k == 9 {
- kind = "Net Sale :CMY"
- } else if k == 10 {
- kind = "Total Commitment:CMY"
- } else if k == 11 {
- kind = "Outstanding Sale:NMY"
- } else if k == 12 {
- kind = "Net Sale :NMY"
- }
- if k > 4 && k < 13 {
-
-
-
- timeT, e := time.ParseInLocation(utils.FormatDateTime, dateStr, time.Local)
- if e != nil {
- utils.FileLog.Info("日期格式转换失败 err:%s", e.Error())
- continue
- }
- date := timeT.Format(utils.FormatDate)
- dataVal = text
- firstCommodity := utils.GetFirstLetter(commodity)
- firstKind := utils.GetFirstLetter(kind)
- indexName = fmt.Sprintf("%s: %s: %s", commodity, country, kind)
- inCode := fmt.Sprintf("usda%s%s%s", firstCommodity, strings.ToLower(strings.ReplaceAll(country, " ", "")), firstKind)
- indexItem, okIndex := indexMap[indexName]
-
- classifyName := commodity
- if !okIndex {
-
- indexItem = new(UsdaFasIndex)
- indexItem.IndexName = indexName
- indexItem.ClassifyName = classifyName
- indexItem.Country = country
- indexItem.Commodity = kind
- indexItem.ParentClassifyName = "出口销售"
- indexItem.ClassifySort = 0
- indexItem.IndexCode = inCode
- indexItem.Frequency = "周度"
- indexItem.Sort = sort
- indexItem.Unit = unit
- indexItem.ExcelDataMap = make(map[string]string)
- sort++
- }
- if strings.Contains(dataVal, ",") {
- dataVal = strings.ReplaceAll(dataVal, ",", "")
- }
- val, e := strconv.ParseFloat(dataVal, 64)
- if e != nil {
- utils.FileLog.Info("数据转换失败 err:%s", e.Error())
- continue
- }
- indexItem.ExcelDataMap[date] = fmt.Sprintf("%.4f", val)
- indexMap[indexName] = indexItem
- }
- }
- }
- }
-
- for _, v := range indexMap {
-
-
- indexList = append(indexList, v)
- if len(indexList) > 100 {
- err = addUsdaFasPsdData(indexList, "出口销售")
- if err != nil {
- return
- }
- indexList = []*UsdaFasIndex{}
- }
- }
- err = addUsdaFasPsdData(indexList, "出口销售")
- if err != nil {
- return
- }
- fmt.Println("出口销售 执行成功")
- return
- }
- func handleUsdaFasPsd(item *UsdaPsdData) (err error) {
-
- errMsg := ""
- defer func() {
- if err != nil {
- errMsg += err.Error()
- }
- if errMsg != "" {
- msg := "失败提醒" + "downloadUsdaPsdData_handleUsdaFasPsd ErrMsg:" + errMsg
- fmt.Println("msg:", msg)
- utils.FileLog.Info(msg)
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
-
- headerSlice := make([]string, 0)
- for index, v := range item.TableHeaders {
-
- fmt.Println("key:", index, "value:", v)
- if !strings.Contains(v, "/") && !strings.Contains(v, " ") {
- v = strings.ToLower(v)
- }
- if v == "Unit Description" {
- v = "unit Description"
- }
- headerSlice = append(headerSlice, v)
- }
- sort := 0
-
- indexMap := make(map[string]*UsdaFasIndex)
-
- commodityRow := ""
- countriesRow := ""
- attributesRow := ""
- for _, row := range item.QueryResult {
- unitK := headerSlice[len(headerSlice)-1]
- unit := row[unitK].(string)
-
- unit = strings.Replace(unit, " ", "", -1)
- unit = strings.Trim(unit, "()")
- for _, k := range headerSlice {
- col, ok := row[k]
- if !ok || col == nil {
-
- continue
- }
- if k == "commodity" {
- commodityRow = col.(string)
- } else if k == "country" {
- countriesRow = col.(string)
- } else if k == "attribute" {
- attributesRow = col.(string)
- } else if k == "unit Description" {
-
- } else {
-
- year, _ := strconv.Atoi(strings.Split(k, "/")[0])
- indexName := ""
- classifyName := ""
- classifySort := 0
- inCode := ""
- fre := "年度"
- lastStr := "Yearly"
-
- dateT := time.Date(year, time.December, 31, 0, 0, 0, 0, time.Local)
- if strings.Contains(k, "(") {
- fre = "月度"
- lastStr = "Monthly"
-
- monthStr := strings.Split(k, "(")[1]
- monthStr = strings.Split(monthStr, ")")[0]
-
- monthT, e := time.ParseInLocation("Jan", monthStr, time.Local)
- if e != nil {
- errMsg += fmt.Sprintf("月份转换错误:%s%s\n", monthStr, e.Error())
- continue
- }
-
-
- dateT = time.Date(year, monthT.Month(), 1, 0, 0, 0, 0, time.Now().Location()).AddDate(0, 1, -1)
- }
- date := dateT.Format("2006-01-02")
-
- if commodityRow != "" && countriesRow != "" && attributesRow != "" {
- indexName = commodityRow + ": " + countriesRow + ": " + attributesRow + ": " + lastStr
- } else {
- fmt.Println("commodityRow:", commodityRow, "countriesRow:", countriesRow, "attributesRow:", attributesRow)
- errMsg += fmt.Sprintf("指标名称为空 commodityRow:%s,countriesRow:%s,attributesRow:%s\n", commodityRow, countriesRow, attributesRow)
- continue
- }
- firstCommodityRow := strings.Replace(commodityRow, ", ", "", -1)
- firstCommodityRow = strings.Replace(firstCommodityRow, " ", "", -1)
- firstCommodityRow = strings.ToLower(firstCommodityRow)
- firstAttributesRow := utils.GetFirstLetter(attributesRow)
- firstLastStr := utils.GetFirstLetter(lastStr)
- inCode = fmt.Sprintf("usda%s%s%s%s", firstCommodityRow, strings.ToLower(strings.ReplaceAll(countriesRow, " ", "")), firstAttributesRow, firstLastStr)
- indexItem, okIndex := indexMap[indexName]
-
- classifyName = commodityRow
- if !okIndex {
-
- indexItem = new(UsdaFasIndex)
- indexItem.IndexName = indexName
- indexItem.ClassifyName = classifyName
- indexItem.Country = countriesRow
- indexItem.Commodity = attributesRow
- indexItem.ParentClassifyName = "月度供需"
- indexItem.ClassifySort = classifySort
- indexItem.IndexCode = inCode
- indexItem.Frequency = fre
- indexItem.Sort = sort
- indexItem.Unit = unit
- indexItem.ExcelDataMap = make(map[string]string)
- sort++
- }
- val := col.(float64)
- val = utils.FloatFormatRound(val, 2)
- indexItem.ExcelDataMap[date] = fmt.Sprintf("%.4f", val)
- indexMap[indexName] = indexItem
- continue
- }
- }
- }
- indexList := make([]*UsdaFasIndex, 0)
- for _, v := range indexMap {
-
-
- indexList = append(indexList, v)
- if len(indexList) > 100 {
- err = addUsdaFasPsdData(indexList, "月度供需")
- if err != nil {
- return
- }
- indexList = []*UsdaFasIndex{}
- }
- }
- err = addUsdaFasPsdData(indexList, "月度供需")
- if err != nil {
- return
- }
- fmt.Println("月度供需 " + commodityRow + "执行成功")
- return
- }
- func addUsdaFasPsdData(indexList []*UsdaFasIndex, sheetName string) (err error) {
- if len(indexList) > 0 {
- params := make(map[string]interface{})
- params["List"] = indexList
- params["TerminalCode"] = ""
- result, e := utils.PostEdbLib(params, "usda_fas/handle/excel_data")
- if e != nil {
- err = fmt.Errorf("sheet :%s PostEdbLib err: %s", sheetName, e.Error())
- b, _ := json.Marshal(params)
- utils.FileLog.Info(fmt.Sprintf("sheet :%s PostEdbLib err: %s, params: %s", sheetName, e.Error(), string(b)))
- return
- }
- resp := new(utils.BaseEdbLibResponse)
- if e := json.Unmarshal(result, &resp); e != nil {
- err = fmt.Errorf("sheet :%s json.Unmarshal err: %s", sheetName, e)
- utils.FileLog.Info(fmt.Sprintf("sheet :%s json.Unmarshal err: %s", sheetName, e))
- return
- }
- if resp.Ret != 200 {
- err = fmt.Errorf("sheet :%s Msg: %s, ErrMsg: %s", sheetName, resp.Msg, resp.ErrMsg)
- utils.FileLog.Info(fmt.Sprintf("sheet :%s Msg: %s, ErrMsg: %s", sheetName, resp.Msg, resp.ErrMsg))
- return
- }
- }
- return
- }
- func ConvertXlsToXlsx(inputFile, outputFile string) (err error) {
- pythonScript := "./static/convert_xls_to_xlsx.py"
- cmd := exec.Command(utils.PYTHON_PATH, pythonScript, inputFile, outputFile)
-
- var out bytes.Buffer
- cmd.Stdout = &out
- cmd.Stderr = os.Stderr
-
- err = cmd.Run()
- if err != nil {
- err = fmt.Errorf("Error running command: %v\n", err)
- fmt.Printf("Error running command: %v\n", err)
- return
- }
-
- output := out.String()
- if strings.TrimSpace(output) == "SUCCESS" {
- fmt.Println("Conversion completed successfully.")
- } else {
- err = fmt.Errorf("Conversion failed: %s", output)
- fmt.Println("Conversion failed.")
-
- fmt.Println("Output from Python script:", output)
- }
- return
- }
|