|
- package national_data
- import (
- "context"
- "encoding/json"
- "eta/eta_crawler/models"
- "eta/eta_crawler/services/alarm_msg"
- "eta/eta_crawler/utils"
- "fmt"
- "strings"
- "time"
- )
- func RefreshNationalDbs(cont context.Context) (err error) {
- utils.FileLog.Info("开始刷新统计局数据")
- _ = SyncXDateYQuotaDb([]string{})
- _ = SyncXDateYQuotaZRegDb([]string{})
- _ = SyncXRegYDateZQuotaDb([]string{})
-
- if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
- alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
- }
- utils.FileLog.Info("统计局数据刷新成功")
- return
- }
- func RefreshNationalMonthDbA(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步月度指标库A")
- if err = SelectSyncFunc([]string{"hgyd", "csyd", "gatyd", "gjyd", "gjydsdj", "gjydsc", "fsyd"}); err != nil {
- utils.FileLog.Info("统计局-同步月度指标库A失败")
- return
- }
- utils.FileLog.Info("统计局-同步月度指标库A成功")
- return
- }
- func RefreshNationalMonthDbB(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步月度指标库B")
- if err = SelectSyncFunc([]string{"fsyd"}); err != nil {
- utils.FileLog.Info("统计局-同步月度指标库B失败")
- return
- }
- utils.FileLog.Info("统计局-同步月度指标库B成功")
- return
- }
- func RefreshNationalQuarterDb(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步季度指标库")
- if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
- utils.FileLog.Info("统计局-同步季度指标库失败")
- return
- }
- utils.FileLog.Info("统计局-同步季度指标库成功")
- return
- }
- func RefreshNationalYearDbA(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步年度指标库A")
- if err = SelectSyncFunc([]string{"hgnd", "csnd", "gatnd", "gjnd"}); err != nil {
- utils.FileLog.Info("统计局-同步年度指标库A失败")
- return
- }
- utils.FileLog.Info("统计局-同步年度指标库A成功")
- return
- }
- func RefreshNationalYearDbB(cont context.Context) (err error) {
- utils.FileLog.Info("统计局-开始同步年度指标库B")
- if err = SelectSyncFunc([]string{"fsnd"}); err != nil {
- utils.FileLog.Info("统计局-同步年度指标库B失败")
- return
- }
- utils.FileLog.Info("统计局-同步年度指标库B成功")
- return
- }
- func SelectSyncFunc(dbs []string) (err error) {
- funcA := []string{"hgyd", "hgjd", "hgnd"}
- funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
- funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
-
- for _, q := range dbs {
- if utils.InArrayByStr(funcA, q) {
- if err = SyncXDateYQuotaDb([]string{q}); err != nil {
- return
- }
- continue
- }
- if utils.InArrayByStr(funcB, q) {
- if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
- return
- }
- continue
- }
- if utils.InArrayByStr(funcC, q) {
- if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
- return
- }
- }
- }
-
- if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
- alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
- }
- return
- }
- func SyncXDateYQuotaDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"hgyd", "hgjd", "hgnd"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
-
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
- classifyCond := ` AND is_parent = 0 AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
-
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
-
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36"
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18"
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20"
- frequency = "年度"
- }
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- }, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
-
-
-
-
-
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
-
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
-
- wdNodes := resp.ReturnData.WdNodes
- var quotaNodes, dateNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- if w.WdCode == "zb" {
- quotaNodes = w.Nodes
- continue
- }
- if w.WdCode == "sj" {
- dateNodes = w.Nodes
- }
- }
-
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
- for _, q := range quotaNodes {
- indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
-
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: q.Name,
- Frequency: frequency,
- Unit: q.Unit,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
-
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
-
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
-
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
-
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- return
- }
- func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
-
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
-
- classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
-
- wdList, e := GetOtherWd(d, "", "")
- var regList []OtherWdNodes
- for _, wd := range wdList {
- if wd.WdCode == "reg" {
- regList = wd.Nodes
- break
- }
- }
- if len(regList) == 0 {
- err = fmt.Errorf("其他维度为空, DbCode: %s", d)
- return
- }
-
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
-
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36"
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18"
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20"
- frequency = "年度"
- }
-
- for _, reg := range regList {
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.WdsList = append(dataReq.WdsList, Wds{
- WdCode: "reg",
- ValueCode: reg.Code,
- })
- dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- }, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
-
-
-
-
-
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
-
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
-
- wdNodes := resp.ReturnData.WdNodes
- var quotaNodes, dateNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- if w.WdCode == "zb" {
- quotaNodes = w.Nodes
- continue
- }
- if w.WdCode == "sj" {
- dateNodes = w.Nodes
- }
- }
-
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
- for _, q := range quotaNodes {
-
- indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
-
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: q.Name,
- Frequency: frequency,
- Unit: q.Unit,
- Reg: reg.Name,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
-
-
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
-
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
-
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
-
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- }
- return
- }
- func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
- if len(dbs) == 0 {
- dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
- }
- defer func() {
- d := strings.Join(dbs, ",")
- if err != nil {
- utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
- return
- }
- utils.FileLog.Info("统计局-同步%s数据库成功", d)
- }()
-
- for _, d := range dbs {
- classifyOB := new(models.BaseFromNationalStatisticsClassify)
-
- classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
- classifyPars := make([]interface{}, 0)
- classifyPars = append(classifyPars, d)
- classifyOrder := ` base_from_national_statistics_classify_id ASC`
- classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
- if e != nil {
- err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
- return
- }
- utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
-
- for _, c := range classifyList {
- utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
- err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
- return
- }
- utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
- }
- }
- return
- }
- func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
- }
- }()
-
- frequency := ""
- timeParam := ""
- if strings.Contains(dbCode, "yd") {
- timeParam = "LAST36"
- frequency = "月度"
- }
- if strings.Contains(dbCode, "jd") {
- timeParam = "LAST18"
- frequency = "季度"
- }
- if strings.Contains(dbCode, "nd") {
- timeParam = "LAST20"
- frequency = "年度"
- }
-
-
-
-
-
-
-
-
- var defaultReq DataApiReq
- defaultReq.DbCode = dbCode
- defaultReq.RowCode = "zb"
- defaultReq.ColCode = "reg"
- defaultReq.WdsList = append(defaultReq.WdsList, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
- WdCode: "zb",
- ValueCode: classifyCode,
- })
- defaultResult, e := CommonDataApiRequest(defaultReq)
- if e != nil {
- err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
- return
- }
- var quotaWds []QuotaWdNodeData
- for _, n := range defaultResult.ReturnData.WdNodes {
- if n.WdCode == "zb" {
- quotaWds = n.Nodes
- break
- }
- }
-
- for _, quota := range quotaWds {
-
-
-
-
-
-
-
- var dataReq DataApiReq
- dataReq.DbCode = dbCode
- dataReq.RowCode = "sj"
- dataReq.ColCode = "reg"
- dataReq.WdsList = append(defaultReq.WdsList, Wds{
- WdCode: "zb",
- ValueCode: quota.Code,
- })
- dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
- WdCode: "sj",
- ValueCode: timeParam,
- })
- attempt := 0
- resp, e := CommonDataApiRequest(dataReq)
- if e != nil {
-
-
-
-
-
- for {
- time.Sleep(2 * time.Minute)
- attempt += 1
- utils.FileLog.Info("当前第%d次重新请求", attempt)
- resp, e = CommonDataApiRequest(dataReq)
- if e == nil {
- break
- }
- if attempt >= 1 {
- s, _ := json.Marshal(dataReq)
- err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
- return
- }
- }
- }
-
- dataNodes := resp.ReturnData.DataNodes
- dataMap := make(map[string]QuotaDataNode)
- for _, d := range dataNodes {
- dataMap[d.Code] = d
- }
-
- wdNodes := resp.ReturnData.WdNodes
-
- var dateNodes, regNodes []QuotaWdNodeData
- for _, w := range wdNodes {
- switch w.WdCode {
- case "zb":
-
- break
- case "sj":
- dateNodes = w.Nodes
- break
- case "reg":
- regNodes = w.Nodes
- break
- }
- }
-
- indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
- indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
-
- for _, reg := range regNodes {
-
- indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
- r := new(models.SaveNationalStatisticsIndexAndDataReq)
- r.Index = &models.BaseFromNationalStatisticsIndex{
- BaseFromNationalStatisticsClassifyId: classifyId,
- Dbcode: dbCode,
- IndexCode: indexCode,
- IndexName: quota.Name,
- Frequency: frequency,
- Unit: quota.Unit,
- Reg: reg.Name,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- }
-
- for _, d := range dateNodes {
- k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
- v := dataMap[k]
- if !v.Data.HasData {
- continue
- }
-
- t, e := formatMonth2YearDateCode(d.Code)
- if e != nil {
- err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
- return
- }
-
- if indexDataMap[indexCode] == nil {
- indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
- }
- indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
- IndexCode: indexCode,
- DataTime: t,
- Value: v.Data.Data,
- CreateTime: time.Now().Local(),
- ModifyTime: time.Now().Local(),
- })
- }
- indexDataList = append(indexDataList, r)
- }
-
- for _, v := range indexDataList {
- ds := indexDataMap[v.Index.IndexCode]
- if ds == nil || (ds != nil && len(ds) == 0) {
- continue
- }
- v.DataList = ds
- if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
- err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
- return
- }
- }
- }
- return
- }
|