123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- package services
- import (
- "context"
- "encoding/csv"
- "errors"
- "eta/eta_crawler/models"
- "eta/eta_crawler/services/alarm_msg"
- "eta/eta_crawler/utils"
- "fmt"
- "github.com/chromedp/cdproto/browser"
- "github.com/chromedp/chromedp"
- "github.com/shopspring/decimal"
- "io"
- "os"
- "path/filepath"
- "time"
- )
- func AddSourceChangesVisitorsCovid() (err error) {
- utils.FileLog.Info("爬取谷歌出行记录 开始爬取:")
- defer func() {
- if err != nil {
- utils.FileLog.Info("爬取谷歌出行记录失败 Err:" + err.Error())
- msg := "失败提醒" + "AddSourceChangesVisitorsCovid ErrMsg:" + err.Error()
- go alarm_msg.SendAlarmMsg(msg, 3)
- }
- }()
- fileName, err := GetSourceChangesVisitorsCovid()
- if err != nil {
- err = errors.New("爬取谷歌出行记录失败" + err.Error())
- return
- }
- if fileName == "" {
- err = errors.New("爬取谷歌出行记录失败, 文件下载失败")
- return
- }
- fs, err := os.Open(fileName)
- if err != nil {
- err = errors.New("打开文件失败" + err.Error())
- return
- }
- defer fs.Close()
- r := csv.NewReader(fs)
- //针对大文件,一行一行的读取文件
- count := 0
- var list []*models.BaseFromChangesVisitorsCovid
- now := time.Now()
- lastItem, err := models.GetLatestBaseFromChangesVisitorsCovid()
- if err != nil {
- if err.Error() != utils.ErrNoRow() {
- err = errors.New("查询最新的记录失败" + err.Error())
- return
- } else {
- err = nil
- }
- }
- var before20 time.Time
- var lastDay time.Time
- if lastItem != nil {
- lastDay = lastItem.Day
- before20 = lastItem.Day.AddDate(0, 0, -20)
- }
- for {
- row, tErr := r.Read()
- if tErr != nil && tErr != io.EOF {
- err = errors.New("读取内容失败 " + tErr.Error())
- return
- }
- if tErr == io.EOF {
- break
- }
- if count >= 1000 {
- //批量新增
- count = 0
- tErr = models.AddBaseFromChangesVisitorsCovidMulti(list)
- if tErr != nil {
- err = errors.New("批量新增失败 " + tErr.Error())
- return
- }
- list = make([]*models.BaseFromChangesVisitorsCovid, 0)
- }
- if len(row) >= 9 {
- tmp := new(models.BaseFromChangesVisitorsCovid)
- tmp.Entity = row[0]
- if !filterCountry(tmp.Entity) { //只需要五个国家的数据
- continue
- }
- tmp.Code = row[1]
- tmp.EdbCode = tmp.Code + "-TravelIndex"
- day, tErr := time.Parse(utils.FormatDate, row[2])
- if tErr != nil {
- continue
- }
- tmp.Day = day
- if day.Before(before20) && lastItem != nil {
- // 丢弃20天前的数据,只处理增量的数据
- continue
- }
- tmp.RetailAndRecreation = row[3]
- retailAndRecreation, _ := decimal.NewFromString(tmp.RetailAndRecreation)
- tmp.GroceryAndPharmacy = row[4]
- groceryAndPharmacy, _ := decimal.NewFromString(tmp.GroceryAndPharmacy)
- tmp.Residential = row[5]
- residential, _ := decimal.NewFromString(tmp.Residential)
- tmp.TransitStations = row[6]
- transitStations, _ := decimal.NewFromString(tmp.TransitStations)
- tmp.Parks = row[7]
- parks, _ := decimal.NewFromString(tmp.Parks)
- tmp.Workplaces = row[8]
- workplaces, _ := decimal.NewFromString(tmp.Workplaces)
- total := retailAndRecreation.Add(groceryAndPharmacy).Add(residential).Add(transitStations).Add(parks).Add(workplaces)
- tmp.Total = total.String()
- tmp.CreateTime = now
- tmp.ModifyTime = now
- if day.Format(utils.FormatDate) <= lastDay.Format(utils.FormatDate) && lastItem != nil { //如果是10天内的数据判断数据库中是否已存在
- _, tErr = models.GetBaseFromChangesVisitorsCovidByEntityDay(tmp.Entity, row[2])
- if tErr == nil {
- //已存在记录,则跳过
- continue
- }
- }
- list = append(list, tmp)
- count++
- }
- }
- if len(list) > 0 {
- //批量新增
- tErr := models.AddBaseFromChangesVisitorsCovidMulti(list)
- if tErr != nil {
- err = errors.New("批量新增失败 " + tErr.Error())
- return
- }
- }
- //处理文件后删除下载的内容
- err = os.Remove(fileName)
- if err != nil {
- err = errors.New("删除文件失败 " + err.Error())
- return
- }
- utils.FileLog.Info("爬取谷歌出行记录 入库成功")
- return
- }
- // GetSourceChangesVisitorsCovid 爬取谷歌出行记录
- func GetSourceChangesVisitorsCovid() (filePathStr string, err error) {
- options := []chromedp.ExecAllocatorOption{
- chromedp.WindowSize(1920, 1080),
- chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
- }
- options = append(chromedp.DefaultExecAllocatorOptions[:], options...)
- //创建chrome窗口
- allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
- // create chrome instance
- ctx, cancel := chromedp.NewContext(
- allocCtx,
- )
- defer cancel()
- // create a timeout
- ctx, cancel = context.WithTimeout(ctx, 200*time.Second)
- defer cancel()
- // set up a channel so we can block later while we monitor the download
- // progress
- done := make(chan string, 1)
- canceled := make(chan bool, 1)
- // set up a listener to watch the download events and close the channel
- // when complete this could be expanded to handle multiple downloads
- // through creating a guid map, monitor download urls via
- // EventDownloadWillBegin, etc
- chromedp.ListenTarget(ctx, func(v interface{}) {
- if ev, ok := v.(*browser.EventDownloadProgress); ok {
- completed := "(unknown)"
- if ev.TotalBytes != 0 {
- completed = fmt.Sprintf("%0.2f%%", ev.ReceivedBytes/ev.TotalBytes*100.0)
- }
- utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 state: %s, completed: %s\n", ev.State.String(), completed))
- if ev.State == browser.DownloadProgressStateCompleted {
- done <- ev.GUID
- close(done)
- utils.FileLog.Info("爬取谷歌出行记录 download finished")
- return
- }
- if ev.State == browser.DownloadProgressStateCanceled {
- canceled <- true
- close(canceled)
- utils.FileLog.Info("爬取谷歌出行记录 download canceled")
- return
- }
- }
- })
- dirPath, err := os.Getwd()
- if err != nil {
- //log.Fatal(err)
- return
- }
- dirPath += "/download_file/changes_visitors_covid"
- //log.Print("start time"+time.Now().Format("2006-01-02_15:04:05.999"))
- // navigate to a page, wait for an element, click
- err = chromedp.Run(ctx,
- chromedp.Navigate(`https://ourworldindata.org/grapher/changes-visitors-covid`),
- // wait for footer element is visible (ie, page is loaded)
- chromedp.WaitVisible(`.GrapherComponent`),
- chromedp.ScrollIntoView(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`),
- chromedp.Click(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`, chromedp.NodeReady),
- // configure headless browser downloads. note that
- // SetDownloadBehaviorBehaviorAllowAndName is preferred here over
- // SetDownloadBehaviorBehaviorAllow so that the file will be named as
- // the GUID. please note that it only works with 92.0.4498.0 or later
- // due to issue 1204880, see https://bugs.chromium.org/p/chromium/issues/detail?id=1204880
- browser.SetDownloadBehavior(browser.SetDownloadBehaviorBehaviorAllowAndName).
- WithDownloadPath(dirPath).
- WithEventsEnabled(true),
- chromedp.Click(`//div//button[contains(@data-track-note, "chart_download_csv")]`, chromedp.NodeVisible),
- )
- if err != nil {
- //log.Fatal(err)
- return
- }
- //log.Print("end time"+time.Now().Format("2006-01-02_15:04:05.999"))
- // This will block until the chromedp listener closes the channel
- //guid := <-done
- var guid string
- select {
- case <-canceled:
- err = fmt.Errorf("download canceled")
- return
- case p := <-done:
- guid = p
- }
- // We can predict the exact file location and name here because of how we
- // configured SetDownloadBehavior and WithDownloadPath
- filePathStr = filepath.Join(dirPath, guid)
- utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 wrote %s", filePathStr))
- return
- }
// filterCountry reports whether entity is one of the five country names
// whose rows are imported: United States, United Kingdom, France, Spain and
// Italy. The match is exact and case-sensitive.
func filterCountry(entity string) bool {
	switch entity {
	case "United States", "United Kingdom", "France", "Spain", "Italy":
		return true
	}
	return false
}
|