Browse Source

test 统计局

hsun 1 year ago
parent
commit
b002c151a9
4 changed files with 160 additions and 54 deletions
  1. 9 2
      services/national_data/common.go
  2. 115 27
      services/national_data/national_data.go
  3. 17 25
      services/task.go
  4. 19 0
      utils/common.go

+ 9 - 2
services/national_data/common.go

@@ -17,7 +17,14 @@ const (
 )
 
 func NationalHttpPost(reqUrl, payload string) (result []byte, err error) {
-	time.Sleep(2200 * time.Millisecond) // 目前来看这个速度是不会中断的...就是慢...
+	// 随机延迟执行
+	r := utils.RangeRand(3000, 6000)
+	if r > 5000 && r < 5100 {
+		time.Sleep(10 * time.Second)
+	} else {
+		time.Sleep(time.Duration(r) * time.Millisecond)
+	}
+
 	tr := &http.Transport{
 		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
 	}
@@ -160,7 +167,7 @@ func CommonDataApiRequest(req DataApiReq) (resp QuotaListDataResp, err error) {
 		err = fmt.Errorf("http request err: %s", e.Error())
 		return
 	}
-	utils.FileLog.Info("result: %s", string(r))
+	//utils.FileLog.Info("result: %s", string(r))
 	if e = json.Unmarshal(r, &resp); e != nil {
 		err = fmt.Errorf("resp unmarshal err: %s", e.Error())
 		return

+ 115 - 27
services/national_data/national_data.go

@@ -10,22 +10,100 @@ import (
 	"time"
 )
 
-// RefreshNationalData 刷新统计局数据
-func RefreshNationalData(cont context.Context) (err error) {
+// RefreshNationalDbs 刷新统计局数据(所有)
+func RefreshNationalDbs(cont context.Context) (err error) {
 	utils.FileLog.Info("开始刷新统计局数据")
 
-	_ = SyncXDateYQuotaDb()
+	_ = SyncXDateYQuotaDb([]string{})
 
-	_ = SyncXDateYQuotaZRegDb()
+	_ = SyncXDateYQuotaZRegDb([]string{})
 
-	_ = SyncXRegYDateZQuotaDb()
+	_ = SyncXRegYDateZQuotaDb([]string{})
+
+	// 最后更新一下每个指标的开始结束日期
+	if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
+		alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
+	}
 
 	utils.FileLog.Info("统计局数据刷新成功")
 	return
 }
 
+// RefreshNationalMonthDbA 刷新月度指标库
+func RefreshNationalMonthDbA(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步月度指标库A")
+	SelectSyncFunc([]string{"hgyd", "fsyd"})
+	utils.FileLog.Info("统计局-同步月度指标库A成功")
+	return
+}
+
+// RefreshNationalMonthDbB 刷新月度指标库(主要城市月度、港澳台月度)
+func RefreshNationalMonthDbB(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步月度指标库B")
+	SelectSyncFunc([]string{"csyd", "gatyd"})
+	utils.FileLog.Info("统计局-同步月度指标库B成功")
+	return
+}
+
+// RefreshNationalMonthDbC 刷新月度指标库(国际数据)
+func RefreshNationalMonthDbC(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步月度指标库C")
+	SelectSyncFunc([]string{"gjyd", "gjydsdj", "gjydsc"})
+	utils.FileLog.Info("统计局-同步月度指标库C成功")
+	return
+}
+
+// RefreshNationalQuarterDb 刷新季度指标库
+func RefreshNationalQuarterDb(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步季度指标库")
+	SelectSyncFunc([]string{"hgjd", "fsjd"})
+	utils.FileLog.Info("统计局-同步季度指标库成功")
+	return
+}
+
+// RefreshNationalYearDbA 刷新年度指标库(年度数据、分省年度数据)
+func RefreshNationalYearDbA(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步年度指标库A")
+	SelectSyncFunc([]string{"hgnd", "fsnd"})
+	utils.FileLog.Info("统计局-同步年度指标库A成功")
+	return
+}
+
+// RefreshNationalYearDbB 刷新年度指标库(主要城市年度数据、港澳台年度数据、国际年度数据)
+func RefreshNationalYearDbB(cont context.Context) (err error) {
+	utils.FileLog.Info("统计局-开始同步年度指标库B")
+	SelectSyncFunc([]string{"csnd", "gatnd", "gjnd"})
+	utils.FileLog.Info("统计局-同步年度指标库B成功")
+	return
+}
+
+func SelectSyncFunc(dbs []string) {
+	funcA := []string{"hgyd", "hgjd", "hgnd"}
+	funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
+	funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
+	// 此处要根据不同的指标库选择同步方式
+	for _, q := range dbs {
+		if utils.InArrayByStr(funcA, q) {
+			_ = SyncXDateYQuotaDb([]string{q})
+			continue
+		}
+		if utils.InArrayByStr(funcB, q) {
+			_ = SyncXDateYQuotaZRegDb([]string{q})
+			continue
+		}
+		if utils.InArrayByStr(funcC, q) {
+			_ = SyncXRegYDateZQuotaDb([]string{q})
+		}
+	}
+	// 最后更新一下每个指标的开始结束日期
+	if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
+		alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
+	}
+	return
+}
+
 // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
-func SyncXDateYQuotaDb() (err error) {
+func SyncXDateYQuotaDb(dbs []string) (err error) {
 	defer func() {
 		if err != nil {
 			utils.FileLog.Error("统计局-同步月度/季度/年度数据失败, ErrMsg: %s", err.Error())
@@ -34,25 +112,31 @@ func SyncXDateYQuotaDb() (err error) {
 		}
 		utils.FileLog.Info("统计局-同步月度/季度/年度数据成功")
 	}()
-
-	// 查询无父级的指标分类
-	classifyOB := new(models.BaseFromNationalStatisticsClassify)
-	classifyCond := ` AND is_parent = 0 AND dbcode IN ('hgyd', 'hgjd', 'hgnd')`
-	classifyPars := make([]interface{}, 0)
-	classifyOrder := ` base_from_national_statistics_classify_id ASC`
-	classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
-	if e != nil {
-		err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
-		return
+	if len(dbs) == 0 {
+		dbs = []string{"hgyd", "hgjd", "hgnd"}
 	}
-	utils.FileLog.Info("分类长度: %d\n", len(classifyList))
 
-	// 同步指标和数据
-	for _, c := range classifyList {
-		if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
-			err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
+	// 查询无父级的指标分类
+	for _, d := range dbs {
+		classifyOB := new(models.BaseFromNationalStatisticsClassify)
+		classifyCond := ` AND is_parent = 0 AND dbcode = ?`
+		classifyPars := make([]interface{}, 0)
+		classifyPars = append(classifyPars, d)
+		classifyOrder := ` base_from_national_statistics_classify_id ASC`
+		classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
+		if e != nil {
+			err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
 			return
 		}
+		utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
+
+		// 同步指标和数据
+		for _, c := range classifyList {
+			if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
+				err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
+				return
+			}
+		}
 	}
 	return
 }
@@ -212,7 +296,7 @@ func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error
 }
 
 // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
-func SyncXDateYQuotaZRegDb() (err error) {
+func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
 	defer func() {
 		if err != nil {
 			utils.FileLog.Error("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格失败, ErrMsg: %s", err.Error())
@@ -221,10 +305,12 @@ func SyncXDateYQuotaZRegDb() (err error) {
 		}
 		utils.FileLog.Info("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格成功")
 	}()
+	if len(dbs) == 0 {
+		dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
+	}
 
 	// 需要同步的数据库
-	dbCodes := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
-	for _, d := range dbCodes {
+	for _, d := range dbs {
 		classifyOB := new(models.BaseFromNationalStatisticsClassify)
 		// 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
 		classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
@@ -429,7 +515,7 @@ func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regLis
 }
 
 // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
-func SyncXRegYDateZQuotaDb() (err error) {
+func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
 	defer func() {
 		if err != nil {
 			utils.FileLog.Error("统计局-同步港澳台、国际数据指标失败, ErrMsg: %s", err.Error())
@@ -438,10 +524,12 @@ func SyncXRegYDateZQuotaDb() (err error) {
 		}
 		utils.FileLog.Info("统计局-同步港澳台、国际数据指标成功")
 	}()
+	if len(dbs) == 0 {
+		dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
+	}
 
 	// 需要同步的数据库
-	dbCodes := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
-	for _, d := range dbCodes {
+	for _, d := range dbs {
 		classifyOB := new(models.BaseFromNationalStatisticsClassify)
 		// 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
 		classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`

+ 17 - 25
services/task.go

@@ -14,14 +14,28 @@ func Task() {
 	refreshCoal := task.NewTask("refreshData", "0 0,30 17-23 * * *", RefreshCoal)
 	refreshVisitors := task.NewTask("RefreshChangesVisitorsCovid", "0 30 2-22/10 * * *", RefreshChangesVisitorsCovid)
 	syncEiaSteoData := task.NewTask("SyncEiaSteoData", "0 0 22 * * *", SyncEiaSteoData)
-	refreshNationalData := task.NewTask("RefreshNationalData", "0 15 2 10 * *", national_data.RefreshNationalData)
+
+	// 统计局-稳妥起见拆成多个时间爬取
+	refreshNationalMonthA := task.NewTask("RefreshNationalMonthDbA", "0 15 2 8 * *", national_data.RefreshNationalMonthDbA)
+	refreshNationalMonthB := task.NewTask("RefreshNationalMonthDbB", "0 15 2 16 * *", national_data.RefreshNationalMonthDbB)
+	refreshNationalMonthC := task.NewTask("RefreshNationalMonthDbC", "0 15 2 24 * *", national_data.RefreshNationalMonthDbC)
+	refreshNationalQuarter := task.NewTask("RefreshNationalQuarterDb", "0 25 1 15 1,4,7,10 *", national_data.RefreshNationalQuarterDb)
+	refreshNationalYearA := task.NewTask("RefreshNationalYearDbA", "0 45 1 13 2 *", national_data.RefreshNationalYearDbA)
+	refreshNationalYearB := task.NewTask("RefreshNationalYearDbB", "0 45 1 26 2 *", national_data.RefreshNationalYearDbB)
 
 	task.AddTask("数据爬取", refreshData)
 	task.AddTask("欧洲天然气爬取", refreshEic)
 	task.AddTask("中国煤炭网爬取", refreshCoal)
 	task.AddTask("谷歌出行指数爬取", refreshVisitors)
-	task.AddTask("eia steo报告", syncEiaSteoData)  //每天22点爬一次
-	task.AddTask("统计局数据爬取", refreshNationalData) // 每月10号2:15执行一次
+	task.AddTask("eia steo报告", syncEiaSteoData) //每天22点爬一次
+
+	task.AddTask("统计局数据爬取-月度A", refreshNationalMonthA) // 每月8号2:15执行
+	task.AddTask("统计局数据爬取-月度B", refreshNationalMonthB) // 每月16号2:15执行
+	task.AddTask("统计局数据爬取-月度C", refreshNationalMonthC) // 每月24号2:15执行
+	task.AddTask("统计局数据爬取-季度", refreshNationalQuarter) // 每年1/4/7/10月15日1:25执行
+	task.AddTask("统计局数据爬取-年度A", refreshNationalYearA)  // 每年2月13日1:45执行
+	task.AddTask("统计局数据爬取-年度B", refreshNationalYearB)  // 每年2月26日1:45执行
+
 	task.StartTask()
 	//FileCoalJsm()
 	//FileCoalFirm()
@@ -60,25 +74,3 @@ func RefreshChangesVisitorsCovid(cont context.Context) (err error) {
 	err = AddSourceChangesVisitorsCovid()
 	return
 }
-
-func Task11() {
-	fmt.Println("start")
-
-	//_ = national_data.SyncXDateYQuotaZRegDb()
-	//_ = national_data.SyncQuotaClassifyTree()
-	//_ = national_data.SyncQuotaDataFromDbCodeAndId()
-
-	//reqUrl := "https://data.stats.gov.cn/easyquery.htm"
-
-	//formData := url.Values{}
-	//formData.Add("id", "zb")
-	//formData.Add("dbcode", "hgyd")
-	//formData.Add("wdcode", "zb")
-	//formData.Add("m", "getTree")
-	//
-	//formDataStr := formData.Encode()
-	//result, err := national_data.NationalHttpPost(reqUrl, formDataStr)
-	//fmt.Println(err)
-	//fmt.Println(string(result))
-	fmt.Println("end")
-}

+ 19 - 0
utils/common.go

@@ -3,6 +3,7 @@ package utils
 import (
 	"bufio"
 	"crypto/md5"
+	cryRand "crypto/rand"
 	"crypto/sha1"
 	"encoding/base64"
 	"encoding/hex"
@@ -13,6 +14,7 @@ import (
 	"image/png"
 	"io"
 	"math"
+	"math/big"
 	"math/rand"
 	"net/http"
 	"os"
@@ -945,3 +947,20 @@ func InArrayByStr(idStrList []string, searchId string) (has bool) {
 	}
 	return
 }
+
+// RangeRand 取区间随机数
+func RangeRand(min, max int64) int64 {
+	if min > max {
+		return max
+	}
+	if min < 0 {
+		f64Min := math.Abs(float64(min))
+		i64Min := int64(f64Min)
+		result, _ := cryRand.Int(cryRand.Reader, big.NewInt(max+1+i64Min))
+
+		return result.Int64() - i64Min
+	} else {
+		result, _ := cryRand.Int(cryRand.Reader, big.NewInt(max-min+1))
+		return min + result.Int64()
+	}
+}