Explorar el Código

Merge remote-tracking branch 'origin/feature/eta2.4.4_gpr_risk'

Roc hace 3 meses
padre
commit
0c558cfd91
Se han modificado 2 ficheros con 266 adiciones y 0 borrados
  1. 263 0
      services/gpr_risk.go
  2. 3 0
      services/task.go

+ 263 - 0
services/gpr_risk.go

@@ -0,0 +1,263 @@
+package services
+
+import (
+	"context"
+	"encoding/json"
+	"eta/eta_crawler/services/alarm_msg"
+	"eta/eta_crawler/utils"
+	"fmt"
+	"github.com/xuri/excelize/v2"
+	"io"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strconv"
+	"time"
+)
+
+// GprRiskIndex describes one GPR (geopolitical risk) index series parsed from
+// the downloaded workbook, together with its data points. A list of these is
+// what addGprRiskData posts to the "gpr_risk/handle/excel_data" endpoint.
+type GprRiskIndex struct {
+	ClassifyName       string `description:"指标目录"` // catalogue (classification) the index belongs to
+	ParentClassifyName string `description:"父级指标目录"` // parent catalogue; ParseGprRiskExcel leaves it empty
+	ClassifySort       int    `description:"指标目录排序号"` // sort order of the catalogue
+	IndexName          string `description:"指标名称"` // display name of the index
+	IndexCode          string `description:"指标编码"` // code identifying the index
+	Unit               string `description:"单位"` // unit of the values
+	Sort               int    `description:"排序号"` // sort order of the index
+	Frequency          string `description:"频度"` // data frequency
+	TerminalCode       string `description:"编码"` // code; not populated anywhere in this file
+	ExcelDataMap       map[string]string // data points; ParseGprRiskExcel keys them by formatted date and stores each value as a two-decimal string
+}
+
+func DownloadGprRiskFile() (err error) {
+	defer func() {
+		if err != nil {
+			msg := "失败提醒" + "DownloadGprRiskFile ErrMsg:" + err.Error()
+			fmt.Println("msg:", msg)
+			utils.FileLog.Info(msg)
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+	}()
+
+	url := "https://www.matteoiacoviello.com/gpr_files/data_gpr_daily_recent.xls"
+	destDir := "static"
+	fileName := fmt.Sprintf("data_gpr_daily_recent_%s.xls", time.Now().Format(utils.FormatDateTimeUnSpace))
+	//fileName := filepath.Base(url)
+	destPath := filepath.Join(destDir, fileName)
+	fmt.Println(destPath)
+
+	//创建文件
+	out, err := os.Create(destPath)
+	if err != nil {
+		err = fmt.Errorf("failed to create file: %w", err)
+		return
+	}
+	defer out.Close()
+
+	// 发起下载请求
+	fmt.Println(time.Now().Format(utils.FormatDateTime), "开始下载 GPR地缘风险指数 文件")
+	resp, err := http.Get(url)
+	if err != nil {
+		err = fmt.Errorf("failed to make request: %w", err)
+		return
+	}
+	defer resp.Body.Close()
+
+	// 检查HTTP状态码
+	if resp.StatusCode != http.StatusOK {
+		err = fmt.Errorf("bad status: %s", resp.Status)
+		return
+	}
+	fmt.Println(time.Now().Format(utils.FormatDateTime), "下载 GPR地缘风险指数 文件完成")
+	fmt.Println(time.Now().Format(utils.FormatDateTime), "开始写入文件")
+	// 将内容写入文件
+	buf := make([]byte, 4*1024*1024) // 4MB buffer
+	_, err = io.CopyBuffer(out, resp.Body, buf)
+	if err != nil {
+		err = fmt.Errorf("failed to copy content: %w", err)
+		return
+	}
+	fmt.Println(time.Now().Format(utils.FormatDateTime), "开始转换文件格式")
+	// 转换文件格式
+	downloadFileXlsx := destPath + "x"
+	err = ConvertXlsToXlsx(destPath, downloadFileXlsx)
+	if err != nil {
+		err = fmt.Errorf("文件格式转换失败 convert excel, Err:%w", err)
+		return
+	}
+	fmt.Println(time.Now().Format(utils.FormatDateTime), "文件格式转换完成")
+	// 使用通道等待解析完成
+	done := make(chan error)
+	go func() {
+		done <- ParseGprRiskExcel(downloadFileXlsx)
+	}()
+	// 等待解析完成或超时
+	select {
+	case err = <-done:
+		if err != nil {
+			err = fmt.Errorf("parse excel, Err:%w", err)
+			return
+		}
+	}
+	// 删除临时文件
+	defer func() {
+		os.Remove(destPath)
+	}()
+
+	fmt.Printf(" GPR地缘风险指数 File downloaded successfully: %s\n", destPath)
+	return
+}
+
+// DownloadGPRRiskTask is the scheduled-task wrapper around
+// DownloadGprRiskFile. The context argument is accepted to satisfy the task
+// signature and is not used.
+func DownloadGPRRiskTask(cont context.Context) (err error) {
+	return DownloadGprRiskFile()
+}
+
+// ParseGprRiskExcel reads the GPR (geopolitical risk) workbook at path and
+// submits the parsed series through addGprRiskData.
+//
+// Only the first sheet is read. Its first row is the header row; the column
+// whose header is "GPRD" holds the values and the first column holds the date
+// in "20060102" layout. Rows whose date or value cannot be parsed are logged
+// and skipped. Values are stored rounded to two decimals, keyed by the date
+// formatted with utils.FormatDate.
+//
+// An error is returned when the file is missing or cannot be opened, when the
+// rows cannot be read, when no usable "GPRD" column exists, or when submitting
+// the data fails. Any returned error is printed, written to utils.FileLog and
+// sent through alarm_msg.SendAlarmMsg. Once the workbook has been opened, the
+// file at path is removed before returning.
+func ParseGprRiskExcel(path string) (err error) {
+	defer func() {
+		if err != nil {
+			msg := "失败提醒" + "ParseGprRiskExcel ErrMsg:" + err.Error()
+			fmt.Println("msg:", msg)
+			utils.FileLog.Info(msg)
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+	}()
+
+	exist, err := PathExists(path)
+	if err != nil {
+		fmt.Println(err)
+		err = fmt.Errorf("文件地址不存在 err:%s", err.Error())
+		return
+	}
+	if !exist {
+		err = fmt.Errorf("文件地址不存在")
+		return
+	}
+
+	xlFile, err := excelize.OpenFile(path)
+	if err != nil {
+		fmt.Println("OpenFile err:", err)
+		err = fmt.Errorf("打开文件失败 err:%s", err.Error())
+		return
+	}
+	defer func() {
+		// Close the workbook. The result goes into a separate variable so that
+		// a successful close cannot wipe out an error being returned; a close
+		// failure is surfaced only when nothing else went wrong.
+		if closeErr := xlFile.Close(); closeErr != nil {
+			fmt.Println(closeErr)
+			if err == nil {
+				err = closeErr
+			}
+		}
+		os.Remove(path)
+	}()
+
+	sheetName := xlFile.GetSheetName(0)
+	fmt.Println("Sheet Name:", sheetName)
+
+	rows, err := xlFile.GetRows(sheetName)
+	if err != nil {
+		err = fmt.Errorf("读取工作表数据失败 err:%w", err)
+		return
+	}
+
+	// Locate the value column in the header row. Column 0 is the date column,
+	// so a match there (or no match at all) leaves nothing to read.
+	valueCol := -1
+	if len(rows) > 0 {
+		for k, text := range rows[0] {
+			if text == "GPRD" {
+				valueCol = k
+				break
+			}
+		}
+	}
+	if valueCol <= 0 {
+		err = fmt.Errorf("未找到 GPRD 数据列")
+		return
+	}
+
+	// The workbook yields exactly one index series.
+	indexItem := &GprRiskIndex{
+		IndexName:          "GPR地缘风险指数",
+		ClassifyName:       "GPR地缘风险指数",
+		ParentClassifyName: "",
+		ClassifySort:       0,
+		IndexCode:          "gprdyfxzs",
+		Frequency:          "日度",
+		Sort:               0,
+		Unit:               "无",
+		ExcelDataMap:       make(map[string]string),
+	}
+
+	for _, row := range rows[1:] {
+		// Rows can be shorter than the header; such rows carry no value.
+		if len(row) <= valueCol {
+			continue
+		}
+		dateT, e := time.ParseInLocation("20060102", row[0], time.Local)
+		if e != nil {
+			utils.FileLog.Info("日期格式转换失败 err:%s", e.Error())
+			// Without a date the value has no key, so skip the whole row.
+			continue
+		}
+		val, e := strconv.ParseFloat(row[valueCol], 64)
+		if e != nil {
+			utils.FileLog.Info("数据转换失败 err:%s", e.Error())
+			continue
+		}
+		indexItem.ExcelDataMap[dateT.Format(utils.FormatDate)] = fmt.Sprintf("%.2f", utils.FloatFormatRound(val, 2))
+	}
+
+	// Submit only when at least one data point was parsed.
+	if len(indexItem.ExcelDataMap) > 0 {
+		if err = addGprRiskData([]*GprRiskIndex{indexItem}); err != nil {
+			return
+		}
+	}
+	fmt.Println("GPR地缘风险指数 执行成功")
+	return
+}
+
+func addGprRiskData(indexList []*GprRiskIndex) (err error) {
+	sheetName := "GPR地缘风险指数"
+	if len(indexList) > 0 {
+		params := make(map[string]interface{})
+		params["List"] = indexList
+		params["TerminalCode"] = ""
+		result, e := utils.PostEdbLib(params, "gpr_risk/handle/excel_data")
+		if e != nil {
+			err = fmt.Errorf("sheet :%s PostEdbLib err: %s", sheetName, e.Error())
+			b, _ := json.Marshal(params)
+			utils.FileLog.Info(fmt.Sprintf("sheet :%s PostEdbLib err: %s, params: %s", sheetName, e.Error(), string(b)))
+			return
+		}
+		resp := new(utils.BaseEdbLibResponse)
+		if e := json.Unmarshal(result, &resp); e != nil {
+			err = fmt.Errorf("sheet :%s json.Unmarshal err: %s", sheetName, e)
+			utils.FileLog.Info(fmt.Sprintf("sheet :%s json.Unmarshal err: %s", sheetName, e))
+			return
+		}
+		if resp.Ret != 200 {
+			err = fmt.Errorf("sheet :%s Msg: %s, ErrMsg: %s", sheetName, resp.Msg, resp.ErrMsg)
+			utils.FileLog.Info(fmt.Sprintf("sheet :%s Msg: %s, ErrMsg: %s", sheetName, resp.Msg, resp.ErrMsg))
+			return
+		}
+	}
+	return
+}

+ 3 - 0
services/task.go

@@ -39,14 +39,17 @@ func Task() {
 
 		crawlerIcpi := task.NewTask("refreshData", "0 0,30 16-23 * * *", CrawlerIcpi) //居民消费价格指数
 
+		downloadGPRRiskTask := task.NewTask("DownloadGPRRiskTask", "0 0,30 16-19 * * *", DownloadGPRRiskTask) //每天下午16点至19点,每隔半小时
 		// 统计局-分月季年爬
 		//refreshNationalMonthA := task.NewTask("RefreshNationalMonthDbA", "0 15 2 10 * *", national_data.RefreshNationalMonthDbA)
 		//refreshNationalMonthB := task.NewTask("RefreshNationalMonthDbB", "0 15 2 16 * *", national_data.RefreshNationalMonthDbB)
 		//refreshNationalQuarter := task.NewTask("RefreshNationalQuarterDb", "0 25 1 15 * *", national_data.RefreshNationalQuarterDb)
 		//refreshNationalYearA := task.NewTask("RefreshNationalYearDbA", "0 45 1 20 * *", national_data.RefreshNationalYearDbA)
 		//refreshNationalYearB := task.NewTask("RefreshNationalYearDbB", "0 45 1 25 * *", national_data.RefreshNationalYearDbB)
+
 		task.AddTask("美国农业部月度供需数据爬取", refreshUsdaPsd)
 		task.AddTask("美国农业部出口销售数据爬取", refreshUsdaFms)
+		task.AddTask("GPR地缘风险指数爬取", downloadGPRRiskTask)
 		task.AddTask("数据爬取", refreshData)
 		task.AddTask("欧洲天然气爬取", refreshEic)
 		// task.AddTask("中国煤炭网爬取", refreshCoal)