source_changes_visitors_covid.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package services
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "errors"
  6. "eta/eta_crawler/models"
  7. "eta/eta_crawler/services/alarm_msg"
  8. "eta/eta_crawler/utils"
  9. "fmt"
  10. "github.com/chromedp/cdproto/browser"
  11. "github.com/chromedp/chromedp"
  12. "github.com/shopspring/decimal"
  13. "io"
  14. "os"
  15. "path/filepath"
  16. "time"
  17. )
  18. func AddSourceChangesVisitorsCovid() (err error) {
  19. utils.FileLog.Info("爬取谷歌出行记录 开始爬取:")
  20. defer func() {
  21. if err != nil {
  22. utils.FileLog.Info("爬取谷歌出行记录失败 Err:" + err.Error())
  23. msg := "失败提醒" + "AddSourceChangesVisitorsCovid ErrMsg:" + err.Error()
  24. go alarm_msg.SendAlarmMsg(msg, 3)
  25. }
  26. }()
  27. fileName, err := GetSourceChangesVisitorsCovid()
  28. if err != nil {
  29. err = errors.New("爬取谷歌出行记录失败" + err.Error())
  30. return
  31. }
  32. if fileName == "" {
  33. err = errors.New("爬取谷歌出行记录失败, 文件下载失败")
  34. return
  35. }
  36. fs, err := os.Open(fileName)
  37. if err != nil {
  38. err = errors.New("打开文件失败" + err.Error())
  39. return
  40. }
  41. defer fs.Close()
  42. r := csv.NewReader(fs)
  43. //针对大文件,一行一行的读取文件
  44. count := 0
  45. var list []*models.BaseFromChangesVisitorsCovid
  46. now := time.Now()
  47. lastItem, err := models.GetLatestBaseFromChangesVisitorsCovid()
  48. if err != nil {
  49. if err.Error() != utils.ErrNoRow() {
  50. err = errors.New("查询最新的记录失败" + err.Error())
  51. return
  52. } else {
  53. err = nil
  54. }
  55. }
  56. var before20 time.Time
  57. var lastDay time.Time
  58. if lastItem != nil {
  59. lastDay = lastItem.Day
  60. before20 = lastItem.Day.AddDate(0, 0, -20)
  61. }
  62. for {
  63. row, tErr := r.Read()
  64. if tErr != nil && tErr != io.EOF {
  65. err = errors.New("读取内容失败 " + tErr.Error())
  66. return
  67. }
  68. if tErr == io.EOF {
  69. break
  70. }
  71. if count >= 1000 {
  72. //批量新增
  73. count = 0
  74. tErr = models.AddBaseFromChangesVisitorsCovidMulti(list)
  75. if tErr != nil {
  76. err = errors.New("批量新增失败 " + tErr.Error())
  77. return
  78. }
  79. list = make([]*models.BaseFromChangesVisitorsCovid, 0)
  80. }
  81. if len(row) >= 9 {
  82. tmp := new(models.BaseFromChangesVisitorsCovid)
  83. tmp.Entity = row[0]
  84. if !filterCountry(tmp.Entity) { //只需要五个国家的数据
  85. continue
  86. }
  87. tmp.Code = row[1]
  88. tmp.EdbCode = tmp.Code + "-TravelIndex"
  89. day, tErr := time.Parse(utils.FormatDate, row[2])
  90. if tErr != nil {
  91. continue
  92. }
  93. tmp.Day = day
  94. if day.Before(before20) && lastItem != nil {
  95. // 丢弃20天前的数据,只处理增量的数据
  96. continue
  97. }
  98. tmp.RetailAndRecreation = row[3]
  99. retailAndRecreation, _ := decimal.NewFromString(tmp.RetailAndRecreation)
  100. tmp.GroceryAndPharmacy = row[4]
  101. groceryAndPharmacy, _ := decimal.NewFromString(tmp.GroceryAndPharmacy)
  102. tmp.Residential = row[5]
  103. residential, _ := decimal.NewFromString(tmp.Residential)
  104. tmp.TransitStations = row[6]
  105. transitStations, _ := decimal.NewFromString(tmp.TransitStations)
  106. tmp.Parks = row[7]
  107. parks, _ := decimal.NewFromString(tmp.Parks)
  108. tmp.Workplaces = row[8]
  109. workplaces, _ := decimal.NewFromString(tmp.Workplaces)
  110. total := retailAndRecreation.Add(groceryAndPharmacy).Add(residential).Add(transitStations).Add(parks).Add(workplaces)
  111. tmp.Total = total.String()
  112. tmp.CreateTime = now
  113. tmp.ModifyTime = now
  114. if day.Format(utils.FormatDate) <= lastDay.Format(utils.FormatDate) && lastItem != nil { //如果是10天内的数据判断数据库中是否已存在
  115. _, tErr = models.GetBaseFromChangesVisitorsCovidByEntityDay(tmp.Entity, row[2])
  116. if tErr == nil {
  117. //已存在记录,则跳过
  118. continue
  119. }
  120. }
  121. list = append(list, tmp)
  122. count++
  123. }
  124. }
  125. if len(list) > 0 {
  126. //批量新增
  127. tErr := models.AddBaseFromChangesVisitorsCovidMulti(list)
  128. if tErr != nil {
  129. err = errors.New("批量新增失败 " + tErr.Error())
  130. return
  131. }
  132. }
  133. //处理文件后删除下载的内容
  134. err = os.Remove(fileName)
  135. if err != nil {
  136. err = errors.New("删除文件失败 " + err.Error())
  137. return
  138. }
  139. utils.FileLog.Info("爬取谷歌出行记录 入库成功")
  140. return
  141. }
  142. // GetSourceChangesVisitorsCovid 爬取谷歌出行记录
  143. func GetSourceChangesVisitorsCovid() (filePathStr string, err error) {
  144. options := []chromedp.ExecAllocatorOption{
  145. chromedp.WindowSize(1920, 1080),
  146. chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
  147. }
  148. options = append(chromedp.DefaultExecAllocatorOptions[:], options...)
  149. //创建chrome窗口
  150. allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
  151. // create chrome instance
  152. ctx, cancel := chromedp.NewContext(
  153. allocCtx,
  154. )
  155. defer cancel()
  156. // create a timeout
  157. ctx, cancel = context.WithTimeout(ctx, 200*time.Second)
  158. defer cancel()
  159. // set up a channel so we can block later while we monitor the download
  160. // progress
  161. done := make(chan string, 1)
  162. canceled := make(chan bool, 1)
  163. // set up a listener to watch the download events and close the channel
  164. // when complete this could be expanded to handle multiple downloads
  165. // through creating a guid map, monitor download urls via
  166. // EventDownloadWillBegin, etc
  167. chromedp.ListenTarget(ctx, func(v interface{}) {
  168. if ev, ok := v.(*browser.EventDownloadProgress); ok {
  169. completed := "(unknown)"
  170. if ev.TotalBytes != 0 {
  171. completed = fmt.Sprintf("%0.2f%%", ev.ReceivedBytes/ev.TotalBytes*100.0)
  172. }
  173. utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 state: %s, completed: %s\n", ev.State.String(), completed))
  174. if ev.State == browser.DownloadProgressStateCompleted {
  175. done <- ev.GUID
  176. close(done)
  177. utils.FileLog.Info("爬取谷歌出行记录 download finished")
  178. return
  179. }
  180. if ev.State == browser.DownloadProgressStateCanceled {
  181. canceled <- true
  182. close(canceled)
  183. utils.FileLog.Info("爬取谷歌出行记录 download canceled")
  184. return
  185. }
  186. }
  187. })
  188. dirPath, err := os.Getwd()
  189. if err != nil {
  190. //log.Fatal(err)
  191. return
  192. }
  193. dirPath += "/download_file/changes_visitors_covid"
  194. //log.Print("start time"+time.Now().Format("2006-01-02_15:04:05.999"))
  195. // navigate to a page, wait for an element, click
  196. err = chromedp.Run(ctx,
  197. chromedp.Navigate(`https://ourworldindata.org/grapher/changes-visitors-covid`),
  198. // wait for footer element is visible (ie, page is loaded)
  199. chromedp.WaitVisible(`.GrapherComponent`),
  200. chromedp.ScrollIntoView(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`),
  201. chromedp.Click(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`, chromedp.NodeReady),
  202. // configure headless browser downloads. note that
  203. // SetDownloadBehaviorBehaviorAllowAndName is preferred here over
  204. // SetDownloadBehaviorBehaviorAllow so that the file will be named as
  205. // the GUID. please note that it only works with 92.0.4498.0 or later
  206. // due to issue 1204880, see https://bugs.chromium.org/p/chromium/issues/detail?id=1204880
  207. browser.SetDownloadBehavior(browser.SetDownloadBehaviorBehaviorAllowAndName).
  208. WithDownloadPath(dirPath).
  209. WithEventsEnabled(true),
  210. chromedp.Click(`//div//button[contains(@data-track-note, "chart_download_csv")]`, chromedp.NodeVisible),
  211. )
  212. if err != nil {
  213. //log.Fatal(err)
  214. return
  215. }
  216. //log.Print("end time"+time.Now().Format("2006-01-02_15:04:05.999"))
  217. // This will block until the chromedp listener closes the channel
  218. //guid := <-done
  219. var guid string
  220. select {
  221. case <-canceled:
  222. err = fmt.Errorf("download canceled")
  223. return
  224. case p := <-done:
  225. guid = p
  226. }
  227. // We can predict the exact file location and name here because of how we
  228. // configured SetDownloadBehavior and WithDownloadPath
  229. filePathStr = filepath.Join(dirPath, guid)
  230. utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 wrote %s", filePathStr))
  231. return
  232. }
  233. func filterCountry(entity string) bool {
  234. switch entity {
  235. case "United States":
  236. return true
  237. case "United Kingdom":
  238. return true
  239. case "France":
  240. return true
  241. case "Spain":
  242. return true
  243. case "Italy":
  244. return true
  245. default:
  246. return false
  247. }
  248. }