|
@@ -0,0 +1,856 @@
|
|
|
+package national_data
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "hongze/hongze_data_crawler/models"
|
|
|
+ "hongze/hongze_data_crawler/services/alarm_msg"
|
|
|
+ "hongze/hongze_data_crawler/utils"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+// 每个库大概同步时间
|
|
|
+// hgyd-4h fsyd-9h csyd-3h gatyd-1h gjyd、gjydsdj、gjydsc-15min
|
|
|
+// hgjd-4h fsjd-4h
|
|
|
+// hgnd-9h fsnd-2day csnd、gatnd、gjnd-1.5h
|
|
|
+
|
|
|
+// RefreshNationalDbs 刷新统计局数据(所有)
|
|
|
+func RefreshNationalDbs(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("开始刷新统计局数据")
|
|
|
+
|
|
|
+ _ = SyncXDateYQuotaDb([]string{})
|
|
|
+
|
|
|
+ _ = SyncXDateYQuotaZRegDb([]string{})
|
|
|
+
|
|
|
+ _ = SyncXRegYDateZQuotaDb([]string{})
|
|
|
+
|
|
|
+ // 最后更新一下每个指标的开始结束日期
|
|
|
+ if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
|
|
|
+ alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
|
|
|
+ }
|
|
|
+
|
|
|
+ utils.FileLog.Info("统计局数据刷新成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// RefreshNationalMonthDbA 刷新月度指标库
|
|
|
+func RefreshNationalMonthDbA(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("统计局-开始同步月度指标库A")
|
|
|
+ if err = SelectSyncFunc([]string{"hgyd", "csyd", "gatyd", "gjyd", "gjydsdj", "gjydsc"}); err != nil {
|
|
|
+ utils.FileLog.Info("统计局-同步月度指标库A失败")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步月度指标库A成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// RefreshNationalMonthDbB 刷新月度指标库(分省月度)
|
|
|
+func RefreshNationalMonthDbB(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("统计局-开始同步月度指标库B")
|
|
|
+ if err = SelectSyncFunc([]string{"fsyd"}); err != nil {
|
|
|
+ utils.FileLog.Info("统计局-同步月度指标库B失败")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步月度指标库B成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// RefreshNationalQuarterDb 刷新季度指标库
|
|
|
+func RefreshNationalQuarterDb(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("统计局-开始同步季度指标库")
|
|
|
+ if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
|
|
|
+ utils.FileLog.Info("统计局-同步季度指标库失败")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步季度指标库成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// RefreshNationalYearDbA 刷新年度指标库
|
|
|
+func RefreshNationalYearDbA(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("统计局-开始同步年度指标库A")
|
|
|
+ if err = SelectSyncFunc([]string{"hgnd", "csnd", "gatnd", "gjnd"}); err != nil {
|
|
|
+ utils.FileLog.Info("统计局-同步年度指标库A失败")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步年度指标库A成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// RefreshNationalYearDbB 刷新年度指标库(分省年度数据)
|
|
|
+func RefreshNationalYearDbB(cont context.Context) (err error) {
|
|
|
+ utils.FileLog.Info("统计局-开始同步年度指标库B")
|
|
|
+ if err = SelectSyncFunc([]string{"fsnd"}); err != nil {
|
|
|
+ utils.FileLog.Info("统计局-同步年度指标库B失败")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步年度指标库B成功")
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func SelectSyncFunc(dbs []string) (err error) {
|
|
|
+ funcA := []string{"hgyd", "hgjd", "hgnd"}
|
|
|
+ funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
|
|
|
+ funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
|
|
|
+ // 此处要根据不同的指标库选择同步方式
|
|
|
+ for _, q := range dbs {
|
|
|
+ if utils.InArrayByStr(funcA, q) {
|
|
|
+ if err = SyncXDateYQuotaDb([]string{q}); err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if utils.InArrayByStr(funcB, q) {
|
|
|
+ if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if utils.InArrayByStr(funcC, q) {
|
|
|
+ if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 最后更新一下每个指标的开始结束日期
|
|
|
+ if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
|
|
|
+ alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
|
|
|
+func SyncXDateYQuotaDb(dbs []string) (err error) {
|
|
|
+ if len(dbs) == 0 {
|
|
|
+ dbs = []string{"hgyd", "hgjd", "hgnd"}
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ d := strings.Join(dbs, ",")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
|
|
|
+ go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步%s数据库成功", d)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 查询无父级的指标分类
|
|
|
+ for _, d := range dbs {
|
|
|
+ classifyOB := new(models.BaseFromNationalStatisticsClassify)
|
|
|
+ classifyCond := ` AND is_parent = 0 AND dbcode = ?`
|
|
|
+ classifyPars := make([]interface{}, 0)
|
|
|
+ classifyPars = append(classifyPars, d)
|
|
|
+ classifyOrder := ` base_from_national_statistics_classify_id ASC`
|
|
|
+ classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
|
|
|
+
|
|
|
+ // 同步指标和数据
|
|
|
+ for _, c := range classifyList {
|
|
|
+ utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
|
|
|
+ err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXDateYQuotaData 同步两维度X轴-日期, Y轴-指标的数据
|
|
|
+func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ // yd-月度 jd-季度 nd-年度
|
|
|
+ frequency := ""
|
|
|
+ timeParam := ""
|
|
|
+ if strings.Contains(dbCode, "yd") {
|
|
|
+ timeParam = "LAST36" // 最近36个月
|
|
|
+ frequency = "月度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "jd") {
|
|
|
+ timeParam = "LAST18" // 最近18个季度
|
|
|
+ frequency = "季度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "nd") {
|
|
|
+ timeParam = "LAST20" // 最近20年
|
|
|
+ frequency = "年度"
|
|
|
+ }
|
|
|
+
|
|
|
+ var dataReq DataApiReq
|
|
|
+ dataReq.DbCode = dbCode
|
|
|
+ dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
|
|
|
+ WdCode: "zb",
|
|
|
+ ValueCode: classifyCode,
|
|
|
+ }, Wds{
|
|
|
+ WdCode: "sj",
|
|
|
+ ValueCode: timeParam,
|
|
|
+ })
|
|
|
+ attempt := 0
|
|
|
+ resp, e := CommonDataApiRequest(dataReq)
|
|
|
+ if e != nil {
|
|
|
+ //if !strings.Contains(e.Error(), "connection attempt failed") {
|
|
|
+ // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ // 连接失败重新尝试3次
|
|
|
+ for {
|
|
|
+ time.Sleep(2 * time.Minute)
|
|
|
+ attempt += 1
|
|
|
+ utils.FileLog.Info("当前第%d次重新请求", attempt)
|
|
|
+ resp, e = CommonDataApiRequest(dataReq)
|
|
|
+ if e == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if attempt >= 3 {
|
|
|
+ s, _ := json.Marshal(dataReq)
|
|
|
+ err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据集
|
|
|
+ dataNodes := resp.ReturnData.DataNodes
|
|
|
+ dataMap := make(map[string]QuotaDataNode)
|
|
|
+ for _, d := range dataNodes {
|
|
|
+ dataMap[d.Code] = d
|
|
|
+ }
|
|
|
+
|
|
|
+ // 取出指标(Y轴), 日期(X轴)
|
|
|
+ wdNodes := resp.ReturnData.WdNodes
|
|
|
+ var quotaNodes, dateNodes []QuotaWdNodeData
|
|
|
+ for _, w := range wdNodes {
|
|
|
+ if w.WdCode == "zb" {
|
|
|
+ quotaNodes = w.Nodes
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if w.WdCode == "sj" {
|
|
|
+ dateNodes = w.Nodes
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 指标编码去重, 指标编码+日期数据去重
|
|
|
+ indexOB := new(models.BaseFromNationalStatisticsIndex)
|
|
|
+ indexCond := ` AND dbcode = ?`
|
|
|
+ indexPars := make([]interface{}, 0)
|
|
|
+ indexPars = append(indexPars, dbCode)
|
|
|
+ indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexExistMap := make(map[string]bool)
|
|
|
+ for _, v := range indexList {
|
|
|
+ indexExistMap[v.IndexCode] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历XY轴
|
|
|
+ indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
|
|
|
+ indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
|
|
|
+ for _, q := range quotaNodes {
|
|
|
+ indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
|
|
|
+
|
|
|
+ // 数据去重
|
|
|
+ dataExistMap := make(map[string]bool)
|
|
|
+ dataOB := new(models.BaseFromNationalStatisticsData)
|
|
|
+ dataCond := ` AND index_code = ?`
|
|
|
+ dataPars := make([]interface{}, 0)
|
|
|
+ dataPars = append(dataPars, indexCode)
|
|
|
+ dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, v := range dataList {
|
|
|
+ dataExistMap[v.DataTime.Format(utils.FormatDate)] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 指标
|
|
|
+ r := new(models.SaveNationalStatisticsIndexAndDataReq)
|
|
|
+ r.Index = &models.BaseFromNationalStatisticsIndex{
|
|
|
+ BaseFromNationalStatisticsClassifyId: classifyId,
|
|
|
+ Dbcode: dbCode,
|
|
|
+ IndexCode: indexCode,
|
|
|
+ IndexName: q.Name,
|
|
|
+ Frequency: frequency,
|
|
|
+ Unit: q.Unit,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ }
|
|
|
+ if indexExistMap[indexCode] {
|
|
|
+ r.IndexExist = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据
|
|
|
+ for _, d := range dateNodes {
|
|
|
+ k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
|
|
|
+ v := dataMap[k]
|
|
|
+ if !v.Data.HasData {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ t, e := formatMonth2YearDateCode(d.Code)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if dataExistMap[t.Format(utils.FormatDate)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据map
|
|
|
+ if indexDataMap[indexCode] == nil {
|
|
|
+ indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
|
|
|
+ }
|
|
|
+ indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
|
|
|
+ IndexCode: indexCode,
|
|
|
+ DataTime: t,
|
|
|
+ Value: v.Data.Data,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ indexDataList = append(indexDataList, r)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存指标
|
|
|
+ for _, v := range indexDataList {
|
|
|
+ ds := indexDataMap[v.Index.IndexCode]
|
|
|
+ if ds == nil || (ds != nil && len(ds) == 0) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ v.DataList = ds
|
|
|
+ if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
|
|
|
+ err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
|
|
|
+func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
|
|
|
+ if len(dbs) == 0 {
|
|
|
+ dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ d := strings.Join(dbs, ",")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
|
|
|
+ go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步%s数据库成功", d)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 需要同步的数据库
|
|
|
+ for _, d := range dbs {
|
|
|
+ classifyOB := new(models.BaseFromNationalStatisticsClassify)
|
|
|
+ // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
|
|
|
+ classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
|
|
|
+ classifyPars := make([]interface{}, 0)
|
|
|
+ classifyPars = append(classifyPars, d)
|
|
|
+ classifyOrder := ` base_from_national_statistics_classify_id ASC`
|
|
|
+ classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
|
|
|
+
|
|
|
+ // 查询其他维度-地区
|
|
|
+ wdList, e := GetOtherWd(d, "", "")
|
|
|
+ var regList []OtherWdNodes
|
|
|
+ for _, wd := range wdList {
|
|
|
+ if wd.WdCode == "reg" {
|
|
|
+ regList = wd.Nodes
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(regList) == 0 {
|
|
|
+ err = fmt.Errorf("其他维度为空, DbCode: %s", d)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // 同步指标和数据
|
|
|
+ for _, c := range classifyList {
|
|
|
+ utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
|
|
|
+ err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXDateYQuotaZRegData 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据
|
|
|
+func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 根据DbCode判断频度和查询的时间区间
|
|
|
+ frequency := ""
|
|
|
+ timeParam := ""
|
|
|
+ if strings.Contains(dbCode, "yd") {
|
|
|
+ timeParam = "LAST36" // 最近36个月
|
|
|
+ frequency = "月度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "jd") {
|
|
|
+ timeParam = "LAST18" // 最近18个季度
|
|
|
+ frequency = "季度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "nd") {
|
|
|
+ timeParam = "LAST20" // 最近20年
|
|
|
+ frequency = "年度"
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历地区维度, 查询指标和数据
|
|
|
+ for _, reg := range regList {
|
|
|
+ var dataReq DataApiReq
|
|
|
+ dataReq.DbCode = dbCode
|
|
|
+ dataReq.WdsList = append(dataReq.WdsList, Wds{
|
|
|
+ WdCode: "reg",
|
|
|
+ ValueCode: reg.Code,
|
|
|
+ })
|
|
|
+ dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
|
|
|
+ WdCode: "zb",
|
|
|
+ ValueCode: classifyCode,
|
|
|
+ }, Wds{
|
|
|
+ WdCode: "sj",
|
|
|
+ ValueCode: timeParam,
|
|
|
+ })
|
|
|
+ attempt := 0
|
|
|
+ resp, e := CommonDataApiRequest(dataReq)
|
|
|
+ if e != nil {
|
|
|
+ //if !strings.Contains(e.Error(), "connection attempt failed") {
|
|
|
+ // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ // 连接失败重新尝试3次
|
|
|
+ for {
|
|
|
+ time.Sleep(2 * time.Minute)
|
|
|
+ attempt += 1
|
|
|
+ utils.FileLog.Info("当前第%d次重新请求", attempt)
|
|
|
+ resp, e = CommonDataApiRequest(dataReq)
|
|
|
+ if e == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if attempt >= 3 {
|
|
|
+ s, _ := json.Marshal(dataReq)
|
|
|
+ err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据集
|
|
|
+ dataNodes := resp.ReturnData.DataNodes
|
|
|
+ dataMap := make(map[string]QuotaDataNode)
|
|
|
+ for _, d := range dataNodes {
|
|
|
+ dataMap[d.Code] = d
|
|
|
+ }
|
|
|
+
|
|
|
+ // 取出指标(Y轴), 日期(X轴)
|
|
|
+ wdNodes := resp.ReturnData.WdNodes
|
|
|
+ var quotaNodes, dateNodes []QuotaWdNodeData
|
|
|
+ for _, w := range wdNodes {
|
|
|
+ if w.WdCode == "zb" {
|
|
|
+ quotaNodes = w.Nodes
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if w.WdCode == "sj" {
|
|
|
+ dateNodes = w.Nodes
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 指标编码去重, 指标编码+日期数据去重
|
|
|
+ indexOB := new(models.BaseFromNationalStatisticsIndex)
|
|
|
+ indexCond := ` AND dbcode = ?`
|
|
|
+ indexPars := make([]interface{}, 0)
|
|
|
+ indexPars = append(indexPars, dbCode)
|
|
|
+ indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexExistMap := make(map[string]bool)
|
|
|
+ for _, v := range indexList {
|
|
|
+ indexExistMap[v.IndexCode] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历XY轴
|
|
|
+ indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
|
|
|
+ indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
|
|
|
+ for _, q := range quotaNodes {
|
|
|
+ // dbcode+指标code+地区code
|
|
|
+ indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
|
|
|
+
|
|
|
+ // 数据去重
|
|
|
+ dataExistMap := make(map[string]bool)
|
|
|
+ dataOB := new(models.BaseFromNationalStatisticsData)
|
|
|
+ dataCond := ` AND index_code = ?`
|
|
|
+ dataPars := make([]interface{}, 0)
|
|
|
+ dataPars = append(dataPars, indexCode)
|
|
|
+ dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, v := range dataList {
|
|
|
+ dataExistMap[v.DataTime.Format(utils.FormatDate)] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 指标
|
|
|
+ r := new(models.SaveNationalStatisticsIndexAndDataReq)
|
|
|
+ r.Index = &models.BaseFromNationalStatisticsIndex{
|
|
|
+ BaseFromNationalStatisticsClassifyId: classifyId,
|
|
|
+ Dbcode: dbCode,
|
|
|
+ IndexCode: indexCode,
|
|
|
+ IndexName: q.Name,
|
|
|
+ Frequency: frequency,
|
|
|
+ Unit: q.Unit,
|
|
|
+ Reg: reg.Name,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ }
|
|
|
+ if indexExistMap[indexCode] {
|
|
|
+ r.IndexExist = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据
|
|
|
+ // zb.A01010201_reg.110000_sj.201608
|
|
|
+ for _, d := range dateNodes {
|
|
|
+ k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
|
|
|
+ v := dataMap[k]
|
|
|
+ if !v.Data.HasData {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ t, e := formatMonth2YearDateCode(d.Code)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if dataExistMap[t.Format(utils.FormatDate)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据map
|
|
|
+ if indexDataMap[indexCode] == nil {
|
|
|
+ indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
|
|
|
+ }
|
|
|
+ indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
|
|
|
+ IndexCode: indexCode,
|
|
|
+ DataTime: t,
|
|
|
+ Value: v.Data.Data,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ indexDataList = append(indexDataList, r)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存指标
|
|
|
+ for _, v := range indexDataList {
|
|
|
+ ds := indexDataMap[v.Index.IndexCode]
|
|
|
+ if ds == nil || (ds != nil && len(ds) == 0) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ v.DataList = ds
|
|
|
+ if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
|
|
|
+ err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
|
|
|
+func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
|
|
|
+ if len(dbs) == 0 {
|
|
|
+ dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ d := strings.Join(dbs, ",")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
|
|
|
+ go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("统计局-同步%s数据库成功", d)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 需要同步的数据库
|
|
|
+ for _, d := range dbs {
|
|
|
+ classifyOB := new(models.BaseFromNationalStatisticsClassify)
|
|
|
+ // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
|
|
|
+ classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
|
|
|
+ classifyPars := make([]interface{}, 0)
|
|
|
+ classifyPars = append(classifyPars, d)
|
|
|
+ classifyOrder := ` base_from_national_statistics_classify_id ASC`
|
|
|
+ classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
|
|
|
+
|
|
|
+ // 同步指标和数据
|
|
|
+ for _, c := range classifyList {
|
|
|
+ utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
|
|
|
+ err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SyncXRegYDateZQuotaDbData 同步三维度X轴-地区, Y轴-日期, Z轴-指标的数据
|
|
|
+func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ // 根据DbCode判断频度和查询的时间区间
|
|
|
+ frequency := ""
|
|
|
+ timeParam := ""
|
|
|
+ if strings.Contains(dbCode, "yd") {
|
|
|
+ timeParam = "LAST36" // 最近36个月
|
|
|
+ frequency = "月度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "jd") {
|
|
|
+ timeParam = "LAST18" // 最近18个季度
|
|
|
+ frequency = "季度"
|
|
|
+ }
|
|
|
+ if strings.Contains(dbCode, "nd") {
|
|
|
+ timeParam = "LAST20" // 最近20年
|
|
|
+ frequency = "年度"
|
|
|
+ }
|
|
|
+
|
|
|
+ // 先以指标作为行进行默认查询, 取出其中的指标作为当前分类的指标
|
|
|
+ //f := url.Values{}
|
|
|
+ //f.Add("m", "QueryData")
|
|
|
+ //f.Add("dbcode", "gatyd")
|
|
|
+ //f.Add("rowcode", "zb")
|
|
|
+ //f.Add("colcode", "reg")
|
|
|
+ //f.Add("wds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
|
|
|
+ //f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
|
|
|
+ var defaultReq DataApiReq
|
|
|
+ defaultReq.DbCode = dbCode
|
|
|
+ defaultReq.RowCode = "zb"
|
|
|
+ defaultReq.ColCode = "reg"
|
|
|
+ defaultReq.WdsList = append(defaultReq.WdsList, Wds{
|
|
|
+ WdCode: "sj",
|
|
|
+ ValueCode: timeParam,
|
|
|
+ })
|
|
|
+ defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
|
|
|
+ WdCode: "zb",
|
|
|
+ ValueCode: classifyCode,
|
|
|
+ })
|
|
|
+ defaultResult, e := CommonDataApiRequest(defaultReq)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var quotaWds []QuotaWdNodeData
|
|
|
+ for _, n := range defaultResult.ReturnData.WdNodes {
|
|
|
+ if n.WdCode == "zb" {
|
|
|
+ quotaWds = n.Nodes
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历指标维度
|
|
|
+ for _, quota := range quotaWds {
|
|
|
+ //f := url.Values{}
|
|
|
+ //f.Add("m", "QueryData")
|
|
|
+ //f.Add("dbcode", "gatyd")
|
|
|
+ //f.Add("rowcode", "sj")
|
|
|
+ //f.Add("colcode", "reg")
|
|
|
+ //f.Add("wds", `[{"wdcode":"zb","valuecode":"A010A"}]`)
|
|
|
+ //f.Add("dfwds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
|
|
|
+ var dataReq DataApiReq
|
|
|
+ dataReq.DbCode = dbCode
|
|
|
+ dataReq.RowCode = "sj"
|
|
|
+ dataReq.ColCode = "reg"
|
|
|
+ dataReq.WdsList = append(defaultReq.WdsList, Wds{
|
|
|
+ WdCode: "zb",
|
|
|
+ ValueCode: quota.Code,
|
|
|
+ })
|
|
|
+ dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
|
|
|
+ WdCode: "sj",
|
|
|
+ ValueCode: timeParam,
|
|
|
+ })
|
|
|
+ attempt := 0
|
|
|
+ resp, e := CommonDataApiRequest(dataReq)
|
|
|
+ if e != nil {
|
|
|
+ //if !strings.Contains(e.Error(), "connection attempt failed") {
|
|
|
+ // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ // 连接失败重新尝试3次
|
|
|
+ for {
|
|
|
+ time.Sleep(2 * time.Minute)
|
|
|
+ attempt += 1
|
|
|
+ utils.FileLog.Info("当前第%d次重新请求", attempt)
|
|
|
+ resp, e = CommonDataApiRequest(dataReq)
|
|
|
+ if e == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if attempt >= 3 {
|
|
|
+ s, _ := json.Marshal(dataReq)
|
|
|
+ err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据集
|
|
|
+ dataNodes := resp.ReturnData.DataNodes
|
|
|
+ dataMap := make(map[string]QuotaDataNode)
|
|
|
+ for _, d := range dataNodes {
|
|
|
+ dataMap[d.Code] = d
|
|
|
+ }
|
|
|
+
|
|
|
+ // 取出指标(Y轴), 日期(X轴)
|
|
|
+ wdNodes := resp.ReturnData.WdNodes
|
|
|
+ //var quotaNodes, dateNodes, regNodes []QuotaWdNodeData
|
|
|
+ var dateNodes, regNodes []QuotaWdNodeData
|
|
|
+ for _, w := range wdNodes {
|
|
|
+ switch w.WdCode {
|
|
|
+ case "zb":
|
|
|
+ //quotaNodes = w.Nodes
|
|
|
+ break
|
|
|
+ case "sj":
|
|
|
+ dateNodes = w.Nodes
|
|
|
+ break
|
|
|
+ case "reg":
|
|
|
+ regNodes = w.Nodes
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 指标编码去重, 指标编码+日期数据去重
|
|
|
+ indexOB := new(models.BaseFromNationalStatisticsIndex)
|
|
|
+ indexCond := ` AND dbcode = ?`
|
|
|
+ indexPars := make([]interface{}, 0)
|
|
|
+ indexPars = append(indexPars, dbCode)
|
|
|
+ indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ indexExistMap := make(map[string]bool)
|
|
|
+ for _, v := range indexList {
|
|
|
+ indexExistMap[v.IndexCode] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历XY轴
|
|
|
+ indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
|
|
|
+ indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
|
|
|
+
|
|
|
+ // 遍历X轴-地区
|
|
|
+ for _, reg := range regNodes {
|
|
|
+ // 指标: dbcode+指标code+地区code
|
|
|
+ indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
|
|
|
+
|
|
|
+ // 数据去重
|
|
|
+ dataExistMap := make(map[string]bool)
|
|
|
+ dataOB := new(models.BaseFromNationalStatisticsData)
|
|
|
+ dataCond := ` AND index_code = ?`
|
|
|
+ dataPars := make([]interface{}, 0)
|
|
|
+ dataPars = append(dataPars, indexCode)
|
|
|
+ dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, v := range dataList {
|
|
|
+ dataExistMap[v.DataTime.Format(utils.FormatDate)] = true
|
|
|
+ }
|
|
|
+
|
|
|
+ r := new(models.SaveNationalStatisticsIndexAndDataReq)
|
|
|
+ r.Index = &models.BaseFromNationalStatisticsIndex{
|
|
|
+ BaseFromNationalStatisticsClassifyId: classifyId,
|
|
|
+ Dbcode: dbCode,
|
|
|
+ IndexCode: indexCode,
|
|
|
+ IndexName: quota.Name,
|
|
|
+ Frequency: frequency,
|
|
|
+ Unit: quota.Unit,
|
|
|
+ Reg: reg.Name,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ }
|
|
|
+ if indexExistMap[indexCode] {
|
|
|
+ r.IndexExist = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历Y轴-日期
|
|
|
+ for _, d := range dateNodes {
|
|
|
+ k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
|
|
|
+ v := dataMap[k]
|
|
|
+ if !v.Data.HasData {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 日期去重
|
|
|
+ t, e := formatMonth2YearDateCode(d.Code)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if dataExistMap[t.Format(utils.FormatDate)] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据map
|
|
|
+ if indexDataMap[indexCode] == nil {
|
|
|
+ indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
|
|
|
+ }
|
|
|
+ indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
|
|
|
+ IndexCode: indexCode,
|
|
|
+ DataTime: t,
|
|
|
+ Value: v.Data.Data,
|
|
|
+ CreateTime: time.Now().Local(),
|
|
|
+ ModifyTime: time.Now().Local(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ indexDataList = append(indexDataList, r)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存指标
|
|
|
+ for _, v := range indexDataList {
|
|
|
+ ds := indexDataMap[v.Index.IndexCode]
|
|
|
+ if ds == nil || (ds != nil && len(ds) == 0) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ v.DataList = ds
|
|
|
+ if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
|
|
|
+ err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|