Bladeren bron

python把xls转换成xlsx文件

xyxie 6 maanden geleden
bovenliggende
commit
199dda8c1a
4 gewijzigde bestanden met toevoegingen van 228 en 101 verwijderingen
  1. 4 1
      services/task.go
  2. 199 100
      services/usda_psd.go
  3. 20 0
      static/convert_xls_to_xlsx.py
  4. 5 0
      utils/config.go

+ 4 - 1
services/task.go

@@ -23,6 +23,8 @@ func Task() {
 	//FileCoalCoastal()
 	//FileCoalInland()
 	if utils.BusinessCode == utils.BusinessCodeRelease {
+		refreshUsdaPsd := task.NewTask("RefreshUsdaPsd", "0 0 3,21 1 * *", DownloadUsdaPsdDataTask)
+		refreshUsdaFms := task.NewTask("RefreshUsdaFms", "0 0,30 16-23 * * *", DownloadUsdaFmsDataTask)
 		refreshData := task.NewTask("refreshData", "0 0,30 16-18 * * *", RefreshData)
 		refreshEic := task.NewTask("RefreshEic", "0 0 2,6 * * *", RefreshEic)
 		refreshCoal := task.NewTask("RefreshCoal", "0 0,30 16-23 * * *", RefreshCoal)
@@ -40,7 +42,8 @@ func Task() {
 		//refreshNationalQuarter := task.NewTask("RefreshNationalQuarterDb", "0 25 1 15 * *", national_data.RefreshNationalQuarterDb)
 		//refreshNationalYearA := task.NewTask("RefreshNationalYearDbA", "0 45 1 20 * *", national_data.RefreshNationalYearDbA)
 		//refreshNationalYearB := task.NewTask("RefreshNationalYearDbB", "0 45 1 25 * *", national_data.RefreshNationalYearDbB)
-
+		task.AddTask("美国农业部月度供需数据爬取", refreshUsdaPsd)
+		task.AddTask("美国农业部出口销售数据爬取", refreshUsdaFms)
 		task.AddTask("数据爬取", refreshData)
 		task.AddTask("欧洲天然气爬取", refreshEic)
 		task.AddTask("中国煤炭网爬取", refreshCoal)

+ 199 - 100
services/usda_psd.go

@@ -2,16 +2,18 @@ package services
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"eta/eta_crawler/services/alarm_msg"
 	"eta/eta_crawler/utils"
 	"fmt"
 	"github.com/PuerkitoBio/goquery"
-	"github.com/tealeg/xlsx"
+	"github.com/xuri/excelize/v2"
 	"io"
 	"mime/multipart"
 	"net/http"
 	"os"
+	"os/exec"
 	"strconv"
 	"strings"
 	"time"
@@ -67,6 +69,21 @@ type UsdaFasIndex struct {
 	ExcelDataMap       map[string]string
 }
 
+// DownloadUsdaPsdDataTask is the task entry point for the USDA monthly
+// supply-and-demand download (yearly and monthly data). It delegates to
+// DownloadUsdaPsdData and returns that call's error unchanged.
+//
+// cont is accepted to match the task handler signature and is not used.
+func DownloadUsdaPsdDataTask(cont context.Context) (err error) {
+	// TODO: set the download frequency.
+	return DownloadUsdaPsdData()
+}
+
+func DownloadUsdaFmsDataTask(cont context.Context) (err error) {
+	//出口销售周度数据
+	startDate := time.Now().AddDate(0, -1, 0).Format("01/02/2006")
+	endDate := time.Now().Format("01/02/2006")
+	err = DownloadUsdaFmsData(startDate, endDate)
+	return
+}
+
 // Meal, Palm Kernel:0813800
 // Meal, Peanut:0813200
 // Meal, Rapeseed:0813600
@@ -90,7 +107,7 @@ type UsdaFasIndex struct {
 // Oilseed, Soybean:2222000
 // Oilseed, Sunflowerseed:2224000
 // 美国农业部月度供需平衡表数据
-func DownloadUsdaPsdData() (indexList []*UsdaFasIndex, err error) {
+func DownloadUsdaPsdData() (err error) {
 	// 从test.json文件中读取json串
 	/*body, err := ioutil.ReadFile("test.json")
 	if err != nil {
@@ -178,12 +195,14 @@ func DownloadUsdaPsdData() (indexList []*UsdaFasIndex, err error) {
 		fmt.Println("json.Unmarshal err:" + err.Error())
 		return
 	}
-	indexList, err = handleUsdaFasPsd(item)
+	go func() {
+		err = handleUsdaFasPsd(item)
+	}()
 	return
 }
 
 // 美国农业出库销售数据
-func DownloadUsdaFmsData() (indexList []*UsdaFasIndex, err error) {
+func DownloadUsdaFmsData(startDate, endDate string) (err error) {
 	// todo 设置下载频率, 如果有正在处理中的,则暂停下载
 	defer func() {
 		if err != nil {
@@ -193,7 +212,7 @@ func DownloadUsdaFmsData() (indexList []*UsdaFasIndex, err error) {
 			go alarm_msg.SendAlarmMsg(msg, 3)
 		}
 	}()
-	downloadFile := "downloaded_excel.xlsx"
+	downloadFile := fmt.Sprintf("./static/usda_fms_excel_%s.xls", time.Now().Format(utils.FormatDate))
 	//请求首页,获取入参
 	dataUrl := "https://apps.fas.usda.gov/esrquery/esrq.aspx"
 	body1, err := utils.HttpGetNoCookie(dataUrl)
@@ -249,9 +268,6 @@ func DownloadUsdaFmsData() (indexList []*UsdaFasIndex, err error) {
 			return
 		}
 	}
-	// todo 下载的日期
-	startDate := "08/22/2019"
-	endDate := "08/22/2024"
 	if err = multipartWriter.WriteField("ctl00$MainContent$lbCountry", "0:0"); err != nil {
 		err = fmt.Errorf("set ctl00$MainContent$lbCountry, Err:%s", err)
 		return
@@ -313,22 +329,57 @@ func DownloadUsdaFmsData() (indexList []*UsdaFasIndex, err error) {
 	if err != nil {
 		return
 	}
-	defer out.Close()
 
 	// 将响应体写入到文件
 	_, err = io.Copy(out, resp.Body)
 	if err != nil {
 		return
 	}
+	// 关闭临时文件以确保数据写入完成
+	err = out.Close()
+	if err != nil {
+		err = fmt.Errorf("Failed to close temporary file: %v", err)
+		return
+	}
+
+	// 转换文件格式
+	downloadFileXlsx := downloadFile + "x"
+	err = ConvertXlsToXlsx(downloadFile, downloadFileXlsx)
+	if err != nil {
+		err = fmt.Errorf("文件格式转换失败 convert excel, Err:%w", err)
+		return
+	}
+	// 使用通道等待解析完成
+	done := make(chan error)
 	go func() {
-		err = ParseUsdaFmsExcel(downloadFile)
-		fmt.Println("Excel file downloaded successfully")
+		done <- ParseUsdaFmsExcel(downloadFileXlsx)
 	}()
+	// 等待解析完成或超时
+	select {
+	case err = <-done:
+		if err != nil {
+			err = fmt.Errorf("parse excel, Err:%w", err)
+			return
+		}
+	case <-time.After(20 * time.Minute): // 假设20分钟超时
+		err = fmt.Errorf("parse excel timed out")
+		return
+	}
+
+	fmt.Println("Excel file downloaded successfully")
 	return
 }
 
 func ParseUsdaFmsExcel(path string) (err error) {
-	var xlFile *xlsx.File
+	defer func() {
+		if err != nil {
+			msg := "失败提醒" + "DownloadUsdaFmsData_ParseUsdaFmsExcel ErrMsg:" + err.Error()
+			fmt.Println("msg:", msg)
+			utils.FileLog.Info(msg)
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+	}()
+	//var xlFile *xlsx.File
 	exist, err := PathExists(path)
 	if err != nil {
 		fmt.Println(err)
@@ -339,121 +390,141 @@ func ParseUsdaFmsExcel(path string) (err error) {
 		err = fmt.Errorf("文件地址不存在")
 		return
 	}
-	xlFile, err = xlsx.OpenFile(path)
+	//xlFile, err = xlsx.OpenFile(path)
+	xlFile, err := excelize.OpenFile(path)
 	if err != nil {
 		fmt.Println("OpenFile err:", err)
 		err = fmt.Errorf("打开文件失败 err:%s", err.Error())
 		return
 	}
-
+	defer func() {
+		// 关闭工作簿
+		if err = xlFile.Close(); err != nil {
+			fmt.Println(err)
+		}
+		os.Remove(path)
+	}()
+	sheetName := xlFile.GetSheetName(0)
+	fmt.Println("Sheet Name:", sheetName)
 	//解析出表头第7行
 	//拼接指标名称
 	// 指标名称
 	indexMap := make(map[string]*UsdaFasIndex)
 	indexList := make([]*UsdaFasIndex, 0)
 	sort := 0
-	for _, sheet := range xlFile.Sheets {
-		//遍历行读取
-		maxCol := sheet.MaxCol
-		for i := 0; i < maxCol; i++ {
-			if i > 6 {
-				row := sheet.Row(i)
-				cells := row.Cells
-				commodity := ""
-				dateStr := ""
-				country := ""
-				dataVal := ""
-				unit := "Metric Tons"
-				for k, cell := range cells {
-					text := cell.String()
-					kind := ""
-					indexName := ""
-					if k == 1 { // 品种名称Commodity
-						commodity = text
-					} else if k == 2 {
-						dateStr = text
-					} else if k == 4 {
-						country = text
-					} else if k == 5 {
-						kind = "Weekly  Exports"
-					} else if k == 6 {
-						kind = "Accum  Exports"
-					} else if k == 7 {
-						kind = "Outstanding Sale:CMY"
-					} else if k == 8 {
-						kind = "Gross Sale:CMY"
-					} else if k == 9 {
-						kind = "Net Sale :CMY"
-					} else if k == 10 {
-						kind = "Total Commitment:CMY"
-					} else if k == 11 {
-						kind = "Outstanding Sale:NMY"
-					} else if k == 12 {
-						kind = "Net Sale :NMY"
+	rows, err := xlFile.GetRows(sheetName)
+	//for _, sheet := range xlFile.Sheets {
+	//遍历行读取
+	for i, row := range rows {
+		if i > 6 {
+			commodity := ""
+			dateStr := ""
+			country := ""
+			dataVal := ""
+			unit := "Metric Tons"
+			for k, text := range row {
+				fmt.Println("第", i, "行,第", k, "列,内容:", text)
+				kind := ""
+				indexName := ""
+				if k == 1 { // 品种名称Commodity
+					commodity = text
+				} else if k == 2 {
+					dateStr = text
+				} else if k == 4 {
+					country = text
+				} else if k == 5 {
+					kind = "Weekly  Exports"
+				} else if k == 6 {
+					kind = "Accum  Exports"
+				} else if k == 7 {
+					kind = "Outstanding Sale:CMY"
+				} else if k == 8 {
+					kind = "Gross Sale:CMY"
+				} else if k == 9 {
+					kind = "Net Sale :CMY"
+				} else if k == 10 {
+					kind = "Total Commitment:CMY"
+				} else if k == 11 {
+					kind = "Outstanding Sale:NMY"
+				} else if k == 12 {
+					kind = "Net Sale :NMY"
+				}
+				if k > 4 && k < 13 {
+					// 处理日期
+					fmt.Println(dateStr)
+					fmt.Println(unit)
+					timeT, e := time.ParseInLocation(utils.FormatDateTime, dateStr, time.Local)
+					if e != nil {
+						utils.FileLog.Info("日期格式转换失败 err:%s", e.Error())
+						continue
+					}
+					date := timeT.Format(utils.FormatDate)
+					dataVal = text
+					indexName = fmt.Sprintf("%s: %s: %s", commodity, country, kind)
+					inCode := "usda" + utils.GetFirstLetter(indexName)
+					indexItem, okIndex := indexMap[indexName]
+					// 首字母大写
+					classifyName := commodity
+					if !okIndex {
+						// 新增指标
+						indexItem = new(UsdaFasIndex)
+						indexItem.IndexName = indexName
+						indexItem.ClassifyName = classifyName
+						indexItem.ParentClassifyName = "出口销售"
+						indexItem.ClassifySort = 0
+						indexItem.IndexCode = inCode
+						indexItem.Frequency = "周度"
+						indexItem.Sort = sort
+						indexItem.Unit = unit
+						indexItem.ExcelDataMap = make(map[string]string)
+						sort++
 					}
-					if k > 4 && k < 13 {
-						// 处理日期
-						fmt.Println(dateStr)
-						fmt.Println(unit)
-						timeT, e := time.ParseInLocation("01\\/02\\/2006", dateStr, time.Local)
-						if e != nil {
-							utils.FileLog.Info("日期格式转换失败 err:%s", e.Error())
-							continue
-						}
-						date := timeT.Format(utils.FormatDate)
-						dataVal = text
-						indexName = fmt.Sprintf("%s: %s: %s", commodity, country, kind)
-						inCode := "usda" + utils.GetFirstLetter(indexName)
-						indexItem, okIndex := indexMap[indexName]
-						// 首字母大写
-						classifyName := commodity
-						if !okIndex {
-							// 新增指标
-							indexItem = new(UsdaFasIndex)
-							indexItem.IndexName = indexName
-							indexItem.ClassifyName = classifyName
-							indexItem.ParentClassifyName = "出口销售"
-							indexItem.ClassifySort = 0
-							indexItem.IndexCode = inCode
-							indexItem.Frequency = "周度"
-							indexItem.Sort = sort
-							indexItem.Unit = unit
-							indexItem.ExcelDataMap = make(map[string]string)
-							sort++
-						}
-						val, e := strconv.ParseFloat(dataVal, 64)
-						if e != nil {
-							utils.FileLog.Info("数据转换失败 err:%s", e.Error())
-							continue
-						}
-						indexItem.ExcelDataMap[date] = fmt.Sprintf("%.4f", val)
-						indexMap[indexName] = indexItem
+					if strings.Contains(dataVal, ",") {
+						dataVal = strings.ReplaceAll(dataVal, ",", "")
+					}
+					val, e := strconv.ParseFloat(dataVal, 64)
+					if e != nil {
+						utils.FileLog.Info("数据转换失败 err:%s", e.Error())
+						continue
 					}
+					indexItem.ExcelDataMap[date] = fmt.Sprintf("%.4f", val)
+					indexMap[indexName] = indexItem
 				}
 			}
 		}
 	}
+	//}
 
 	for _, v := range indexMap {
 		fmt.Printf("IndexName: %s \n", v.IndexName)
 		fmt.Printf("IndexCode: %s \n", v.IndexCode)
 		indexList = append(indexList, v)
 		if len(indexList) > 500 {
-			err = addUsdaFasPsdData(indexList)
+			err = addUsdaFasPsdData(indexList, "出口销售")
 			if err != nil {
 				return
 			}
 			indexList = []*UsdaFasIndex{}
 		}
 	}
-	err = addUsdaFasPsdData(indexList)
+	err = addUsdaFasPsdData(indexList, "出口销售")
 	if err != nil {
 		return
 	}
 	return
 }
 
-func handleUsdaFasPsd(item *UsdaPsdData) (indexList []*UsdaFasIndex, err error) {
+func handleUsdaFasPsd(item *UsdaPsdData) (err error) {
+	//设置缓存key,防止重复处理
+	defer func() {
+		if err != nil {
+			msg := "失败提醒" + "downloadUsdaPsdData_handleUsdaFasPsd ErrMsg:" + err.Error()
+			fmt.Println("msg:", msg)
+			utils.FileLog.Info(msg)
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+	}()
+
 	// 解析
 	headerSlice := make([]string, 0)
 	for index, v := range item.TableHeaders {
@@ -467,9 +538,6 @@ func handleUsdaFasPsd(item *UsdaPsdData) (indexList []*UsdaFasIndex, err error)
 		}
 		headerSlice = append(headerSlice, v)
 	}
-	// 解析
-	// 遍历行读取
-	indexList = make([]*UsdaFasIndex, 0)
 	sort := 0
 	// 指标名称
 	indexMap := make(map[string]*UsdaFasIndex)
@@ -561,28 +629,27 @@ func handleUsdaFasPsd(item *UsdaPsdData) (indexList []*UsdaFasIndex, err error)
 		}
 
 	}
-
+	indexList := make([]*UsdaFasIndex, 0)
 	for _, v := range indexMap {
 		fmt.Printf("IndexName: %s \n", v.IndexName)
 		fmt.Printf("IndexCode: %s \n", v.IndexCode)
 		indexList = append(indexList, v)
 		if len(indexList) > 500 {
-			err = addUsdaFasPsdData(indexList)
+			err = addUsdaFasPsdData(indexList, "月度供需")
 			if err != nil {
 				return
 			}
 			indexList = []*UsdaFasIndex{}
 		}
 	}
-	err = addUsdaFasPsdData(indexList)
+	err = addUsdaFasPsdData(indexList, "月度供需")
 	if err != nil {
 		return
 	}
 	return
 }
 
-func addUsdaFasPsdData(indexList []*UsdaFasIndex) (err error) {
-	sheetName := "月度供需"
+func addUsdaFasPsdData(indexList []*UsdaFasIndex, sheetName string) (err error) {
 	if len(indexList) > 0 {
 		params := make(map[string]interface{})
 		params["List"] = indexList
@@ -608,3 +675,35 @@ func addUsdaFasPsdData(indexList []*UsdaFasIndex) (err error) {
 	}
 	return
 }
+
+// ConvertXlsToXlsx converts the legacy .xls workbook at inputFile into an
+// .xlsx workbook at outputFile by running ./static/convert_xls_to_xlsx.py with
+// the interpreter configured in utils.PYTHON_PATH.
+//
+// The script reports its result on stdout. The conversion counts as successful
+// only when the command exits cleanly and its trimmed stdout is exactly
+// "SUCCESS"; in every other case an error is returned that carries whatever
+// the script printed, so the caller can log or alert on it.
+func ConvertXlsToXlsx(inputFile, outputFile string) (err error) {
+	pythonScript := "./static/convert_xls_to_xlsx.py"
+
+	cmd := exec.Command(utils.PYTHON_PATH, pythonScript, inputFile, outputFile)
+
+	// Capture stdout for the status check; stderr is passed straight through
+	// to this process's stderr.
+	var out bytes.Buffer
+	cmd.Stdout = &out
+	cmd.Stderr = os.Stderr
+
+	runErr := cmd.Run()
+	output := strings.TrimSpace(out.String())
+	if runErr != nil {
+		// Wrap with %w so callers can still inspect the underlying error
+		// (for example *exec.ExitError), and keep the script's stdout, which
+		// holds its usage/ERROR message when it exits non-zero.
+		err = fmt.Errorf("run %s: %w (stdout: %q)", pythonScript, runErr, output)
+		return
+	}
+
+	if output != "SUCCESS" {
+		// The script exits with status 0 even when the conversion raised, so
+		// the stdout marker is the only reliable success signal.
+		err = fmt.Errorf("convert %s to %s: %s", inputFile, outputFile, output)
+		return
+	}
+
+	fmt.Println("Conversion completed successfully.")
+	return
+}

+ 20 - 0
static/convert_xls_to_xlsx.py

@@ -0,0 +1,20 @@
+# convert_xls_to_xlsx.py
+import pandas as pd
+import sys
+
+def convert_xls_to_xlsx(input_file, output_file):
+    try:
+        df = pd.read_excel(input_file, engine='xlrd')
+        df.to_excel(output_file, index=False)
+        print("SUCCESS")  # 打印成功消息
+    except Exception as e:
+        print(f"ERROR: {e}")  # 打印错误消息
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("ERROR: Usage: python convert_xls_to_xlsx.py <input_xls_file> <output_xlsx_file>")
+        sys.exit(1)
+
+    input_file = sys.argv[1]
+    output_file = sys.argv[2]
+    convert_xls_to_xlsx(input_file, output_file)

+ 5 - 0
utils/config.go

@@ -12,6 +12,7 @@ var (
 	RunMode        string //运行模式
 	MYSQL_URL      string //数据库连接
 	MYSQL_URL_DATA string
+	PYTHON_PATH    string // python可执行文件地址
 )
 
 // 日志配置
@@ -108,4 +109,8 @@ func init() {
 		logMaxDaysStr := config["log_max_day"]
 		LogMaxDays, _ = strconv.Atoi(logMaxDaysStr)
 	}
+	PYTHON_PATH = config["python_path"]
+	if PYTHON_PATH == "" {
+		PYTHON_PATH = "python3"
+	}
 }