|
- package national_data
- import (
- "context"
- "encoding/json"
- "eta/eta_crawler/models"
- "eta/eta_crawler/services/alarm_msg"
- "eta/eta_crawler/utils"
- "fmt"
- "strings"
- "time"
- )
- // 每个库大概同步时间
- // hgyd-4h fsyd-9h csyd-3h gatyd-1h gjyd、gjydsdj、gjydsc-15min
- // hgjd-4h fsjd-4h
- // hgnd-9h fsnd-2day csnd、gatnd、gjnd-1.5h
- // RefreshNationalDbs 刷新统计局数据(所有)
- func RefreshNationalDbs(cont context.Context) (err error) {
- utils.FileLog.Info("开始刷新统计局数据")
- _ = SyncXDateYQuotaDb([]string{})
- _ = SyncXDateYQuotaZRegDb([]string{})
- _ = SyncXRegYDateZQuotaDb([]string{})
- // 最后更新一下每个指标的开始结束日期
- if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
- alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
- }
- utils.FileLog.Info("统计局数据刷新成功")
- return
- }
- // RefreshNationalMonthDbA 刷新月度指标库
- func RefreshNationalMonthDbA(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步月度指标库A")
- if err = SelectSyncFunc([]string{"hgyd", "csyd", "gatyd", "gjyd", "gjydsdj", "gjydsc", "fsyd"}); err != nil {
- utils.FileLog.Info("统计局-同步月度指标库A失败")
- return
- }
- utils.FileLog.Info("统计局-同步月度指标库A成功")
- return
- }
- // RefreshNationalMonthDbB 刷新月度指标库(分省月度)
- func RefreshNationalMonthDbB(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步月度指标库B")
- if err = SelectSyncFunc([]string{"fsyd"}); err != nil {
- utils.FileLog.Info("统计局-同步月度指标库B失败")
- return
- }
- utils.FileLog.Info("统计局-同步月度指标库B成功")
- return
- }
- // RefreshNationalQuarterDb 刷新季度指标库
- func RefreshNationalQuarterDb(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步季度指标库")
- if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
- utils.FileLog.Info("统计局-同步季度指标库失败")
- return
- }
- utils.FileLog.Info("统计局-同步季度指标库成功")
- return
- }
- // RefreshNationalYearDbA 刷新年度指标库
- func RefreshNationalYearDbA(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步年度指标库A")
- if err = SelectSyncFunc([]string{"hgnd", "csnd", "gatnd", "gjnd"}); err != nil {
- utils.FileLog.Info("统计局-同步年度指标库A失败")
- return
- }
- utils.FileLog.Info("统计局-同步年度指标库A成功")
- return
- }
- // RefreshNationalYearDbB 刷新年度指标库(分省年度数据)
- func RefreshNationalYearDbB(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步年度指标库B")
- if err = SelectSyncFunc([]string{"fsnd"}); err != nil {
- utils.FileLog.Info("统计局-同步年度指标库B失败")
- return
- }
- utils.FileLog.Info("统计局-同步年度指标库B成功")
- return
- }
- func SelectSyncFunc(dbs []string) (err error) {
- funcA := []string{"hgyd", "hgjd", "hgnd"}
- funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
- funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
- // 此处要根据不同的指标库选择同步方式
- for _, q := range dbs {
- if utils.InArrayByStr(funcA, q) {
- if err = SyncXDateYQuotaDb([]string{q}); err != nil {
- return
- }
- continue
- }
- if utils.InArrayByStr(funcB, q) {
- if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
- return
- }
- continue
- }
- if utils.InArrayByStr(funcC, q) {
- if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
- return
- }
- }
- }
- // 最后更新一下每个指标的开始结束日期
- if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
- alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
- }
- return
- }
- // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
- func SyncXDateYQuotaDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"hgyd", "hgjd", "hgnd"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
- // 查询无父级的指标分类
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
- classifyCond := ` AND is_parent = 0 AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
- // 同步指标和数据
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- // SyncXDateYQuotaData 同步两维度X轴-日期, Y轴-指标的数据
- func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
- // yd-月度 jd-季度 nd-年度
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36" // 最近36个月
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18" // 最近18个季度
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20" // 最近20年
- frequency = "年度"
- }
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- }, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
- //if !strings.Contains(e.Error(), "connection attempt failed") {
- // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
- // return
- //}
- // 连接失败重新尝试1次
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
- // 数据集
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
- // 取出指标(Y轴), 日期(X轴)
- wdNodes := resp.ReturnData.WdNodes
- var quotaNodes, dateNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- if w.WdCode == "zb" {
- quotaNodes = w.Nodes
- continue
- }
- if w.WdCode == "sj" {
- dateNodes = w.Nodes
- }
- }
- // 遍历XY轴
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
- for _, q := range quotaNodes {
- indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
- // 指标
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: q.Name,
- Frequency: frequency,
- Unit: q.Unit,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
- // 数据
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
- // 日期处理
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
- // 数据map
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
- // 保存指标
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- return
- }
- // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
- func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
- // 需要同步的数据库
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
- // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
- classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
- // 查询其他维度-地区
- wdList, e := GetOtherWd(d, "", "")
- var regList []OtherWdNodes
- for _, wd := range wdList {
- if wd.WdCode == "reg" {
- regList = wd.Nodes
- break
- }
- }
- if len(regList) == 0 {
- err = fmt.Errorf("其他维度为空, DbCode: %s", d)
- return
- }
- // 同步指标和数据
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- // SyncXDateYQuotaZRegData 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据
- func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
- // 根据DbCode判断频度和查询的时间区间
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36" // 最近36个月
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18" // 最近18个季度
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20" // 最近20年
- frequency = "年度"
- }
- // 遍历地区维度, 查询指标和数据
- for _, reg := range regList {
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.WdsList = append(dataReq.WdsList, Wds{
- WdCode: "reg",
- ValueCode: reg.Code,
- })
- dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- }, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
- //if !strings.Contains(e.Error(), "connection attempt failed") {
- // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
- // return
- //}
- // 连接失败重新尝试1次
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
- // 数据集
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
- // 取出指标(Y轴), 日期(X轴)
- wdNodes := resp.ReturnData.WdNodes
- var quotaNodes, dateNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- if w.WdCode == "zb" {
- quotaNodes = w.Nodes
- continue
- }
- if w.WdCode == "sj" {
- dateNodes = w.Nodes
- }
- }
- // 遍历XY轴
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
- for _, q := range quotaNodes {
- // dbcode+指标code+地区code
- indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
- // 指标
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: q.Name,
- Frequency: frequency,
- Unit: q.Unit,
- Reg: reg.Name,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
- // 数据
- // zb.A01010201_reg.110000_sj.201608
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
- // 日期处理
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
- // 数据map
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
- // 保存指标
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- }
- return
- }
- // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
- func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
- // 需要同步的数据库
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
- // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
- classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
- // 同步指标和数据
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- // SyncXRegYDateZQuotaDbData 同步三维度X轴-地区, Y轴-日期, Z轴-指标的数据
- func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
- // 根据DbCode判断频度和查询的时间区间
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36" // 最近36个月
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18" // 最近18个季度
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20" // 最近20年
- frequency = "年度"
- }
- // 先以指标作为行进行默认查询, 取出其中的指标作为当前分类的指标
- //f := url.Values{}
- //f.Add("m", "QueryData")
- //f.Add("dbcode", "gatyd")
- //f.Add("rowcode", "zb")
- //f.Add("colcode", "reg")
- //f.Add("wds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
- //f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
- var defaultReq DataApiReq
- defaultReq.DbCode = dbCode
- defaultReq.RowCode = "zb"
- defaultReq.ColCode = "reg"
- defaultReq.WdsList = append(defaultReq.WdsList, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- })
- defaultResult, e := CommonDataApiRequest(defaultReq)
- if e != nil {
- err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
- return
- }
- var quotaWds []QuotaWdNodeData
- for _, n := range defaultResult.ReturnData.WdNodes {
- if n.WdCode == "zb" {
- quotaWds = n.Nodes
- break
- }
- }
- // 遍历指标维度
- for _, quota := range quotaWds {
- //f := url.Values{}
- //f.Add("m", "QueryData")
- //f.Add("dbcode", "gatyd")
- //f.Add("rowcode", "sj")
- //f.Add("colcode", "reg")
- //f.Add("wds", `[{"wdcode":"zb","valuecode":"A010A"}]`)
- //f.Add("dfwds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.RowCode = "sj"
- dataReq.ColCode = "reg"
- dataReq.WdsList = append(defaultReq.WdsList, Wds{
- WdCode: "zb",
- ValueCode: quota.Code,
- })
- dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
- //if !strings.Contains(e.Error(), "connection attempt failed") {
- // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
- // return
- //}
- // 连接失败重新尝试1次
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
- // 数据集
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
- // 取出指标(Y轴), 日期(X轴)
- wdNodes := resp.ReturnData.WdNodes
- //var quotaNodes, dateNodes, regNodes []QuotaWdNodeData
- var dateNodes, regNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- switch w.WdCode {
- case "zb":
- //quotaNodes = w.Nodes
- break
- case "sj":
- dateNodes = w.Nodes
- break
- case "reg":
- regNodes = w.Nodes
- break
- }
- }
- // 遍历XY轴
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
- // 遍历X轴-地区
- for _, reg := range regNodes {
- // 指标: dbcode+指标code+地区code
- indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: quota.Name,
- Frequency: frequency,
- Unit: quota.Unit,
- Reg: reg.Name,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
- // 遍历Y轴-日期
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
- // 日期去重
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
- // 数据map
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
- // 保存指标
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- }
- return
- }
|