Browse Source

统计局-优化

hsun 1 year ago
parent
commit
9e940417c6

+ 1 - 0
models/base_from_national_statistics_index.go

@@ -14,6 +14,7 @@ type BaseFromNationalStatisticsIndex struct {
 	Dbcode                               string    `description:"dbcode"`
 	IndexCode                            string    `description:"指标编码"`
 	IndexName                            string    `description:"指标名称"`
+	Unit                                 string    `description:"单位"`
 	Frequency                            string    `description:"频度"`
 	Reg                                  string    `description:"地区"`
 	StartDate                            time.Time `description:"开始日期"`

+ 30 - 11
services/national_data/common.go

@@ -1,10 +1,12 @@
 package national_data
 
 import (
+	"bytes"
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
 	"hongze/hongze_data_crawler/utils"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -18,7 +20,8 @@ const (
 
 func NationalHttpPost(reqUrl, payload string) (result []byte, err error) {
 	// 随机延迟执行
-	r := utils.RangeRand(4000, 8000)
+	r := utils.RangeRand(6000, 9000)
+	//fmt.Printf("随机延迟%d\n", r)
 	if r > 6100 && r < 6250 {
 		time.Sleep(15 * time.Second)
 	} else {
@@ -57,12 +60,17 @@ func NationalHttpPost(reqUrl, payload string) (result []byte, err error) {
 	if err != nil {
 		return
 	}
-	defer res.Body.Close()
-	body, err := ioutil.ReadAll(res.Body)
+	defer func() {
+		_ = res.Body.Close()
+	}()
+	// 此处用io.Copy替代ioutil.ReadAll方法避免数据过大读取不完整
+	var b []byte
+	buf := bytes.NewBuffer(b)
+	_, err = io.Copy(buf, res.Body)
 	if err != nil {
 		return
 	}
-	result = body
+	result = buf.Bytes()
 	return
 }
 
@@ -119,6 +127,14 @@ type Wds struct {
 
 // CommonDataApiRequest 数据接口请求
 func CommonDataApiRequest(req DataApiReq) (resp QuotaListDataResp, err error) {
+	var b []byte
+	defer func() {
+		if err != nil {
+			r, _ := json.Marshal(req)
+			utils.FileLog.Error("CommonDataApiRequest Err request: %s", r)
+			utils.FileLog.Info("CommonDataApiRequest Err result: %s", string(b))
+		}
+	}()
 	if req.DbCode == "" {
 		return
 	}
@@ -162,13 +178,16 @@ func CommonDataApiRequest(req DataApiReq) (resp QuotaListDataResp, err error) {
 	f.Add("h", "1")
 
 	// 响应
-	r, e := NationalHttpPost(NationalStatisticsBaseReqUrl, f.Encode())
+	b, e := NationalHttpPost(NationalStatisticsBaseReqUrl, f.Encode())
 	if e != nil {
 		err = fmt.Errorf("http request err: %s", e.Error())
 		return
 	}
-	//utils.FileLog.Info("result: %s", string(r))
-	if e = json.Unmarshal(r, &resp); e != nil {
+	if len(b) == 0 {
+		err = fmt.Errorf("http result empty")
+		return
+	}
+	if e = json.Unmarshal(b, &resp); e != nil {
 		err = fmt.Errorf("resp unmarshal err: %s", e.Error())
 		return
 	}
@@ -339,12 +358,12 @@ func ApiTest() (err error) {
 	//f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
 	f := url.Values{}
 	f.Add("m", "QueryData")
-	f.Add("dbcode", "fsyd")
+	f.Add("dbcode", "fsjd")
 	f.Add("rowcode", "zb")
 	f.Add("colcode", "sj")
 	//f.Add("wds", `[{"wdcode":"reg","valuecode":"110000"}]`)
-	f.Add("wds", `[]`)
-	f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A010101"}]`)
+	f.Add("wds", `[{"wdcode":"reg","valuecode":"310000"}]`)
+	f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A0501"},{"wdcode":"sj","valuecode":"LAST18"}]`)
 	f.Add("k1", fmt.Sprint(time.Now().UnixNano()/1e6))
 	f.Add("h", "1")
 
@@ -370,6 +389,6 @@ func ApiTest() (err error) {
 		fmt.Println("请求失败, Err: ", e.Error())
 		return
 	}
-	utils.FileLog.Info("result: %s", string(r))
+	utils.FileLog.Info("test result: %s", string(r))
 	return
 }

+ 127 - 34
services/national_data/national_data.go

@@ -2,6 +2,7 @@ package national_data
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"hongze/hongze_data_crawler/models"
 	"hongze/hongze_data_crawler/services/alarm_msg"
@@ -32,7 +33,10 @@ func RefreshNationalDbs(cont context.Context) (err error) {
 // RefreshNationalMonthDbA 刷新月度指标库
 func RefreshNationalMonthDbA(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步月度指标库A")
-	SelectSyncFunc([]string{"hgyd", "fsyd"})
+	if err = SelectSyncFunc([]string{"hgyd", "fsyd"}); err != nil {
+		utils.FileLog.Info("统计局-同步月度指标库A失败")
+		return
+	}
 	utils.FileLog.Info("统计局-同步月度指标库A成功")
 	return
 }
@@ -40,7 +44,10 @@ func RefreshNationalMonthDbA(cont context.Context) (err error) {
 // RefreshNationalMonthDbB 刷新月度指标库(主要城市月度、港澳台月度)
 func RefreshNationalMonthDbB(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步月度指标库B")
-	SelectSyncFunc([]string{"csyd", "gatyd"})
+	if err = SelectSyncFunc([]string{"csyd", "gatyd"}); err != nil {
+		utils.FileLog.Info("统计局-同步月度指标库B失败")
+		return
+	}
 	utils.FileLog.Info("统计局-同步月度指标库B成功")
 	return
 }
@@ -48,7 +55,10 @@ func RefreshNationalMonthDbB(cont context.Context) (err error) {
 // RefreshNationalMonthDbC 刷新月度指标库(国际数据)
 func RefreshNationalMonthDbC(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步月度指标库C")
-	SelectSyncFunc([]string{"gjyd", "gjydsdj", "gjydsc"})
+	if err = SelectSyncFunc([]string{"gjyd", "gjydsdj", "gjydsc"}); err != nil {
+		utils.FileLog.Info("统计局-同步月度指标库C失败")
+		return
+	}
 	utils.FileLog.Info("统计局-同步月度指标库C成功")
 	return
 }
@@ -56,7 +66,10 @@ func RefreshNationalMonthDbC(cont context.Context) (err error) {
 // RefreshNationalQuarterDb 刷新季度指标库
 func RefreshNationalQuarterDb(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步季度指标库")
-	SelectSyncFunc([]string{"hgjd", "fsjd"})
+	if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
+		utils.FileLog.Info("统计局-同步季度指标库失败")
+		return
+	}
 	utils.FileLog.Info("统计局-同步季度指标库成功")
 	return
 }
@@ -64,7 +77,14 @@ func RefreshNationalQuarterDb(cont context.Context) (err error) {
 // RefreshNationalYearDbA 刷新年度指标库(年度数据、分省年度数据)
 func RefreshNationalYearDbA(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步年度指标库A")
-	SelectSyncFunc([]string{"hgnd", "fsnd"})
+	if err = SelectSyncFunc([]string{"hgnd", "fsnd"}); err != nil {
+		utils.FileLog.Info("统计局-同步年度指标库A失败")
+		return
+	}
+	//if err = SelectSyncFunc([]string{"fsnd", "csnd", "gatnd", "gjnd"}); err != nil {
+	//	utils.FileLog.Info("统计局-同步年度指标库A失败")
+	//	return
+	//}
 	utils.FileLog.Info("统计局-同步年度指标库A成功")
 	return
 }
@@ -72,27 +92,36 @@ func RefreshNationalYearDbA(cont context.Context) (err error) {
 // RefreshNationalYearDbB 刷新年度指标库(主要城市年度数据、港澳台年度数据、国际年度数据)
 func RefreshNationalYearDbB(cont context.Context) (err error) {
 	utils.FileLog.Info("统计局-开始同步年度指标库B")
-	SelectSyncFunc([]string{"csnd", "gatnd", "gjnd"})
+	if err = SelectSyncFunc([]string{"csnd", "gatnd", "gjnd"}); err != nil {
+		utils.FileLog.Info("统计局-同步年度指标库B失败")
+		return
+	}
 	utils.FileLog.Info("统计局-同步年度指标库B成功")
 	return
 }
 
-func SelectSyncFunc(dbs []string) {
+func SelectSyncFunc(dbs []string) (err error) {
 	funcA := []string{"hgyd", "hgjd", "hgnd"}
 	funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
 	funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
 	// 此处要根据不同的指标库选择同步方式
 	for _, q := range dbs {
 		if utils.InArrayByStr(funcA, q) {
-			_ = SyncXDateYQuotaDb([]string{q})
+			if err = SyncXDateYQuotaDb([]string{q}); err != nil {
+				return
+			}
 			continue
 		}
 		if utils.InArrayByStr(funcB, q) {
-			_ = SyncXDateYQuotaZRegDb([]string{q})
+			if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
+				return
+			}
 			continue
 		}
 		if utils.InArrayByStr(funcC, q) {
-			_ = SyncXRegYDateZQuotaDb([]string{q})
+			if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
+				return
+			}
 		}
 	}
 	// 最后更新一下每个指标的开始结束日期
@@ -104,17 +133,18 @@ func SelectSyncFunc(dbs []string) {
 
 // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
 func SyncXDateYQuotaDb(dbs []string) (err error) {
+	if len(dbs) == 0 {
+		dbs = []string{"hgyd", "hgjd", "hgnd"}
+	}
 	defer func() {
+		d := strings.Join(dbs, ",")
 		if err != nil {
-			utils.FileLog.Error("统计局-同步月度/季度/年度数据失败, ErrMsg: %s", err.Error())
-			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步月度/季度/年度数据失败, ErrMsg: %s", err.Error()), 3)
+			utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
+			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
 			return
 		}
-		utils.FileLog.Info("统计局-同步月度/季度/年度数据成功")
+		utils.FileLog.Info("统计局-同步%s数据库成功", d)
 	}()
-	if len(dbs) == 0 {
-		dbs = []string{"hgyd", "hgjd", "hgnd"}
-	}
 
 	// 查询无父级的指标分类
 	for _, d := range dbs {
@@ -175,10 +205,28 @@ func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error
 		WdCode:    "sj",
 		ValueCode: timeParam,
 	})
+	attempt := 0
 	resp, e := CommonDataApiRequest(dataReq)
 	if e != nil {
-		err = fmt.Errorf("请求数据接口失败, Err: %s", e.Error())
-		return
+		//if !strings.Contains(e.Error(), "connection attempt failed") {
+		//	err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
+		//	return
+		//}
+		// 连接失败重新尝试3次
+		for {
+			time.Sleep(2 * time.Minute)
+			attempt += 1
+			utils.FileLog.Info("当前第%d次重新请求", attempt)
+			resp, e = CommonDataApiRequest(dataReq)
+			if e == nil {
+				break
+			}
+			if attempt >= 3 {
+				s, _ := json.Marshal(dataReq)
+				err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
+				return
+			}
+		}
 	}
 
 	// 数据集
@@ -247,6 +295,7 @@ func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error
 			IndexCode:                            indexCode,
 			IndexName:                            q.Name,
 			Frequency:                            frequency,
+			Unit:                                 q.Unit,
 			CreateTime:                           time.Now().Local(),
 			ModifyTime:                           time.Now().Local(),
 		}
@@ -305,23 +354,28 @@ func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error
 
 // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
 func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
+	if len(dbs) == 0 {
+		dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
+	}
 	defer func() {
+		d := strings.Join(dbs, ",")
 		if err != nil {
-			utils.FileLog.Error("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格失败, ErrMsg: %s", err.Error())
-			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格失败, ErrMsg: %s", err.Error()), 3)
+			utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
+			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
 			return
 		}
-		utils.FileLog.Info("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格成功")
+		utils.FileLog.Info("统计局-同步%s数据库成功", d)
 	}()
-	if len(dbs) == 0 {
-		dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
-	}
 
 	// 需要同步的数据库
 	for _, d := range dbs {
 		classifyOB := new(models.BaseFromNationalStatisticsClassify)
 		// 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
 		classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
+		// TODO:记得删
+		//if d == "fsnd" {
+		//	classifyCond += ` AND base_from_national_statistics_classify_id >= 3771`
+		//}
 		classifyPars := make([]interface{}, 0)
 		classifyPars = append(classifyPars, d)
 		classifyOrder := ` base_from_national_statistics_classify_id ASC`
@@ -398,10 +452,28 @@ func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regLis
 			WdCode:    "sj",
 			ValueCode: timeParam,
 		})
+		attempt := 0
 		resp, e := CommonDataApiRequest(dataReq)
 		if e != nil {
-			err = fmt.Errorf("获取分类下的指标数据失败, Err: %s", e.Error())
-			return
+			//if !strings.Contains(e.Error(), "connection attempt failed") {
+			//	err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
+			//	return
+			//}
+			// 连接失败重新尝试3次
+			for {
+				time.Sleep(2 * time.Minute)
+				attempt += 1
+				utils.FileLog.Info("当前第%d次重新请求", attempt)
+				resp, e = CommonDataApiRequest(dataReq)
+				if e == nil {
+					break
+				}
+				if attempt >= 3 {
+					s, _ := json.Marshal(dataReq)
+					err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
+					return
+				}
+			}
 		}
 
 		// 数据集
@@ -471,6 +543,7 @@ func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regLis
 				IndexCode:                            indexCode,
 				IndexName:                            q.Name,
 				Frequency:                            frequency,
+				Unit:                                 q.Unit,
 				Reg:                                  reg.Name,
 				CreateTime:                           time.Now().Local(),
 				ModifyTime:                           time.Now().Local(),
@@ -532,17 +605,18 @@ func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regLis
 
 // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
 func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
+	if len(dbs) == 0 {
+		dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
+	}
 	defer func() {
+		d := strings.Join(dbs, ",")
 		if err != nil {
-			utils.FileLog.Error("统计局-同步港澳台、国际数据指标失败, ErrMsg: %s", err.Error())
-			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步港澳台、国际数据指标失败, ErrMsg: %s", err.Error()), 3)
+			utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
+			go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
 			return
 		}
-		utils.FileLog.Info("统计局-同步港澳台、国际数据指标成功")
+		utils.FileLog.Info("统计局-同步%s数据库成功", d)
 	}()
-	if len(dbs) == 0 {
-		dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
-	}
 
 	// 需要同步的数据库
 	for _, d := range dbs {
@@ -650,10 +724,28 @@ func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err
 			WdCode:    "sj",
 			ValueCode: timeParam,
 		})
+		attempt := 0
 		resp, e := CommonDataApiRequest(dataReq)
 		if e != nil {
-			err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
-			return
+			//if !strings.Contains(e.Error(), "connection attempt failed") {
+			//	err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
+			//	return
+			//}
+			// 连接失败重新尝试3次
+			for {
+				time.Sleep(2 * time.Minute)
+				attempt += 1
+				utils.FileLog.Info("当前第%d次重新请求", attempt)
+				resp, e = CommonDataApiRequest(dataReq)
+				if e == nil {
+					break
+				}
+				if attempt >= 3 {
+					s, _ := json.Marshal(dataReq)
+					err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
+					return
+				}
+			}
 		}
 
 		// 数据集
@@ -728,6 +820,7 @@ func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err
 				IndexCode:                            indexCode,
 				IndexName:                            quota.Name,
 				Frequency:                            frequency,
+				Unit:                                 quota.Unit,
 				Reg:                                  reg.Name,
 				CreateTime:                           time.Now().Local(),
 				ModifyTime:                           time.Now().Local(),

+ 12 - 0
services/task.go

@@ -74,3 +74,15 @@ func RefreshChangesVisitorsCovid(cont context.Context) (err error) {
 	err = AddSourceChangesVisitorsCovid()
 	return
 }
+
+//func Task2() {
+//	fmt.Println("start")
+//
+//	var cont context.Context
+//	_ = national_data.RefreshNationalYearDbA(cont)
+//
+//	//_ = national_data.RefreshNationalYearDbB(cont)
+//
+//	//_ = national_data.ApiTest()
+//	fmt.Println("end")
+//}