national_data.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. package national_data
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_crawler/models"
  6. "eta/eta_crawler/services/alarm_msg"
  7. "eta/eta_crawler/utils"
  8. "fmt"
  9. "strings"
  10. "time"
  11. )
  12. // 每个库大概同步时间
  13. // hgyd-4h fsyd-9h csyd-3h gatyd-1h gjyd、gjydsdj、gjydsc-15min
  14. // hgjd-4h fsjd-4h
  15. // hgnd-9h fsnd-2day csnd、gatnd、gjnd-1.5h
  16. // RefreshNationalDbs 刷新统计局数据(所有)
  17. func RefreshNationalDbs(cont context.Context) (err error) {
  18. utils.FileLog.Info("开始刷新统计局数据")
  19. _ = SyncXDateYQuotaDb([]string{})
  20. _ = SyncXDateYQuotaZRegDb([]string{})
  21. _ = SyncXRegYDateZQuotaDb([]string{})
  22. // 最后更新一下每个指标的开始结束日期
  23. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  24. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  25. }
  26. utils.FileLog.Info("统计局数据刷新成功")
  27. return
  28. }
  29. // RefreshNationalMonthDbA 刷新月度指标库
  30. func RefreshNationalMonthDbA(cont context.Context) (err error) {
  31. utils.FileLog.Info("统计局-开始同步月度指标库A")
  32. if err = SelectSyncFunc([]string{"hgyd", "csyd", "gatyd", "gjyd", "gjydsdj", "gjydsc", "fsyd"}); err != nil {
  33. utils.FileLog.Info("统计局-同步月度指标库A失败")
  34. return
  35. }
  36. utils.FileLog.Info("统计局-同步月度指标库A成功")
  37. return
  38. }
  39. // RefreshNationalMonthDbB 刷新月度指标库(分省月度)
  40. func RefreshNationalMonthDbB(cont context.Context) (err error) {
  41. utils.FileLog.Info("统计局-开始同步月度指标库B")
  42. if err = SelectSyncFunc([]string{"fsyd"}); err != nil {
  43. utils.FileLog.Info("统计局-同步月度指标库B失败")
  44. return
  45. }
  46. utils.FileLog.Info("统计局-同步月度指标库B成功")
  47. return
  48. }
  49. // RefreshNationalQuarterDb 刷新季度指标库
  50. func RefreshNationalQuarterDb(cont context.Context) (err error) {
  51. utils.FileLog.Info("统计局-开始同步季度指标库")
  52. if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
  53. utils.FileLog.Info("统计局-同步季度指标库失败")
  54. return
  55. }
  56. utils.FileLog.Info("统计局-同步季度指标库成功")
  57. return
  58. }
  59. // RefreshNationalYearDbA 刷新年度指标库
  60. func RefreshNationalYearDbA(cont context.Context) (err error) {
  61. utils.FileLog.Info("统计局-开始同步年度指标库A")
  62. if err = SelectSyncFunc([]string{"hgnd", "csnd", "gatnd", "gjnd"}); err != nil {
  63. utils.FileLog.Info("统计局-同步年度指标库A失败")
  64. return
  65. }
  66. utils.FileLog.Info("统计局-同步年度指标库A成功")
  67. return
  68. }
  69. // RefreshNationalYearDbB 刷新年度指标库(分省年度数据)
  70. func RefreshNationalYearDbB(cont context.Context) (err error) {
  71. utils.FileLog.Info("统计局-开始同步年度指标库B")
  72. if err = SelectSyncFunc([]string{"fsnd"}); err != nil {
  73. utils.FileLog.Info("统计局-同步年度指标库B失败")
  74. return
  75. }
  76. utils.FileLog.Info("统计局-同步年度指标库B成功")
  77. return
  78. }
  79. func SelectSyncFunc(dbs []string) (err error) {
  80. funcA := []string{"hgyd", "hgjd", "hgnd"}
  81. funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  82. funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  83. // 此处要根据不同的指标库选择同步方式
  84. for _, q := range dbs {
  85. if utils.InArrayByStr(funcA, q) {
  86. if err = SyncXDateYQuotaDb([]string{q}); err != nil {
  87. return
  88. }
  89. continue
  90. }
  91. if utils.InArrayByStr(funcB, q) {
  92. if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
  93. return
  94. }
  95. continue
  96. }
  97. if utils.InArrayByStr(funcC, q) {
  98. if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
  99. return
  100. }
  101. }
  102. }
  103. // 最后更新一下每个指标的开始结束日期
  104. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  105. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  106. }
  107. return
  108. }
  109. // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
  110. func SyncXDateYQuotaDb(dbs []string) (err error) {
  111. if len(dbs) == 0 {
  112. dbs = []string{"hgyd", "hgjd", "hgnd"}
  113. }
  114. defer func() {
  115. d := strings.Join(dbs, ",")
  116. if err != nil {
  117. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  118. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  119. return
  120. }
  121. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  122. }()
  123. // 查询无父级的指标分类
  124. for _, d := range dbs {
  125. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  126. classifyCond := ` AND is_parent = 0 AND dbcode = ?`
  127. classifyPars := make([]interface{}, 0)
  128. classifyPars = append(classifyPars, d)
  129. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  130. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  131. if e != nil {
  132. err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
  133. return
  134. }
  135. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  136. // 同步指标和数据
  137. for _, c := range classifyList {
  138. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  139. if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  140. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  141. return
  142. }
  143. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  144. }
  145. }
  146. return
  147. }
  148. // SyncXDateYQuotaData 同步两维度X轴-日期, Y轴-指标的数据
  149. func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
  150. defer func() {
  151. if err != nil {
  152. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  153. }
  154. }()
  155. // yd-月度 jd-季度 nd-年度
  156. frequency := ""
  157. timeParam := ""
  158. if strings.Contains(dbCode, "yd") {
  159. timeParam = "LAST36" // 最近36个月
  160. frequency = "月度"
  161. }
  162. if strings.Contains(dbCode, "jd") {
  163. timeParam = "LAST18" // 最近18个季度
  164. frequency = "季度"
  165. }
  166. if strings.Contains(dbCode, "nd") {
  167. timeParam = "LAST20" // 最近20年
  168. frequency = "年度"
  169. }
  170. var dataReq DataApiReq
  171. dataReq.DbCode = dbCode
  172. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  173. WdCode: "zb",
  174. ValueCode: classifyCode,
  175. }, Wds{
  176. WdCode: "sj",
  177. ValueCode: timeParam,
  178. })
  179. attempt := 0
  180. resp, e := CommonDataApiRequest(dataReq)
  181. if e != nil {
  182. //if !strings.Contains(e.Error(), "connection attempt failed") {
  183. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  184. // return
  185. //}
  186. // 连接失败重新尝试1次
  187. for {
  188. time.Sleep(2 * time.Minute)
  189. attempt += 1
  190. utils.FileLog.Info("当前第%d次重新请求", attempt)
  191. resp, e = CommonDataApiRequest(dataReq)
  192. if e == nil {
  193. break
  194. }
  195. if attempt >= 1 {
  196. s, _ := json.Marshal(dataReq)
  197. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  198. return
  199. }
  200. }
  201. }
  202. // 数据集
  203. dataNodes := resp.ReturnData.DataNodes
  204. dataMap := make(map[string]QuotaDataNode)
  205. for _, d := range dataNodes {
  206. dataMap[d.Code] = d
  207. }
  208. // 取出指标(Y轴), 日期(X轴)
  209. wdNodes := resp.ReturnData.WdNodes
  210. var quotaNodes, dateNodes []QuotaWdNodeData
  211. for _, w := range wdNodes {
  212. if w.WdCode == "zb" {
  213. quotaNodes = w.Nodes
  214. continue
  215. }
  216. if w.WdCode == "sj" {
  217. dateNodes = w.Nodes
  218. }
  219. }
  220. // 遍历XY轴
  221. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  222. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  223. for _, q := range quotaNodes {
  224. indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
  225. // 指标
  226. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  227. r.Index = &models.BaseFromNationalStatisticsIndex{
  228. BaseFromNationalStatisticsClassifyId: classifyId,
  229. Dbcode: dbCode,
  230. IndexCode: indexCode,
  231. IndexName: q.Name,
  232. Frequency: frequency,
  233. Unit: q.Unit,
  234. CreateTime: time.Now().Local(),
  235. ModifyTime: time.Now().Local(),
  236. }
  237. // 数据
  238. for _, d := range dateNodes {
  239. k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
  240. v := dataMap[k]
  241. if !v.Data.HasData {
  242. continue
  243. }
  244. // 日期处理
  245. t, e := formatMonth2YearDateCode(d.Code)
  246. if e != nil {
  247. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  248. return
  249. }
  250. // 数据map
  251. if indexDataMap[indexCode] == nil {
  252. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  253. }
  254. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  255. IndexCode: indexCode,
  256. DataTime: t,
  257. Value: v.Data.Data,
  258. CreateTime: time.Now().Local(),
  259. ModifyTime: time.Now().Local(),
  260. })
  261. }
  262. indexDataList = append(indexDataList, r)
  263. }
  264. // 保存指标
  265. for _, v := range indexDataList {
  266. ds := indexDataMap[v.Index.IndexCode]
  267. if ds == nil || (ds != nil && len(ds) == 0) {
  268. continue
  269. }
  270. v.DataList = ds
  271. if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
  272. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  273. return
  274. }
  275. }
  276. return
  277. }
  278. // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
  279. func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
  280. if len(dbs) == 0 {
  281. dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  282. }
  283. defer func() {
  284. d := strings.Join(dbs, ",")
  285. if err != nil {
  286. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  287. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  288. return
  289. }
  290. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  291. }()
  292. // 需要同步的数据库
  293. for _, d := range dbs {
  294. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  295. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  296. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  297. classifyPars := make([]interface{}, 0)
  298. classifyPars = append(classifyPars, d)
  299. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  300. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  301. if e != nil {
  302. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  303. return
  304. }
  305. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  306. // 查询其他维度-地区
  307. wdList, e := GetOtherWd(d, "", "")
  308. var regList []OtherWdNodes
  309. for _, wd := range wdList {
  310. if wd.WdCode == "reg" {
  311. regList = wd.Nodes
  312. break
  313. }
  314. }
  315. if len(regList) == 0 {
  316. err = fmt.Errorf("其他维度为空, DbCode: %s", d)
  317. return
  318. }
  319. // 同步指标和数据
  320. for _, c := range classifyList {
  321. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  322. if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
  323. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  324. return
  325. }
  326. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  327. }
  328. }
  329. return
  330. }
  331. // SyncXDateYQuotaZRegData 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据
  332. func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
  333. defer func() {
  334. if err != nil {
  335. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  336. }
  337. }()
  338. // 根据DbCode判断频度和查询的时间区间
  339. frequency := ""
  340. timeParam := ""
  341. if strings.Contains(dbCode, "yd") {
  342. timeParam = "LAST36" // 最近36个月
  343. frequency = "月度"
  344. }
  345. if strings.Contains(dbCode, "jd") {
  346. timeParam = "LAST18" // 最近18个季度
  347. frequency = "季度"
  348. }
  349. if strings.Contains(dbCode, "nd") {
  350. timeParam = "LAST20" // 最近20年
  351. frequency = "年度"
  352. }
  353. // 遍历地区维度, 查询指标和数据
  354. for _, reg := range regList {
  355. var dataReq DataApiReq
  356. dataReq.DbCode = dbCode
  357. dataReq.WdsList = append(dataReq.WdsList, Wds{
  358. WdCode: "reg",
  359. ValueCode: reg.Code,
  360. })
  361. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  362. WdCode: "zb",
  363. ValueCode: classifyCode,
  364. }, Wds{
  365. WdCode: "sj",
  366. ValueCode: timeParam,
  367. })
  368. attempt := 0
  369. resp, e := CommonDataApiRequest(dataReq)
  370. if e != nil {
  371. //if !strings.Contains(e.Error(), "connection attempt failed") {
  372. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  373. // return
  374. //}
  375. // 连接失败重新尝试1次
  376. for {
  377. time.Sleep(2 * time.Minute)
  378. attempt += 1
  379. utils.FileLog.Info("当前第%d次重新请求", attempt)
  380. resp, e = CommonDataApiRequest(dataReq)
  381. if e == nil {
  382. break
  383. }
  384. if attempt >= 1 {
  385. s, _ := json.Marshal(dataReq)
  386. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  387. return
  388. }
  389. }
  390. }
  391. // 数据集
  392. dataNodes := resp.ReturnData.DataNodes
  393. dataMap := make(map[string]QuotaDataNode)
  394. for _, d := range dataNodes {
  395. dataMap[d.Code] = d
  396. }
  397. // 取出指标(Y轴), 日期(X轴)
  398. wdNodes := resp.ReturnData.WdNodes
  399. var quotaNodes, dateNodes []QuotaWdNodeData
  400. for _, w := range wdNodes {
  401. if w.WdCode == "zb" {
  402. quotaNodes = w.Nodes
  403. continue
  404. }
  405. if w.WdCode == "sj" {
  406. dateNodes = w.Nodes
  407. }
  408. }
  409. // 遍历XY轴
  410. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  411. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  412. for _, q := range quotaNodes {
  413. // dbcode+指标code+地区code
  414. indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
  415. // 指标
  416. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  417. r.Index = &models.BaseFromNationalStatisticsIndex{
  418. BaseFromNationalStatisticsClassifyId: classifyId,
  419. Dbcode: dbCode,
  420. IndexCode: indexCode,
  421. IndexName: q.Name,
  422. Frequency: frequency,
  423. Unit: q.Unit,
  424. Reg: reg.Name,
  425. CreateTime: time.Now().Local(),
  426. ModifyTime: time.Now().Local(),
  427. }
  428. // 数据
  429. // zb.A01010201_reg.110000_sj.201608
  430. for _, d := range dateNodes {
  431. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
  432. v := dataMap[k]
  433. if !v.Data.HasData {
  434. continue
  435. }
  436. // 日期处理
  437. t, e := formatMonth2YearDateCode(d.Code)
  438. if e != nil {
  439. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  440. return
  441. }
  442. // 数据map
  443. if indexDataMap[indexCode] == nil {
  444. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  445. }
  446. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  447. IndexCode: indexCode,
  448. DataTime: t,
  449. Value: v.Data.Data,
  450. CreateTime: time.Now().Local(),
  451. ModifyTime: time.Now().Local(),
  452. })
  453. }
  454. indexDataList = append(indexDataList, r)
  455. }
  456. // 保存指标
  457. for _, v := range indexDataList {
  458. ds := indexDataMap[v.Index.IndexCode]
  459. if ds == nil || (ds != nil && len(ds) == 0) {
  460. continue
  461. }
  462. v.DataList = ds
  463. if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
  464. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  465. return
  466. }
  467. }
  468. }
  469. return
  470. }
  471. // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
  472. func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
  473. if len(dbs) == 0 {
  474. dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  475. }
  476. defer func() {
  477. d := strings.Join(dbs, ",")
  478. if err != nil {
  479. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  480. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  481. return
  482. }
  483. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  484. }()
  485. // 需要同步的数据库
  486. for _, d := range dbs {
  487. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  488. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  489. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  490. classifyPars := make([]interface{}, 0)
  491. classifyPars = append(classifyPars, d)
  492. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  493. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  494. if e != nil {
  495. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  496. return
  497. }
  498. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  499. // 同步指标和数据
  500. for _, c := range classifyList {
  501. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  502. if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  503. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  504. return
  505. }
  506. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  507. }
  508. }
  509. return
  510. }
  511. // SyncXRegYDateZQuotaDbData 同步三维度X轴-地区, Y轴-日期, Z轴-指标的数据
  512. func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
  513. defer func() {
  514. if err != nil {
  515. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  516. }
  517. }()
  518. // 根据DbCode判断频度和查询的时间区间
  519. frequency := ""
  520. timeParam := ""
  521. if strings.Contains(dbCode, "yd") {
  522. timeParam = "LAST36" // 最近36个月
  523. frequency = "月度"
  524. }
  525. if strings.Contains(dbCode, "jd") {
  526. timeParam = "LAST18" // 最近18个季度
  527. frequency = "季度"
  528. }
  529. if strings.Contains(dbCode, "nd") {
  530. timeParam = "LAST20" // 最近20年
  531. frequency = "年度"
  532. }
  533. // 先以指标作为行进行默认查询, 取出其中的指标作为当前分类的指标
  534. //f := url.Values{}
  535. //f.Add("m", "QueryData")
  536. //f.Add("dbcode", "gatyd")
  537. //f.Add("rowcode", "zb")
  538. //f.Add("colcode", "reg")
  539. //f.Add("wds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  540. //f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
  541. var defaultReq DataApiReq
  542. defaultReq.DbCode = dbCode
  543. defaultReq.RowCode = "zb"
  544. defaultReq.ColCode = "reg"
  545. defaultReq.WdsList = append(defaultReq.WdsList, Wds{
  546. WdCode: "sj",
  547. ValueCode: timeParam,
  548. })
  549. defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  550. WdCode: "zb",
  551. ValueCode: classifyCode,
  552. })
  553. defaultResult, e := CommonDataApiRequest(defaultReq)
  554. if e != nil {
  555. err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
  556. return
  557. }
  558. var quotaWds []QuotaWdNodeData
  559. for _, n := range defaultResult.ReturnData.WdNodes {
  560. if n.WdCode == "zb" {
  561. quotaWds = n.Nodes
  562. break
  563. }
  564. }
  565. // 遍历指标维度
  566. for _, quota := range quotaWds {
  567. //f := url.Values{}
  568. //f.Add("m", "QueryData")
  569. //f.Add("dbcode", "gatyd")
  570. //f.Add("rowcode", "sj")
  571. //f.Add("colcode", "reg")
  572. //f.Add("wds", `[{"wdcode":"zb","valuecode":"A010A"}]`)
  573. //f.Add("dfwds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  574. var dataReq DataApiReq
  575. dataReq.DbCode = dbCode
  576. dataReq.RowCode = "sj"
  577. dataReq.ColCode = "reg"
  578. dataReq.WdsList = append(defaultReq.WdsList, Wds{
  579. WdCode: "zb",
  580. ValueCode: quota.Code,
  581. })
  582. dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  583. WdCode: "sj",
  584. ValueCode: timeParam,
  585. })
  586. attempt := 0
  587. resp, e := CommonDataApiRequest(dataReq)
  588. if e != nil {
  589. //if !strings.Contains(e.Error(), "connection attempt failed") {
  590. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  591. // return
  592. //}
  593. // 连接失败重新尝试1次
  594. for {
  595. time.Sleep(2 * time.Minute)
  596. attempt += 1
  597. utils.FileLog.Info("当前第%d次重新请求", attempt)
  598. resp, e = CommonDataApiRequest(dataReq)
  599. if e == nil {
  600. break
  601. }
  602. if attempt >= 1 {
  603. s, _ := json.Marshal(dataReq)
  604. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  605. return
  606. }
  607. }
  608. }
  609. // 数据集
  610. dataNodes := resp.ReturnData.DataNodes
  611. dataMap := make(map[string]QuotaDataNode)
  612. for _, d := range dataNodes {
  613. dataMap[d.Code] = d
  614. }
  615. // 取出指标(Y轴), 日期(X轴)
  616. wdNodes := resp.ReturnData.WdNodes
  617. //var quotaNodes, dateNodes, regNodes []QuotaWdNodeData
  618. var dateNodes, regNodes []QuotaWdNodeData
  619. for _, w := range wdNodes {
  620. switch w.WdCode {
  621. case "zb":
  622. //quotaNodes = w.Nodes
  623. break
  624. case "sj":
  625. dateNodes = w.Nodes
  626. break
  627. case "reg":
  628. regNodes = w.Nodes
  629. break
  630. }
  631. }
  632. // 遍历XY轴
  633. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  634. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  635. // 遍历X轴-地区
  636. for _, reg := range regNodes {
  637. // 指标: dbcode+指标code+地区code
  638. indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
  639. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  640. r.Index = &models.BaseFromNationalStatisticsIndex{
  641. BaseFromNationalStatisticsClassifyId: classifyId,
  642. Dbcode: dbCode,
  643. IndexCode: indexCode,
  644. IndexName: quota.Name,
  645. Frequency: frequency,
  646. Unit: quota.Unit,
  647. Reg: reg.Name,
  648. CreateTime: time.Now().Local(),
  649. ModifyTime: time.Now().Local(),
  650. }
  651. // 遍历Y轴-日期
  652. for _, d := range dateNodes {
  653. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
  654. v := dataMap[k]
  655. if !v.Data.HasData {
  656. continue
  657. }
  658. // 日期去重
  659. t, e := formatMonth2YearDateCode(d.Code)
  660. if e != nil {
  661. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  662. return
  663. }
  664. // 数据map
  665. if indexDataMap[indexCode] == nil {
  666. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  667. }
  668. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  669. IndexCode: indexCode,
  670. DataTime: t,
  671. Value: v.Data.Data,
  672. CreateTime: time.Now().Local(),
  673. ModifyTime: time.Now().Local(),
  674. })
  675. }
  676. indexDataList = append(indexDataList, r)
  677. }
  678. // 保存指标
  679. for _, v := range indexDataList {
  680. ds := indexDataMap[v.Index.IndexCode]
  681. if ds == nil || (ds != nil && len(ds) == 0) {
  682. continue
  683. }
  684. v.DataList = ds
  685. if e := models.SaveNationalStatisticsIndexAndData(v.Index, v.DataList); e != nil {
  686. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  687. return
  688. }
  689. }
  690. }
  691. return
  692. }