icpi.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package services
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "eta/eta_task/models/data_manage"
  6. "eta/eta_task/utils"
  7. "fmt"
  8. "time"
  9. )
  10. // ICPI消费指数指标数据同步
  11. func SyncBaseFromIcpi() (err error) {
  12. var startDate string
  13. maxDate, err := data_manage.GetBaseFromIcpiIndexMaxDate()
  14. if err != nil || maxDate.IsZero() {
  15. startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
  16. } else {
  17. startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
  18. }
  19. method := `index/list`
  20. data := make(map[string]interface{})
  21. data["Source"] = utils.DATA_SOURCE_ICPI
  22. data["StartDate"] = startDate
  23. //data["EndDate"] = endDate
  24. result, err := HttpPost("SyncRankingFromIcpi", method, data)
  25. utils.FileLog.Info(result)
  26. fmt.Println(result)
  27. respObj := new(data_manage.IcpiIndexResp)
  28. err = json.Unmarshal([]byte(result), &respObj)
  29. if err != nil {
  30. utils.FileLog.Info("err:", err.Error())
  31. fmt.Println("err:", err.Error())
  32. return err
  33. }
  34. //获取所有指标信息 某一天的
  35. /*allIndex, err := data_manage.GetBaseFromIcpiIndexAll(startDate)
  36. if err != nil {
  37. return
  38. }
  39. existIndexMap := make(map[int]*data_manage.BaseFromIcpiIndex)
  40. for _, v := range allIndex {
  41. existIndexMap[v.BaseFromIcpiIndexId] = v
  42. }*/
  43. icpiObj := new(data_manage.BaseFromIcpiIndex)
  44. for _, zv := range respObj.Data {
  45. newID, err := icpiObj.InsertOrUpdateBaseFromIcpiIndex(zv)
  46. if err != nil {
  47. fmt.Println("InsertOrUpdateBaseFromIcpiIndex error:", err)
  48. }
  49. fmt.Println("InsertOrUpdateBaseFromIcpiIndex new indexID:", newID)
  50. }
  51. return err
  52. }
  53. // ICPI消费指数-分类
  54. func SyncBaseFromIcpiClassify() (err error) {
  55. data := make(map[string]interface{})
  56. data["Source"] = utils.DATA_SOURCE_ICPI
  57. method := `classify/list`
  58. result, err := HttpPost("SyncBaseFromIcpiClassify", method, data)
  59. if err != nil {
  60. fmt.Println("SyncBaseFromIcpiClassify HttpPost Err:", err.Error())
  61. }
  62. utils.FileLog.Info(result)
  63. fmt.Println("SyncBaseFromIcpiClassify result:", result)
  64. respObj := new(data_manage.IcpiClassifyResp)
  65. err = json.Unmarshal([]byte(result), &respObj)
  66. if err != nil {
  67. return err
  68. }
  69. //获取所有分类
  70. /* allClassify, err := data_manage.GetBaseFromIcpiClassifyAll()
  71. if err != nil {
  72. return
  73. }
  74. existIndexMap := make(map[int]*data_manage.BaseFromIcpiClassify)
  75. for _, v := range allClassify {
  76. existIndexMap[v.BaseFromIcpiClassifyId] = v
  77. }*/
  78. for _, item := range respObj.Data {
  79. //if _, ok := existIndexMap[item.BaseFromIcpiClassifyId]; !ok {
  80. newID, err := data_manage.InsertOrUpdateBaseFromIcpiClassify(item)
  81. if err != nil {
  82. fmt.Println("InsertOrUpdateBaseFromIcpiClassify error:", err)
  83. }
  84. fmt.Println("InsertOrUpdateBaseFromIcpiClassify new indexID:", newID)
  85. //}
  86. }
  87. return err
  88. }
  89. // ICPI消费指数-数据
  90. func SyncBaseFromIcpiData() (err error) {
  91. startDate := time.Now().Format(utils.FormatDate) + " 00:00:00"
  92. //var startDate string
  93. maxDate, err := data_manage.GetBaseFromIcpiMaxDate()
  94. if err != nil || maxDate.IsZero() {
  95. startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
  96. } else {
  97. startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
  98. }
  99. method := `index/data/list_page`
  100. existDataMap := make(map[string]*data_manage.BaseFromIcpiData)
  101. allData, err1 := data_manage.GetAllBaseFromIcpiDataList(startDate)
  102. if err1 != nil {
  103. fmt.Println("get GetAllBaseFromIcpiDataList err:" + err1.Error())
  104. return
  105. }
  106. for _, dv := range allData {
  107. tmpKey := dv.IndexCode + "_" + dv.DataTime
  108. existDataMap[tmpKey] = dv
  109. }
  110. //获取所有指标信息 某一天的
  111. maxPage := 1
  112. for currPage := 0; currPage < maxPage; currPage++ {
  113. data := make(map[string]interface{})
  114. data["Source"] = utils.DATA_SOURCE_ICPI
  115. data["StartDate"] = startDate
  116. data["CurrPage"] = currPage
  117. data["PageSize"] = 500 //
  118. var result string
  119. result, err = HttpPost("SyncBaseFromIcpiData", method, data)
  120. utils.FileLog.Info(result)
  121. fmt.Println(result)
  122. respObj := new(data_manage.BaseFromIcpiDataResp)
  123. err = json.Unmarshal([]byte(result), &respObj)
  124. if err != nil {
  125. fmt.Println("json.Unmarshal err:" + err.Error())
  126. return err
  127. }
  128. if respObj.Ret != 200 {
  129. err = errors.New(respObj.ErrMsg)
  130. return
  131. }
  132. // 总页码数
  133. maxPage = respObj.Data.Paging.Pages
  134. addDataList := make([]*data_manage.BaseFromIcpiData, 0)
  135. if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
  136. for _, dv := range respObj.Data.List {
  137. tmpKey := dv.IndexCode + "_" + dv.DataTime
  138. if _, ok := existDataMap[tmpKey]; !ok {
  139. addDataList = append(addDataList, dv)
  140. existDataMap[tmpKey] = dv
  141. }
  142. }
  143. }
  144. // 最后如果还有数据未插入,那么继续插入吧
  145. if len(addDataList) > 0 {
  146. _, err = data_manage.MultiAddBaseFromIcpiDataIndex(addDataList)
  147. if err != nil {
  148. fmt.Println("MultiAddBaseFromIcpiDataIndex error:", err)
  149. }
  150. }
  151. }
  152. return err
  153. }