// national_data.go (28 KB)
  1. package national_data
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "hongze/hongze_data_crawler/models"
  7. "hongze/hongze_data_crawler/services/alarm_msg"
  8. "hongze/hongze_data_crawler/utils"
  9. "strings"
  10. "time"
  11. )
  12. // RefreshNationalDbs 刷新统计局数据(所有)
  13. func RefreshNationalDbs(cont context.Context) (err error) {
  14. utils.FileLog.Info("开始刷新统计局数据")
  15. _ = SyncXDateYQuotaDb([]string{})
  16. _ = SyncXDateYQuotaZRegDb([]string{})
  17. _ = SyncXRegYDateZQuotaDb([]string{})
  18. // 最后更新一下每个指标的开始结束日期
  19. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  20. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  21. }
  22. utils.FileLog.Info("统计局数据刷新成功")
  23. return
  24. }
  25. // RefreshNationalMonthDbA 刷新月度指标库
  26. func RefreshNationalMonthDbA(cont context.Context) (err error) {
  27. utils.FileLog.Info("统计局-开始同步月度指标库A")
  28. if err = SelectSyncFunc([]string{"hgyd", "fsyd"}); err != nil {
  29. utils.FileLog.Info("统计局-同步月度指标库A失败")
  30. return
  31. }
  32. utils.FileLog.Info("统计局-同步月度指标库A成功")
  33. return
  34. }
  35. // RefreshNationalMonthDbB 刷新月度指标库(主要城市月度、港澳台月度)
  36. func RefreshNationalMonthDbB(cont context.Context) (err error) {
  37. utils.FileLog.Info("统计局-开始同步月度指标库B")
  38. if err = SelectSyncFunc([]string{"csyd", "gatyd"}); err != nil {
  39. utils.FileLog.Info("统计局-同步月度指标库B失败")
  40. return
  41. }
  42. utils.FileLog.Info("统计局-同步月度指标库B成功")
  43. return
  44. }
  45. // RefreshNationalMonthDbC 刷新月度指标库(国际数据)
  46. func RefreshNationalMonthDbC(cont context.Context) (err error) {
  47. utils.FileLog.Info("统计局-开始同步月度指标库C")
  48. if err = SelectSyncFunc([]string{"gjyd", "gjydsdj", "gjydsc"}); err != nil {
  49. utils.FileLog.Info("统计局-同步月度指标库C失败")
  50. return
  51. }
  52. utils.FileLog.Info("统计局-同步月度指标库C成功")
  53. return
  54. }
  55. // RefreshNationalQuarterDb 刷新季度指标库
  56. func RefreshNationalQuarterDb(cont context.Context) (err error) {
  57. utils.FileLog.Info("统计局-开始同步季度指标库")
  58. if err = SelectSyncFunc([]string{"hgjd", "fsjd"}); err != nil {
  59. utils.FileLog.Info("统计局-同步季度指标库失败")
  60. return
  61. }
  62. utils.FileLog.Info("统计局-同步季度指标库成功")
  63. return
  64. }
  65. // RefreshNationalYearDbA 刷新年度指标库(年度数据、分省年度数据)
  66. func RefreshNationalYearDbA(cont context.Context) (err error) {
  67. utils.FileLog.Info("统计局-开始同步年度指标库A")
  68. if err = SelectSyncFunc([]string{"hgnd", "fsnd"}); err != nil {
  69. utils.FileLog.Info("统计局-同步年度指标库A失败")
  70. return
  71. }
  72. //if err = SelectSyncFunc([]string{"fsnd", "csnd", "gatnd", "gjnd"}); err != nil {
  73. // utils.FileLog.Info("统计局-同步年度指标库A失败")
  74. // return
  75. //}
  76. utils.FileLog.Info("统计局-同步年度指标库A成功")
  77. return
  78. }
  79. // RefreshNationalYearDbB 刷新年度指标库(主要城市年度数据、港澳台年度数据、国际年度数据)
  80. func RefreshNationalYearDbB(cont context.Context) (err error) {
  81. utils.FileLog.Info("统计局-开始同步年度指标库B")
  82. if err = SelectSyncFunc([]string{"csnd", "gatnd", "gjnd"}); err != nil {
  83. utils.FileLog.Info("统计局-同步年度指标库B失败")
  84. return
  85. }
  86. utils.FileLog.Info("统计局-同步年度指标库B成功")
  87. return
  88. }
  89. func SelectSyncFunc(dbs []string) (err error) {
  90. funcA := []string{"hgyd", "hgjd", "hgnd"}
  91. funcB := []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  92. funcC := []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  93. // 此处要根据不同的指标库选择同步方式
  94. for _, q := range dbs {
  95. if utils.InArrayByStr(funcA, q) {
  96. if err = SyncXDateYQuotaDb([]string{q}); err != nil {
  97. return
  98. }
  99. continue
  100. }
  101. if utils.InArrayByStr(funcB, q) {
  102. if err = SyncXDateYQuotaZRegDb([]string{q}); err != nil {
  103. return
  104. }
  105. continue
  106. }
  107. if utils.InArrayByStr(funcC, q) {
  108. if err = SyncXRegYDateZQuotaDb([]string{q}); err != nil {
  109. return
  110. }
  111. }
  112. }
  113. // 最后更新一下每个指标的开始结束日期
  114. if e := models.UpdateNationalStatisticsIndexStartEndDate(); e != nil {
  115. alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-更新指标开始结束日期失败, ErrMsg: %s", e.Error()), 3)
  116. }
  117. return
  118. }
  119. // SyncXDateYQuotaDb 同步两维度X轴-日期, Y轴-指标数据库(月度/季度/年度数据指标)
  120. func SyncXDateYQuotaDb(dbs []string) (err error) {
  121. if len(dbs) == 0 {
  122. dbs = []string{"hgyd", "hgjd", "hgnd"}
  123. }
  124. defer func() {
  125. d := strings.Join(dbs, ",")
  126. if err != nil {
  127. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  128. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  129. return
  130. }
  131. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  132. }()
  133. // 查询无父级的指标分类
  134. for _, d := range dbs {
  135. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  136. classifyCond := ` AND is_parent = 0 AND dbcode = ?`
  137. classifyPars := make([]interface{}, 0)
  138. classifyPars = append(classifyPars, d)
  139. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  140. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  141. if e != nil {
  142. err = fmt.Errorf("获取指标分类列表失败, Err: %s", e.Error())
  143. return
  144. }
  145. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  146. // 同步指标和数据
  147. for _, c := range classifyList {
  148. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  149. if e = SyncXDateYQuotaData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  150. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  151. return
  152. }
  153. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  154. }
  155. }
  156. return
  157. }
  158. // SyncXDateYQuotaData 同步两维度X轴-日期, Y轴-指标的数据
  159. func SyncXDateYQuotaData(classifyId int, dbCode, classifyCode string) (err error) {
  160. defer func() {
  161. if err != nil {
  162. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  163. }
  164. }()
  165. // yd-月度 jd-季度 nd-年度
  166. frequency := ""
  167. timeParam := ""
  168. if strings.Contains(dbCode, "yd") {
  169. timeParam = "LAST36" // 最近36个月
  170. frequency = "月度"
  171. }
  172. if strings.Contains(dbCode, "jd") {
  173. timeParam = "LAST18" // 最近18个季度
  174. frequency = "季度"
  175. }
  176. if strings.Contains(dbCode, "nd") {
  177. timeParam = "LAST20" // 最近20年
  178. frequency = "年度"
  179. }
  180. var dataReq DataApiReq
  181. dataReq.DbCode = dbCode
  182. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  183. WdCode: "zb",
  184. ValueCode: classifyCode,
  185. }, Wds{
  186. WdCode: "sj",
  187. ValueCode: timeParam,
  188. })
  189. attempt := 0
  190. resp, e := CommonDataApiRequest(dataReq)
  191. if e != nil {
  192. //if !strings.Contains(e.Error(), "connection attempt failed") {
  193. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  194. // return
  195. //}
  196. // 连接失败重新尝试3次
  197. for {
  198. time.Sleep(2 * time.Minute)
  199. attempt += 1
  200. utils.FileLog.Info("当前第%d次重新请求", attempt)
  201. resp, e = CommonDataApiRequest(dataReq)
  202. if e == nil {
  203. break
  204. }
  205. if attempt >= 3 {
  206. s, _ := json.Marshal(dataReq)
  207. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  208. return
  209. }
  210. }
  211. }
  212. // 数据集
  213. dataNodes := resp.ReturnData.DataNodes
  214. dataMap := make(map[string]QuotaDataNode)
  215. for _, d := range dataNodes {
  216. dataMap[d.Code] = d
  217. }
  218. // 取出指标(Y轴), 日期(X轴)
  219. wdNodes := resp.ReturnData.WdNodes
  220. var quotaNodes, dateNodes []QuotaWdNodeData
  221. for _, w := range wdNodes {
  222. if w.WdCode == "zb" {
  223. quotaNodes = w.Nodes
  224. continue
  225. }
  226. if w.WdCode == "sj" {
  227. dateNodes = w.Nodes
  228. }
  229. }
  230. // 指标编码去重, 指标编码+日期数据去重
  231. indexOB := new(models.BaseFromNationalStatisticsIndex)
  232. indexCond := ` AND dbcode = ?`
  233. indexPars := make([]interface{}, 0)
  234. indexPars = append(indexPars, dbCode)
  235. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  236. if e != nil {
  237. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  238. return
  239. }
  240. indexExistMap := make(map[string]bool)
  241. indexCodeArr := make([]string, 0)
  242. for _, v := range indexList {
  243. indexExistMap[v.IndexCode] = true
  244. indexCodeArr = append(indexCodeArr, v.IndexCode)
  245. }
  246. dataExistMap := make(map[string]bool)
  247. if len(indexCodeArr) > 0 {
  248. dataOB := new(models.BaseFromNationalStatisticsData)
  249. dataCond := ` AND index_code IN (` + utils.GetOrmInReplace(len(indexCodeArr)) + `)`
  250. dataPars := make([]interface{}, 0)
  251. dataPars = append(dataPars, indexCodeArr)
  252. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  253. if e != nil {
  254. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  255. return
  256. }
  257. for _, v := range dataList {
  258. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  259. }
  260. }
  261. // 遍历XY轴
  262. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  263. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  264. for _, q := range quotaNodes {
  265. indexCode := fmt.Sprintf("%s%s", dbCode, q.Code)
  266. // 指标
  267. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  268. r.Index = &models.BaseFromNationalStatisticsIndex{
  269. BaseFromNationalStatisticsClassifyId: classifyId,
  270. Dbcode: dbCode,
  271. IndexCode: indexCode,
  272. IndexName: q.Name,
  273. Frequency: frequency,
  274. Unit: q.Unit,
  275. CreateTime: time.Now().Local(),
  276. ModifyTime: time.Now().Local(),
  277. }
  278. if indexExistMap[indexCode] {
  279. r.IndexExist = true
  280. }
  281. // 数据
  282. for _, d := range dateNodes {
  283. k := fmt.Sprintf("%s.%s_%s.%s", "zb", q.Code, "sj", d.Code)
  284. v := dataMap[k]
  285. if !v.Data.HasData {
  286. continue
  287. }
  288. // 日期去重
  289. t, e := formatMonth2YearDateCode(d.Code)
  290. if e != nil {
  291. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  292. return
  293. }
  294. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  295. if dataExistMap[existKey] {
  296. continue
  297. }
  298. // 数据map
  299. if indexDataMap[indexCode] == nil {
  300. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  301. }
  302. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  303. IndexCode: indexCode,
  304. DataTime: t,
  305. Value: v.Data.Data,
  306. CreateTime: time.Now().Local(),
  307. ModifyTime: time.Now().Local(),
  308. })
  309. }
  310. indexDataList = append(indexDataList, r)
  311. }
  312. // 保存指标
  313. for _, v := range indexDataList {
  314. ds := indexDataMap[v.Index.IndexCode]
  315. if ds == nil || (ds != nil && len(ds) == 0) {
  316. continue
  317. }
  318. v.DataList = ds
  319. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  320. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  321. return
  322. }
  323. }
  324. return
  325. }
  326. // SyncXDateYQuotaZRegDb 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据库(分省月季年度、主要城市月年度、国际市场月度商品价格)
  327. func SyncXDateYQuotaZRegDb(dbs []string) (err error) {
  328. if len(dbs) == 0 {
  329. dbs = []string{"fsyd", "fsjd", "fsnd", "csyd", "csnd", "gjydsc"}
  330. }
  331. defer func() {
  332. d := strings.Join(dbs, ",")
  333. if err != nil {
  334. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  335. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  336. return
  337. }
  338. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  339. }()
  340. // 需要同步的数据库
  341. for _, d := range dbs {
  342. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  343. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  344. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  345. // TODO:记得删
  346. //if d == "fsnd" {
  347. // classifyCond += ` AND base_from_national_statistics_classify_id >= 3771`
  348. //}
  349. classifyPars := make([]interface{}, 0)
  350. classifyPars = append(classifyPars, d)
  351. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  352. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  353. if e != nil {
  354. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  355. return
  356. }
  357. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  358. // 查询其他维度-地区
  359. wdList, e := GetOtherWd(d, "", "")
  360. var regList []OtherWdNodes
  361. for _, wd := range wdList {
  362. if wd.WdCode == "reg" {
  363. regList = wd.Nodes
  364. break
  365. }
  366. }
  367. if len(regList) == 0 {
  368. err = fmt.Errorf("其他维度为空, DbCode: %s", d)
  369. return
  370. }
  371. // 同步指标和数据
  372. for _, c := range classifyList {
  373. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  374. if e = SyncXDateYQuotaZRegData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id, regList); e != nil {
  375. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  376. return
  377. }
  378. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  379. }
  380. }
  381. return
  382. }
  383. // SyncXDateYQuotaZRegData 同步三维度X轴-日期, Y轴-指标, Z轴-地区的数据
  384. func SyncXDateYQuotaZRegData(classifyId int, dbCode, classifyCode string, regList []OtherWdNodes) (err error) {
  385. defer func() {
  386. if err != nil {
  387. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  388. }
  389. }()
  390. // 根据DbCode判断频度和查询的时间区间
  391. frequency := ""
  392. timeParam := ""
  393. if strings.Contains(dbCode, "yd") {
  394. timeParam = "LAST36" // 最近36个月
  395. frequency = "月度"
  396. }
  397. if strings.Contains(dbCode, "jd") {
  398. timeParam = "LAST18" // 最近18个季度
  399. frequency = "季度"
  400. }
  401. if strings.Contains(dbCode, "nd") {
  402. timeParam = "LAST20" // 最近20年
  403. frequency = "年度"
  404. }
  405. // 遍历地区维度, 查询指标和数据
  406. for _, reg := range regList {
  407. var dataReq DataApiReq
  408. dataReq.DbCode = dbCode
  409. dataReq.WdsList = append(dataReq.WdsList, Wds{
  410. WdCode: "reg",
  411. ValueCode: reg.Code,
  412. })
  413. dataReq.DfwdsList = append(dataReq.DfwdsList, Wds{
  414. WdCode: "zb",
  415. ValueCode: classifyCode,
  416. }, Wds{
  417. WdCode: "sj",
  418. ValueCode: timeParam,
  419. })
  420. attempt := 0
  421. resp, e := CommonDataApiRequest(dataReq)
  422. if e != nil {
  423. //if !strings.Contains(e.Error(), "connection attempt failed") {
  424. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  425. // return
  426. //}
  427. // 连接失败重新尝试3次
  428. for {
  429. time.Sleep(2 * time.Minute)
  430. attempt += 1
  431. utils.FileLog.Info("当前第%d次重新请求", attempt)
  432. resp, e = CommonDataApiRequest(dataReq)
  433. if e == nil {
  434. break
  435. }
  436. if attempt >= 3 {
  437. s, _ := json.Marshal(dataReq)
  438. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  439. return
  440. }
  441. }
  442. }
  443. // 数据集
  444. dataNodes := resp.ReturnData.DataNodes
  445. dataMap := make(map[string]QuotaDataNode)
  446. for _, d := range dataNodes {
  447. dataMap[d.Code] = d
  448. }
  449. // 取出指标(Y轴), 日期(X轴)
  450. wdNodes := resp.ReturnData.WdNodes
  451. var quotaNodes, dateNodes []QuotaWdNodeData
  452. for _, w := range wdNodes {
  453. if w.WdCode == "zb" {
  454. quotaNodes = w.Nodes
  455. continue
  456. }
  457. if w.WdCode == "sj" {
  458. dateNodes = w.Nodes
  459. }
  460. }
  461. // 指标编码去重, 指标编码+日期数据去重
  462. indexOB := new(models.BaseFromNationalStatisticsIndex)
  463. indexCond := ` AND dbcode = ?`
  464. indexPars := make([]interface{}, 0)
  465. indexPars = append(indexPars, dbCode)
  466. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  467. if e != nil {
  468. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  469. return
  470. }
  471. indexExistMap := make(map[string]bool)
  472. indexCodeArr := make([]string, 0)
  473. for _, v := range indexList {
  474. indexExistMap[v.IndexCode] = true
  475. indexCodeArr = append(indexCodeArr, v.IndexCode)
  476. }
  477. dataExistMap := make(map[string]bool)
  478. if len(indexCodeArr) > 0 {
  479. dataOB := new(models.BaseFromNationalStatisticsData)
  480. dataCond := ` AND index_code IN (` + utils.GetOrmInReplace(len(indexCodeArr)) + `)`
  481. dataPars := make([]interface{}, 0)
  482. dataPars = append(dataPars, indexCodeArr)
  483. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  484. if e != nil {
  485. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  486. return
  487. }
  488. for _, v := range dataList {
  489. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  490. }
  491. }
  492. // 遍历XY轴
  493. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  494. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  495. for _, q := range quotaNodes {
  496. // dbcode+指标code+地区code
  497. indexCode := fmt.Sprintf("%s%s%s", dbCode, q.Code, reg.Code)
  498. // 指标
  499. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  500. r.Index = &models.BaseFromNationalStatisticsIndex{
  501. BaseFromNationalStatisticsClassifyId: classifyId,
  502. Dbcode: dbCode,
  503. IndexCode: indexCode,
  504. IndexName: q.Name,
  505. Frequency: frequency,
  506. Unit: q.Unit,
  507. Reg: reg.Name,
  508. CreateTime: time.Now().Local(),
  509. ModifyTime: time.Now().Local(),
  510. }
  511. if indexExistMap[indexCode] {
  512. r.IndexExist = true
  513. }
  514. // 数据
  515. // zb.A01010201_reg.110000_sj.201608
  516. for _, d := range dateNodes {
  517. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", q.Code, "reg", reg.Code, "sj", d.Code)
  518. v := dataMap[k]
  519. if !v.Data.HasData {
  520. continue
  521. }
  522. // 日期去重
  523. t, e := formatMonth2YearDateCode(d.Code)
  524. if e != nil {
  525. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  526. return
  527. }
  528. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  529. if dataExistMap[existKey] {
  530. continue
  531. }
  532. // 数据map
  533. if indexDataMap[indexCode] == nil {
  534. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  535. }
  536. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  537. IndexCode: indexCode,
  538. DataTime: t,
  539. Value: v.Data.Data,
  540. CreateTime: time.Now().Local(),
  541. ModifyTime: time.Now().Local(),
  542. })
  543. }
  544. indexDataList = append(indexDataList, r)
  545. }
  546. // 保存指标
  547. for _, v := range indexDataList {
  548. ds := indexDataMap[v.Index.IndexCode]
  549. if ds == nil || (ds != nil && len(ds) == 0) {
  550. continue
  551. }
  552. v.DataList = ds
  553. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  554. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  555. return
  556. }
  557. }
  558. }
  559. return
  560. }
  561. // SyncXRegYDateZQuotaDb 同步三维度X轴-地区, Y轴-日期的数据库(港澳台、国际数据指标)
  562. func SyncXRegYDateZQuotaDb(dbs []string) (err error) {
  563. if len(dbs) == 0 {
  564. dbs = []string{"gatyd", "gatnd", "gjyd", "gjydsdj", "gjnd"}
  565. }
  566. defer func() {
  567. d := strings.Join(dbs, ",")
  568. if err != nil {
  569. utils.FileLog.Error("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error())
  570. go alarm_msg.SendAlarmMsg(fmt.Sprintf("统计局-同步%s数据库失败, ErrMsg: %s", d, err.Error()), 3)
  571. return
  572. }
  573. utils.FileLog.Info("统计局-同步%s数据库成功", d)
  574. }()
  575. // 需要同步的数据库
  576. for _, d := range dbs {
  577. classifyOB := new(models.BaseFromNationalStatisticsClassify)
  578. // 注意此处只需要同步分类中为指标的即可, 分类为地区的数据在指标中均有包含
  579. classifyCond := ` AND is_parent = 0 AND wdcode = 'zb' AND dbcode = ?`
  580. classifyPars := make([]interface{}, 0)
  581. classifyPars = append(classifyPars, d)
  582. classifyOrder := ` base_from_national_statistics_classify_id ASC`
  583. classifyList, e := classifyOB.GetItemsByCondition(classifyCond, classifyPars, []string{}, classifyOrder)
  584. if e != nil {
  585. err = fmt.Errorf("获取%s分类列表失败, Err: %s", d, e.Error())
  586. return
  587. }
  588. utils.FileLog.Info("%s分类长度: %d\n", d, len(classifyList))
  589. // 同步指标和数据
  590. for _, c := range classifyList {
  591. utils.FileLog.Info("开始同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  592. if e = SyncXRegYDateZQuotaDbData(c.BaseFromNationalStatisticsClassifyId, c.Dbcode, c.Id); e != nil {
  593. err = fmt.Errorf("同步指标数据失败, DbCode: %s, ClassifyId: %d, Err: %s", c.Dbcode, c.BaseFromNationalStatisticsClassifyId, e.Error())
  594. return
  595. }
  596. utils.FileLog.Info("结束同步分类-%d: %s", c.BaseFromNationalStatisticsClassifyId, c.ClassifyName)
  597. }
  598. }
  599. return
  600. }
  601. // SyncXRegYDateZQuotaDbData 同步三维度X轴-地区, Y轴-日期, Z轴-指标的数据
  602. func SyncXRegYDateZQuotaDbData(classifyId int, dbCode, classifyCode string) (err error) {
  603. defer func() {
  604. if err != nil {
  605. utils.FileLog.Error("同步数据库DbCode: %s, 分类ClassifyId: %d失败, ErrMsg: %s", dbCode, classifyId, err.Error())
  606. }
  607. }()
  608. // 根据DbCode判断频度和查询的时间区间
  609. frequency := ""
  610. timeParam := ""
  611. if strings.Contains(dbCode, "yd") {
  612. timeParam = "LAST36" // 最近36个月
  613. frequency = "月度"
  614. }
  615. if strings.Contains(dbCode, "jd") {
  616. timeParam = "LAST18" // 最近18个季度
  617. frequency = "季度"
  618. }
  619. if strings.Contains(dbCode, "nd") {
  620. timeParam = "LAST20" // 最近20年
  621. frequency = "年度"
  622. }
  623. // 先以指标作为行进行默认查询, 取出其中的指标作为当前分类的指标
  624. //f := url.Values{}
  625. //f.Add("m", "QueryData")
  626. //f.Add("dbcode", "gatyd")
  627. //f.Add("rowcode", "zb")
  628. //f.Add("colcode", "reg")
  629. //f.Add("wds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  630. //f.Add("dfwds", `[{"wdcode":"zb","valuecode":"A01"}]`)
  631. var defaultReq DataApiReq
  632. defaultReq.DbCode = dbCode
  633. defaultReq.RowCode = "zb"
  634. defaultReq.ColCode = "reg"
  635. defaultReq.WdsList = append(defaultReq.WdsList, Wds{
  636. WdCode: "sj",
  637. ValueCode: timeParam,
  638. })
  639. defaultReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  640. WdCode: "zb",
  641. ValueCode: classifyCode,
  642. })
  643. defaultResult, e := CommonDataApiRequest(defaultReq)
  644. if e != nil {
  645. err = fmt.Errorf("默认查询数据失败, Err: %s", e.Error())
  646. return
  647. }
  648. var quotaWds []QuotaWdNodeData
  649. for _, n := range defaultResult.ReturnData.WdNodes {
  650. if n.WdCode == "zb" {
  651. quotaWds = n.Nodes
  652. break
  653. }
  654. }
  655. // 遍历指标维度
  656. for _, quota := range quotaWds {
  657. //f := url.Values{}
  658. //f.Add("m", "QueryData")
  659. //f.Add("dbcode", "gatyd")
  660. //f.Add("rowcode", "sj")
  661. //f.Add("colcode", "reg")
  662. //f.Add("wds", `[{"wdcode":"zb","valuecode":"A010A"}]`)
  663. //f.Add("dfwds", `[{"wdcode":"sj","valuecode":"LAST36"}]`)
  664. var dataReq DataApiReq
  665. dataReq.DbCode = dbCode
  666. dataReq.RowCode = "sj"
  667. dataReq.ColCode = "reg"
  668. dataReq.WdsList = append(defaultReq.WdsList, Wds{
  669. WdCode: "zb",
  670. ValueCode: quota.Code,
  671. })
  672. dataReq.DfwdsList = append(defaultReq.DfwdsList, Wds{
  673. WdCode: "sj",
  674. ValueCode: timeParam,
  675. })
  676. attempt := 0
  677. resp, e := CommonDataApiRequest(dataReq)
  678. if e != nil {
  679. //if !strings.Contains(e.Error(), "connection attempt failed") {
  680. // err = fmt.Errorf("查询数据失败, Err: %s", e.Error())
  681. // return
  682. //}
  683. // 连接失败重新尝试3次
  684. for {
  685. time.Sleep(2 * time.Minute)
  686. attempt += 1
  687. utils.FileLog.Info("当前第%d次重新请求", attempt)
  688. resp, e = CommonDataApiRequest(dataReq)
  689. if e == nil {
  690. break
  691. }
  692. if attempt >= 3 {
  693. s, _ := json.Marshal(dataReq)
  694. err = fmt.Errorf("查询数据重试失败, DataReq: %s", s)
  695. return
  696. }
  697. }
  698. }
  699. // 数据集
  700. dataNodes := resp.ReturnData.DataNodes
  701. dataMap := make(map[string]QuotaDataNode)
  702. for _, d := range dataNodes {
  703. dataMap[d.Code] = d
  704. }
  705. // 取出指标(Y轴), 日期(X轴)
  706. wdNodes := resp.ReturnData.WdNodes
  707. //var quotaNodes, dateNodes, regNodes []QuotaWdNodeData
  708. var dateNodes, regNodes []QuotaWdNodeData
  709. for _, w := range wdNodes {
  710. switch w.WdCode {
  711. case "zb":
  712. //quotaNodes = w.Nodes
  713. break
  714. case "sj":
  715. dateNodes = w.Nodes
  716. break
  717. case "reg":
  718. regNodes = w.Nodes
  719. break
  720. }
  721. }
  722. // 指标编码去重, 指标编码+日期数据去重
  723. indexOB := new(models.BaseFromNationalStatisticsIndex)
  724. indexCond := ` AND dbcode = ?`
  725. indexPars := make([]interface{}, 0)
  726. indexPars = append(indexPars, dbCode)
  727. indexList, e := indexOB.GetItemsByCondition(indexCond, indexPars, []string{"index_code"}, "")
  728. if e != nil {
  729. err = fmt.Errorf("获取指标列表失败, Err: %s", e.Error())
  730. return
  731. }
  732. indexExistMap := make(map[string]bool)
  733. indexCodeArr := make([]string, 0)
  734. for _, v := range indexList {
  735. indexExistMap[v.IndexCode] = true
  736. indexCodeArr = append(indexCodeArr, v.IndexCode)
  737. }
  738. dataExistMap := make(map[string]bool)
  739. if len(indexCodeArr) > 0 {
  740. dataOB := new(models.BaseFromNationalStatisticsData)
  741. dataCond := ` AND index_code IN (` + utils.GetOrmInReplace(len(indexCodeArr)) + `)`
  742. dataPars := make([]interface{}, 0)
  743. dataPars = append(dataPars, indexCodeArr)
  744. dataList, e := dataOB.GetItemsByCondition(dataCond, dataPars, []string{"index_code", "data_time"}, "")
  745. if e != nil {
  746. err = fmt.Errorf("获取指标数据列表失败, Err: %s", e.Error())
  747. return
  748. }
  749. for _, v := range dataList {
  750. dataExistMap[fmt.Sprintf("%s%s", v.IndexCode, v.DataTime.Format(utils.FormatDate))] = true
  751. }
  752. }
  753. // 遍历XY轴
  754. indexDataList := make([]*models.SaveNationalStatisticsIndexAndDataReq, 0)
  755. indexDataMap := make(map[string][]*models.BaseFromNationalStatisticsData)
  756. // 遍历X轴-地区
  757. for _, reg := range regNodes {
  758. // 指标: dbcode+指标code+地区code
  759. indexCode := fmt.Sprintf("%s%s%s", dbCode, quota.Code, reg.Code)
  760. r := new(models.SaveNationalStatisticsIndexAndDataReq)
  761. r.Index = &models.BaseFromNationalStatisticsIndex{
  762. BaseFromNationalStatisticsClassifyId: classifyId,
  763. Dbcode: dbCode,
  764. IndexCode: indexCode,
  765. IndexName: quota.Name,
  766. Frequency: frequency,
  767. Unit: quota.Unit,
  768. Reg: reg.Name,
  769. CreateTime: time.Now().Local(),
  770. ModifyTime: time.Now().Local(),
  771. }
  772. if indexExistMap[indexCode] {
  773. r.IndexExist = true
  774. }
  775. // 遍历Y轴-日期
  776. for _, d := range dateNodes {
  777. k := fmt.Sprintf("%s.%s_%s.%s_%s.%s", "zb", quota.Code, "reg", reg.Code, "sj", d.Code)
  778. v := dataMap[k]
  779. if !v.Data.HasData {
  780. continue
  781. }
  782. // 日期去重
  783. t, e := formatMonth2YearDateCode(d.Code)
  784. if e != nil {
  785. err = fmt.Errorf("格式化日期code失败, Err: %s", e.Error())
  786. return
  787. }
  788. existKey := fmt.Sprintf("%s%s", indexCode, t.Format(utils.FormatDate))
  789. if dataExistMap[existKey] {
  790. continue
  791. }
  792. // 数据map
  793. if indexDataMap[indexCode] == nil {
  794. indexDataMap[indexCode] = make([]*models.BaseFromNationalStatisticsData, 0)
  795. }
  796. indexDataMap[indexCode] = append(indexDataMap[indexCode], &models.BaseFromNationalStatisticsData{
  797. IndexCode: indexCode,
  798. DataTime: t,
  799. Value: v.Data.Data,
  800. CreateTime: time.Now().Local(),
  801. ModifyTime: time.Now().Local(),
  802. })
  803. }
  804. indexDataList = append(indexDataList, r)
  805. }
  806. // 保存指标
  807. for _, v := range indexDataList {
  808. ds := indexDataMap[v.Index.IndexCode]
  809. if ds == nil || (ds != nil && len(ds) == 0) {
  810. continue
  811. }
  812. v.DataList = ds
  813. if e := models.SaveNationalStatisticsIndexAndData(v); e != nil {
  814. err = fmt.Errorf("保存指标和数据失败, Err: %s", e.Error())
  815. return
  816. }
  817. }
  818. }
  819. return
  820. }