source_changes_visitors_covid.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package services
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "errors"
  6. "fmt"
  7. "github.com/chromedp/cdproto/browser"
  8. "github.com/chromedp/chromedp"
  9. "github.com/shopspring/decimal"
  10. "hongze/hongze_data_crawler/models"
  11. "hongze/hongze_data_crawler/services/alarm_msg"
  12. "hongze/hongze_data_crawler/utils"
  13. "io"
  14. "os"
  15. "path/filepath"
  16. "time"
  17. )
  18. func AddSourceChangesVisitorsCovid() (err error) {
  19. utils.FileLog.Info("爬取谷歌出行记录 开始爬取:")
  20. defer func() {
  21. if err != nil {
  22. utils.FileLog.Info("爬取谷歌出行记录失败 Err:" + err.Error())
  23. msg := "失败提醒"+"AddSourceChangesVisitorsCovid ErrMsg:"+err.Error()
  24. go alarm_msg.SendAlarmMsg(msg, 3)
  25. }
  26. }()
  27. fileName, err :=GetSourceChangesVisitorsCovid()
  28. if err != nil {
  29. err = errors.New("爬取谷歌出行记录失败"+err.Error())
  30. return
  31. }
  32. if fileName == "" {
  33. err = errors.New("爬取谷歌出行记录失败, 文件下载失败")
  34. return
  35. }
  36. fs, err := os.Open(fileName)
  37. if err != nil {
  38. err = errors.New("打开文件失败"+err.Error())
  39. return
  40. }
  41. defer fs.Close()
  42. r := csv.NewReader(fs)
  43. //针对大文件,一行一行的读取文件
  44. count := 0
  45. var list []*models.BaseFromChangesVisitorsCovid
  46. now := time.Now()
  47. lastItem, err := models.GetLatestBaseFromChangesVisitorsCovid()
  48. if err != nil {
  49. if err.Error() != utils.ErrNoRow() {
  50. err = errors.New("查询最新的记录失败"+err.Error())
  51. return
  52. }else{
  53. err = nil
  54. }
  55. }
  56. var before20 time.Time
  57. var lastDay time.Time
  58. if lastItem != nil {
  59. lastDay = lastItem.Day
  60. before20 = lastItem.Day.AddDate(0, 0, -20)
  61. }
  62. for {
  63. row, tErr := r.Read()
  64. if tErr != nil && tErr != io.EOF {
  65. err = errors.New("读取内容失败 "+ tErr.Error())
  66. return
  67. }
  68. if tErr == io.EOF {
  69. break
  70. }
  71. if count >= 1000 {
  72. //批量新增
  73. count = 0
  74. tErr = models.AddBaseFromChangesVisitorsCovidMulti(list)
  75. if tErr != nil {
  76. err = errors.New("批量新增失败 "+ tErr.Error())
  77. return
  78. }
  79. list = make([]*models.BaseFromChangesVisitorsCovid, 0)
  80. }
  81. if len(row) >= 9{
  82. tmp := new(models.BaseFromChangesVisitorsCovid)
  83. tmp.Entity = row[0]
  84. if !filterCountry(tmp.Entity) { //只需要五个国家的数据
  85. continue
  86. }
  87. tmp.Code = row[1]
  88. tmp.EdbCode = tmp.Code + "-TravelIndex"
  89. day, tErr := time.Parse(utils.FormatDate, row[2])
  90. if tErr != nil {
  91. continue
  92. }
  93. tmp.Day = day
  94. if day.Before(before20) && lastItem != nil {
  95. // 丢弃20天前的数据,只处理增量的数据
  96. continue
  97. }
  98. tmp.RetailAndRecreation = row[3]
  99. retailAndRecreation, _ := decimal.NewFromString(tmp.RetailAndRecreation)
  100. tmp.GroceryAndPharmacy = row[4]
  101. groceryAndPharmacy, _ := decimal.NewFromString(tmp.GroceryAndPharmacy)
  102. tmp.Residential = row[5]
  103. residential, _ := decimal.NewFromString(tmp.Residential)
  104. tmp.TransitStations = row[6]
  105. transitStations, _ := decimal.NewFromString(tmp.TransitStations)
  106. tmp.Parks = row[7]
  107. parks, _ := decimal.NewFromString(tmp.Parks)
  108. tmp.Workplaces = row[8]
  109. workplaces, _ := decimal.NewFromString(tmp.Workplaces)
  110. total := retailAndRecreation.Add(groceryAndPharmacy).Add(residential).Add(transitStations).Add(parks).Add(workplaces)
  111. tmp.Total = total.String()
  112. tmp.CreateTime = now
  113. tmp.ModifyTime = now
  114. if day.Format(utils.FormatDate) <= lastDay.Format(utils.FormatDate) && lastItem != nil { //如果是10天内的数据判断数据库中是否已存在
  115. _, tErr = models.GetBaseFromChangesVisitorsCovidByEntityDay(tmp.Entity, row[2])
  116. if tErr == nil {
  117. //已存在记录,则跳过
  118. continue
  119. }
  120. }
  121. list = append(list, tmp)
  122. count ++
  123. }
  124. }
  125. if len(list) > 0 {
  126. //批量新增
  127. tErr := models.AddBaseFromChangesVisitorsCovidMulti(list)
  128. if tErr != nil {
  129. err = errors.New("批量新增失败 "+ tErr.Error())
  130. return
  131. }
  132. }
  133. //处理文件后删除下载的内容
  134. err = os.Remove(fileName)
  135. if err != nil {
  136. err = errors.New("删除文件失败 "+ err.Error())
  137. return
  138. }
  139. utils.FileLog.Info("爬取谷歌出行记录 入库成功")
  140. return
  141. }
  142. // GetSourceChangesVisitorsCovid 爬取谷歌出行记录
  143. func GetSourceChangesVisitorsCovid() (filePathStr string, err error) {
  144. options := []chromedp.ExecAllocatorOption{
  145. chromedp.WindowSize(1920,1080),
  146. chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
  147. }
  148. options = append(chromedp.DefaultExecAllocatorOptions[:], options...)
  149. //创建chrome窗口
  150. allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
  151. // create chrome instance
  152. ctx, cancel := chromedp.NewContext(
  153. allocCtx,
  154. chromedp.WithLogf(utils.FileLog.Info),
  155. )
  156. defer cancel()
  157. // create a timeout
  158. ctx, cancel = context.WithTimeout(ctx, 200*time.Second)
  159. defer cancel()
  160. // set up a channel so we can block later while we monitor the download
  161. // progress
  162. done := make(chan string, 1)
  163. // set up a listener to watch the download events and close the channel
  164. // when complete this could be expanded to handle multiple downloads
  165. // through creating a guid map, monitor download urls via
  166. // EventDownloadWillBegin, etc
  167. chromedp.ListenTarget(ctx, func(v interface{}) {
  168. if ev, ok := v.(*browser.EventDownloadProgress); ok {
  169. completed := "(unknown)"
  170. if ev.TotalBytes != 0 {
  171. completed = fmt.Sprintf("%0.2f%%", ev.ReceivedBytes/ev.TotalBytes*100.0)
  172. }
  173. utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 state: %s, completed: %s\n", ev.State.String(), completed))
  174. if ev.State == browser.DownloadProgressStateCompleted {
  175. done <- ev.GUID
  176. close(done)
  177. utils.FileLog.Info("爬取谷歌出行记录 download finished")
  178. }
  179. }
  180. })
  181. dirPath, err := os.Getwd()
  182. if err != nil {
  183. //log.Fatal(err)
  184. return
  185. }
  186. dirPath += "/download_file/changes_visitors_covid"
  187. //log.Print("start time"+time.Now().Format("2006-01-02_15:04:05.999"))
  188. // navigate to a page, wait for an element, click
  189. err = chromedp.Run(ctx,
  190. chromedp.Navigate(`https://ourworldindata.org/grapher/changes-visitors-covid`),
  191. // wait for footer element is visible (ie, page is loaded)
  192. chromedp.WaitVisible(`.GrapherComponent`),
  193. chromedp.ScrollIntoView(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart-click-download")]`),
  194. chromedp.Click(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart-click-download")]`, chromedp.NodeReady),
  195. // configure headless browser downloads. note that
  196. // SetDownloadBehaviorBehaviorAllowAndName is preferred here over
  197. // SetDownloadBehaviorBehaviorAllow so that the file will be named as
  198. // the GUID. please note that it only works with 92.0.4498.0 or later
  199. // due to issue 1204880, see https://bugs.chromium.org/p/chromium/issues/detail?id=1204880
  200. browser.SetDownloadBehavior(browser.SetDownloadBehaviorBehaviorAllowAndName).
  201. WithDownloadPath(dirPath).
  202. WithEventsEnabled(true),
  203. chromedp.Click(`//div//button[contains(@data-track-note, "chart-download-csv")]`, chromedp.NodeVisible),
  204. )
  205. if err != nil {
  206. //log.Fatal(err)
  207. return
  208. }
  209. //log.Print("end time"+time.Now().Format("2006-01-02_15:04:05.999"))
  210. // This will block until the chromedp listener closes the channel
  211. guid := <-done
  212. // We can predict the exact file location and name here because of how we
  213. // configured SetDownloadBehavior and WithDownloadPath
  214. filePathStr = filepath.Join(dirPath, guid)
  215. utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 wrote %s", filePathStr))
  216. return
  217. }
  218. func filterCountry(entity string) bool {
  219. switch entity {
  220. case "United States":
  221. return true
  222. case "United Kingdom":
  223. return true
  224. case "France":
  225. return true
  226. case "Spain":
  227. return true
  228. case "Italy":
  229. return true
  230. default:
  231. return false
  232. }
  233. }