national_data.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763
  1. package national_data
  2. import (
  3. "context"
  4. "fmt"
  5. "hongze/hongze_data_crawler/models"
  6. "hongze/hongze_data_crawler/services/alarm_msg"
  7. "hongze/hongze_data_crawler/utils"
  8. "strings"
  9. "time"
  10. )
  11. // RefreshNationalDbs 刷新统计局数据(所有)
  12. func RefreshNationalDbs(cont context.Context) (err error) {
  13. utils.FileLog.Info("开始刷新统计局数据")
  14. _ = SyncXDateYQuotaDb([]string{})
  15. _ = SyncXDateYQuotaZRegDb([]string{})
  16. _ = SyncXRegYDateZQuotaDb([]string{})
  17. // 最后更新一下每个指标的开始结束日期
  18. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  19. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  20. }
  21. utils.FileLog.Info("统计局数据刷新成功")
  22. return
  23. }
  24. // RefreshNationalMonthDbA 刷新月度指标库
  25. func RefreshNationalMonthDbA(cont context.Context) (err error) {
  26. utils.FileLog.Info("统计局-开始同步月度指标库A")
  27. SelectSyncFunc([]string{"hgyd", "fsyd"})
  28. utils.FileLog.Info("统计局-同步月度指标库A成功")
  29. return
  30. }
  31. // RefreshNationalMonthDbB 刷新月度指标库(主要城市月度、港澳台月度)
  32. func RefreshNationalMonthDbB(cont context.Context) (err error) {
  33. utils.FileLog.Info("统计局-开始同步月度指标库B")
  34. SelectSyncFunc([]string{"csyd", "gatyd"})
  35. utils.FileLog.Info("统计局-同步月度指标库B成功")
  36. return
  37. }
  38. // RefreshNationalMonthDbC 刷新月度指标库(国际数据)
  39. func RefreshNationalMonthDbC(cont context.Context) (err error) {
  40. utils.FileLog.Info("统计局-开始同步月度指标库C")
  41. SelectSyncFunc([]string{"gjyd", "gjydsdj", "gjydsc"})
  42. utils.FileLog.Info("统计局-同步月度指标库C成功")
  43. return
  44. }
  45. // RefreshNationalQuarterDb 刷新季度指标库
  46. func RefreshNationalQuarterDb(cont context.Context) (err error) {
  47. utils.FileLog.Info("统计局-开始同步季度指标库")
  48. SelectSyncFunc([]string{"hgjd", "fsjd"})
  49. utils.FileLog.Info("统计局-同步季度指标库成功")
  50. return
  51. }
  52. // RefreshNationalYearDbA 刷新年度指标库(年度数据、分省年度数据)
  53. func RefreshNationalYearDbA(cont context.Context) (err error) {
  54. utils.FileLog.Info("统计局-开始同步年度指标库A")
  55. SelectSyncFunc([]string{"hgnd", "fsnd"})
  56. utils.FileLog.Info("统计局-同步年度指标库A成功")
  57. return
  58. }
  59. // RefreshNationalYearDbB 刷新年度指标库(主要城市年度数据、港澳台年度数据、国际年度数据)
  60. func RefreshNationalYearDbB(cont context.Context) (err error) {
  61. utils.FileLog.Info("统计局-开始同步年度指标库B")
  62. SelectSyncFunc([]string{"csnd", "gatnd", "gjnd"})
  63. utils.FileLog.Info("统计局-同步年度指标库B成功")
  64. return
  65. }
  66. func SelectSyncFunc(dbs []string) {
  67. funcA := []string{"hgyd", "hgjd", "hgnd"}
  68. funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  69. funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  70. // 此处要根据不同的指标库选择同步方式
  71. for _, q := range dbs {
  72. if utils.InArrayByStr(funcA, q) {
  73. _ = SyncXDateYQuotaDb([]string{q})
  74. continue
  75. }
  76. if utils.InArrayByStr(funcB, q) {
  77. _ = SyncXDateYQuotaZRegDb([]string{q})
  78. continue
  79. }
  80. if utils.InArrayByStr(funcC, q) {
  81. _ = SyncXRegYDateZQuotaDb([]string{q})
  82. }
  83. }
  84. // 最后更新一下每个指标的开始结束日期
  85. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  86. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  87. }
  88. return
  89. }
  90. // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
  91. func SyncXDateYQuotaDb(dbs []string) (err error) {
  92. defer func() {
  93. if err != nil {
  94. utils.FileLog.Error("统计局-同步月度/季度/年度数据失败, ErrMsg: %s", err.Error())
  95. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步月度/季度/年度数据失败, ErrMsg: %s", err.Error()), 3)
  96. return
  97. }
  98. utils.FileLog.Info("统计局-同步月度/季度/年度数据成功")
  99. }()
  100. if len(dbs) == 0 {
  101. dbs = []string{"hgyd", "hgjd", "hgnd"}
  102. }
  103. // 查询无父级的指标分类
  104. for _, d := range dbs {
  105. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  106. classifyCond := ` AND is_parent = 0 AND dbcode = ?`
  107. classifyPars := make([]interface{}, 0)
  108. classifyPars = append(classifyPars, d)
  109. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  110. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  111. if e != nil {
  112. err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
  113. return
  114. }
  115. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  116. // 同步指标和数据
  117. for _, c := range classifyList {
  118. if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  119. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  120. return
  121. }
  122. }
  123. }
  124. return
  125. }
  126. // SyncXDateYQuotaData 同步两维度X轴-日期, Y轴-指标的数据
  127. func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
  128. defer func() {
  129. if err != nil {
  130. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  131. }
  132. }()
  133. // yd-月度 jd-季度 nd-年度
  134. frequency := ""
  135. timeParam := ""
  136. if strings.Contains(dbCode, "yd") {
  137. timeParam = "LAST36" // 最近36个月
  138. frequency = "月度"
  139. }
  140. if strings.Contains(dbCode, "jd") {
  141. timeParam = "LAST18" // 最近18个季度
  142. frequency = "季度"
  143. }
  144. if strings.Contains(dbCode, "nd") {
  145. timeParam = "LAST20" // 最近20年
  146. frequency = "年度"
  147. }
  148. var dataReq DataApiReq
  149. dataReq.DbCode = dbCode
  150. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  151. WdCode: "zb",
  152. ValueCode: classifyCode,
  153. }, Wds{
  154. WdCode: "sj",
  155. ValueCode: timeParam,
  156. })
  157. resp, e := CommonDataApiRequest(dataReq)
  158. if e != nil {
  159. err = fmt.Errorf("请求数据接口失败, Err: %s", e.Error())
  160. return
  161. }
  162. // 数据集
  163. dataNodes := resp.ReturnData.DataNodes
  164. dataMap := make(map[string]QuotaDataNode)
  165. for _, d := range dataNodes {
  166. dataMap[d.Code] = d
  167. }
  168. // 取出指标(Y轴), 日期(X轴)
  169. wdNodes := resp.ReturnData.WdNodes
  170. var quotaNodes, dateNodes []QuotaWdNodeData
  171. for _, w := range wdNodes {
  172. if w.WdCode == "zb" {
  173. quotaNodes = w.Nodes
  174. continue
  175. }
  176. if w.WdCode == "sj" {
  177. dateNodes = w.Nodes
  178. }
  179. }
  180. // 指标编码去重, 指标编码+日期数据去重
  181. indexOB := new(models.BaseFromNationalStatisticsIndex)
  182. indexCond := ``
  183. indexPars := make([]interface{}, 0)
  184. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  185. if e != nil {
  186. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  187. return
  188. }
  189. indexExistMap := make(map[string]bool)
  190. for _, v := range indexList {
  191. indexExistMap[v.IndexCode] = true
  192. }
  193. dataOB := new(models.BaseFromNationalStatisticsData)
  194. dataCond := ``
  195. dataPars := make([]interface{}, 0)
  196. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  197. if e != nil {
  198. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  199. return
  200. }
  201. dataExistMap := make(map[string]bool)
  202. for _, v := range dataList {
  203. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  204. }
  205. // 遍历XY轴
  206. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  207. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  208. for _, q := range quotaNodes {
  209. indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
  210. // 指标
  211. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  212. r.Index = &models.BaseFromNationalStatisticsIndex{
  213. BaseFromNationalStatisticsClassifyId: classifyId,
  214. Dbcode: dbCode,
  215. IndexCode: indexCode,
  216. IndexName: q.Name,
  217. Frequency: frequency,
  218. CreateTime: time.Now().Local(),
  219. ModifyTime: time.Now().Local(),
  220. }
  221. if indexExistMap[indexCode] {
  222. r.IndexExist = true
  223. }
  224. // 数据
  225. for _, d := range dateNodes {
  226. k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
  227. v := dataMap[k]
  228. if !v.Data.HasData {
  229. continue
  230. }
  231. // 日期去重
  232. t, e := formatMonth2YearDateCode(d.Code)
  233. if e != nil {
  234. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  235. return
  236. }
  237. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  238. if dataExistMap[existKey] {
  239. continue
  240. }
  241. // 数据map
  242. if indexDataMap[indexCode] == nil {
  243. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  244. }
  245. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  246. IndexCode: indexCode,
  247. DataTime: t,
  248. Value: v.Data.Data,
  249. CreateTime: time.Now().Local(),
  250. ModifyTime: time.Now().Local(),
  251. })
  252. }
  253. indexDataList = append(indexDataList, r)
  254. }
  255. // 保存指标
  256. for _, v := range indexDataList {
  257. ds := indexDataMap[v.Index.IndexCode]
  258. if ds == nil || (ds != nil && len(ds) == 0) {
  259. continue
  260. }
  261. v.DataList = ds
  262. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  263. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  264. return
  265. }
  266. }
  267. return
  268. }
  269. // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
  270. func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
  271. defer func() {
  272. if err != nil {
  273. utils.FileLog.Error("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格失败, ErrMsg: %s", err.Error())
  274. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格失败, ErrMsg: %s", err.Error()), 3)
  275. return
  276. }
  277. utils.FileLog.Info("统计局-同步分省月季年度、主要城市月年度、国际市场月度商品价格成功")
  278. }()
  279. if len(dbs) == 0 {
  280. dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  281. }
  282. // 需要同步的数据库
  283. for _, d := range dbs {
  284. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  285. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  286. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  287. classifyPars := make([]interface{}, 0)
  288. classifyPars = append(classifyPars, d)
  289. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  290. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  291. if e != nil {
  292. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  293. return
  294. }
  295. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  296. // 查询其他维度-地区
  297. wdList, e := GetOtherWd(d, "", "")
  298. var regList []OtherWdNodes
  299. for _, wd := range wdList {
  300. if wd.WdCode == "reg" {
  301. regList = wd.Nodes
  302. break
  303. }
  304. }
  305. if len(regList) == 0 {
  306. err = fmt.Errorf("其他维度为空, DbCode: %s", d)
  307. return
  308. }
  309. // 同步指标和数据
  310. for _, c := range classifyList {
  311. if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
  312. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  313. return
  314. }
  315. }
  316. }
  317. return
  318. }
  319. // SyncXDateYQuotaZRegData 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据
  320. func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
  321. defer func() {
  322. if err != nil {
  323. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  324. }
  325. }()
  326. // 根据DbCode判断频度和查询的时间区间
  327. frequency := ""
  328. timeParam := ""
  329. if strings.Contains(dbCode, "yd") {
  330. timeParam = "LAST36" // 最近36个月
  331. frequency = "月度"
  332. }
  333. if strings.Contains(dbCode, "jd") {
  334. timeParam = "LAST18" // 最近18个季度
  335. frequency = "季度"
  336. }
  337. if strings.Contains(dbCode, "nd") {
  338. timeParam = "LAST20" // 最近20年
  339. frequency = "年度"
  340. }
  341. // 遍历地区维度, 查询指标和数据
  342. for _, reg := range regList {
  343. var dataReq DataApiReq
  344. dataReq.DbCode = dbCode
  345. dataReq.WdsList = append(dataReq.WdsList, Wds{
  346. WdCode: "reg",
  347. ValueCode: reg.Code,
  348. })
  349. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  350. WdCode: "zb",
  351. ValueCode: classifyCode,
  352. }, Wds{
  353. WdCode: "sj",
  354. ValueCode: timeParam,
  355. })
  356. resp, e := CommonDataApiRequest(dataReq)
  357. if e != nil {
  358. err = fmt.Errorf("获取分类下的指标数据失败, Err: %s", e.Error())
  359. return
  360. }
  361. // 数据集
  362. dataNodes := resp.ReturnData.DataNodes
  363. dataMap := make(map[string]QuotaDataNode)
  364. for _, d := range dataNodes {
  365. dataMap[d.Code] = d
  366. }
  367. // 取出指标(Y轴), 日期(X轴)
  368. wdNodes := resp.ReturnData.WdNodes
  369. var quotaNodes, dateNodes []QuotaWdNodeData
  370. for _, w := range wdNodes {
  371. if w.WdCode == "zb" {
  372. quotaNodes = w.Nodes
  373. continue
  374. }
  375. if w.WdCode == "sj" {
  376. dateNodes = w.Nodes
  377. }
  378. }
  379. // 指标编码去重, 指标编码+日期数据去重
  380. indexOB := new(models.BaseFromNationalStatisticsIndex)
  381. indexCond := ``
  382. indexPars := make([]interface{}, 0)
  383. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  384. if e != nil {
  385. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  386. return
  387. }
  388. indexExistMap := make(map[string]bool)
  389. for _, v := range indexList {
  390. indexExistMap[v.IndexCode] = true
  391. }
  392. dataOB := new(models.BaseFromNationalStatisticsData)
  393. dataCond := ``
  394. dataPars := make([]interface{}, 0)
  395. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  396. if e != nil {
  397. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  398. return
  399. }
  400. dataExistMap := make(map[string]bool)
  401. for _, v := range dataList {
  402. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  403. }
  404. // 遍历XY轴
  405. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  406. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  407. for _, q := range quotaNodes {
  408. // dbcode+指标code+地区code
  409. indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
  410. // 指标
  411. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  412. r.Index = &models.BaseFromNationalStatisticsIndex{
  413. BaseFromNationalStatisticsClassifyId: classifyId,
  414. Dbcode: dbCode,
  415. IndexCode: indexCode,
  416. IndexName: q.Name,
  417. Frequency: frequency,
  418. Reg: reg.Name,
  419. CreateTime: time.Now().Local(),
  420. ModifyTime: time.Now().Local(),
  421. }
  422. if indexExistMap[indexCode] {
  423. r.IndexExist = true
  424. }
  425. // 数据
  426. // zb.A01010201_reg.110000_sj.201608
  427. for _, d := range dateNodes {
  428. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
  429. v := dataMap[k]
  430. if !v.Data.HasData {
  431. continue
  432. }
  433. // 日期去重
  434. t, e := formatMonth2YearDateCode(d.Code)
  435. if e != nil {
  436. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  437. return
  438. }
  439. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  440. if dataExistMap[existKey] {
  441. continue
  442. }
  443. // 数据map
  444. if indexDataMap[indexCode] == nil {
  445. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  446. }
  447. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  448. IndexCode: indexCode,
  449. DataTime: t,
  450. Value: v.Data.Data,
  451. CreateTime: time.Now().Local(),
  452. ModifyTime: time.Now().Local(),
  453. })
  454. }
  455. indexDataList = append(indexDataList, r)
  456. }
  457. // 保存指标
  458. for _, v := range indexDataList {
  459. ds := indexDataMap[v.Index.IndexCode]
  460. if ds == nil || (ds != nil && len(ds) == 0) {
  461. continue
  462. }
  463. v.DataList = ds
  464. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  465. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  466. return
  467. }
  468. }
  469. }
  470. return
  471. }
  472. // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
  473. func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
  474. defer func() {
  475. if err != nil {
  476. utils.FileLog.Error("统计局-同步港澳台、国际数据指标失败, ErrMsg: %s", err.Error())
  477. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步港澳台、国际数据指标失败, ErrMsg: %s", err.Error()), 3)
  478. return
  479. }
  480. utils.FileLog.Info("统计局-同步港澳台、国际数据指标成功")
  481. }()
  482. if len(dbs) == 0 {
  483. dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  484. }
  485. // 需要同步的数据库
  486. for _, d := range dbs {
  487. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  488. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  489. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  490. classifyPars := make([]interface{}, 0)
  491. classifyPars = append(classifyPars, d)
  492. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  493. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  494. if e != nil {
  495. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  496. return
  497. }
  498. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  499. // 同步指标和数据
  500. for _, c := range classifyList {
  501. if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  502. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  503. return
  504. }
  505. }
  506. }
  507. return
  508. }
  509. // SyncXRegYDateZQuotaDbData 同步三维度X轴-地区, Y轴-日期, Z轴-指标的数据
  510. func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
  511. defer func() {
  512. if err != nil {
  513. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  514. }
  515. }()
  516. // 根据DbCode判断频度和查询的时间区间
  517. frequency := ""
  518. timeParam := ""
  519. if strings.Contains(dbCode, "yd") {
  520. timeParam = "LAST36" // 最近36个月
  521. frequency = "月度"
  522. }
  523. if strings.Contains(dbCode, "jd") {
  524. timeParam = "LAST18" // 最近18个季度
  525. frequency = "季度"
  526. }
  527. if strings.Contains(dbCode, "nd") {
  528. timeParam = "LAST20" // 最近20年
  529. frequency = "年度"
  530. }
  531. // 先以指标作为行进行默认查询, 取出其中的指标作为当前分类的指标
  532. //f := url.Values{}
  533. //f.Add("m", "QueryData")
  534. //f.Add("dbcode", "gatyd")
  535. //f.Add("rowcode", "zb")
  536. //f.Add("colcode", "reg")
  537. //f.Add("wds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  538. //f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
  539. var defaultReq DataApiReq
  540. defaultReq.DbCode = dbCode
  541. defaultReq.RowCode = "zb"
  542. defaultReq.ColCode = "reg"
  543. defaultReq.WdsList = append(defaultReq.WdsList, Wds{
  544. WdCode: "sj",
  545. ValueCode: timeParam,
  546. })
  547. defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  548. WdCode: "zb",
  549. ValueCode: classifyCode,
  550. })
  551. defaultResult, e := CommonDataApiRequest(defaultReq)
  552. if e != nil {
  553. err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
  554. return
  555. }
  556. var quotaWds []QuotaWdNodeData
  557. for _, n := range defaultResult.ReturnData.WdNodes {
  558. if n.WdCode == "zb" {
  559. quotaWds = n.Nodes
  560. break
  561. }
  562. }
  563. // 遍历指标维度
  564. for _, quota := range quotaWds {
  565. //f := url.Values{}
  566. //f.Add("m", "QueryData")
  567. //f.Add("dbcode", "gatyd")
  568. //f.Add("rowcode", "sj")
  569. //f.Add("colcode", "reg")
  570. //f.Add("wds", `[{"wdcode":"zb","valuecode":"A010A"}]`)
  571. //f.Add("dfwds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  572. var dataReq DataApiReq
  573. dataReq.DbCode = dbCode
  574. dataReq.RowCode = "sj"
  575. dataReq.ColCode = "reg"
  576. dataReq.WdsList = append(defaultReq.WdsList, Wds{
  577. WdCode: "zb",
  578. ValueCode: quota.Code,
  579. })
  580. dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  581. WdCode: "sj",
  582. ValueCode: timeParam,
  583. })
  584. resp, e := CommonDataApiRequest(dataReq)
  585. if e != nil {
  586. err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  587. return
  588. }
  589. // 数据集
  590. dataNodes := resp.ReturnData.DataNodes
  591. dataMap := make(map[string]QuotaDataNode)
  592. for _, d := range dataNodes {
  593. dataMap[d.Code] = d
  594. }
  595. // 取出指标(Y轴), 日期(X轴)
  596. wdNodes := resp.ReturnData.WdNodes
  597. //var quotaNodes, dateNodes, regNodes []QuotaWdNodeData
  598. var dateNodes, regNodes []QuotaWdNodeData
  599. for _, w := range wdNodes {
  600. switch w.WdCode {
  601. case "zb":
  602. //quotaNodes = w.Nodes
  603. break
  604. case "sj":
  605. dateNodes = w.Nodes
  606. break
  607. case "reg":
  608. regNodes = w.Nodes
  609. break
  610. }
  611. }
  612. // 指标编码去重, 指标编码+日期数据去重
  613. indexOB := new(models.BaseFromNationalStatisticsIndex)
  614. indexCond := ``
  615. indexPars := make([]interface{}, 0)
  616. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  617. if e != nil {
  618. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  619. return
  620. }
  621. indexExistMap := make(map[string]bool)
  622. for _, v := range indexList {
  623. indexExistMap[v.IndexCode] = true
  624. }
  625. dataOB := new(models.BaseFromNationalStatisticsData)
  626. dataCond := ``
  627. dataPars := make([]interface{}, 0)
  628. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  629. if e != nil {
  630. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  631. return
  632. }
  633. dataExistMap := make(map[string]bool)
  634. for _, v := range dataList {
  635. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  636. }
  637. // 遍历XY轴
  638. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  639. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  640. // 遍历X轴-地区
  641. for _, reg := range regNodes {
  642. // 指标: dbcode+指标code+地区code
  643. indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
  644. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  645. r.Index = &models.BaseFromNationalStatisticsIndex{
  646. BaseFromNationalStatisticsClassifyId: classifyId,
  647. Dbcode: dbCode,
  648. IndexCode: indexCode,
  649. IndexName: quota.Name,
  650. Frequency: frequency,
  651. Reg: reg.Name,
  652. CreateTime: time.Now().Local(),
  653. ModifyTime: time.Now().Local(),
  654. }
  655. if indexExistMap[indexCode] {
  656. r.IndexExist = true
  657. }
  658. // 遍历Y轴-日期
  659. for _, d := range dateNodes {
  660. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
  661. v := dataMap[k]
  662. if !v.Data.HasData {
  663. continue
  664. }
  665. // 日期去重
  666. t, e := formatMonth2YearDateCode(d.Code)
  667. if e != nil {
  668. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  669. return
  670. }
  671. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  672. if dataExistMap[existKey] {
  673. continue
  674. }
  675. // 数据map
  676. if indexDataMap[indexCode] == nil {
  677. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  678. }
  679. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  680. IndexCode: indexCode,
  681. DataTime: t,
  682. Value: v.Data.Data,
  683. CreateTime: time.Now().Local(),
  684. ModifyTime: time.Now().Local(),
  685. })
  686. }
  687. indexDataList = append(indexDataList, r)
  688. }
  689. // 保存指标
  690. for _, v := range indexDataList {
  691. ds := indexDataMap[v.Index.IndexCode]
  692. if ds == nil || (ds != nil && len(ds) == 0) {
  693. continue
  694. }
  695. v.DataList = ds
  696. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  697. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  698. return
  699. }
  700. }
  701. }
  702. return
  703. }