package services

import (
	"context"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"time"

	"eta/eta_crawler/models"
	"eta/eta_crawler/services/alarm_msg"
	"eta/eta_crawler/utils"

	"github.com/chromedp/cdproto/browser"
	"github.com/chromedp/chromedp"
	"github.com/shopspring/decimal"
)

// AddSourceChangesVisitorsCovid crawls the Google mobility report
// (changes-visitors-covid), parses the downloaded CSV and stores the
// incremental rows.
func AddSourceChangesVisitorsCovid() (err error) {
	utils.FileLog.Info("爬取谷歌出行记录 开始爬取:")
	defer func() {
		if err != nil {
			utils.FileLog.Info("爬取谷歌出行记录失败 Err:" + err.Error())
			msg := "失败提醒" + "AddSourceChangesVisitorsCovid ErrMsg:" + err.Error()
			go alarm_msg.SendAlarmMsg(msg, 3)
		}
	}()

	fileName, err := GetSourceChangesVisitorsCovid()
	if err != nil {
		err = errors.New("爬取谷歌出行记录失败" + err.Error())
		return
	}
	if fileName == "" {
		err = errors.New("爬取谷歌出行记录失败, 文件下载失败")
		return
	}

	fs, err := os.Open(fileName)
	if err != nil {
		err = errors.New("打开文件失败" + err.Error())
		return
	}
	defer fs.Close()

	// read the CSV row by row so large files are not loaded into memory at once
	r := csv.NewReader(fs)
	count := 0
	var list []*models.BaseFromChangesVisitorsCovid
	now := time.Now()

	lastItem, err := models.GetLatestBaseFromChangesVisitorsCovid()
	if err != nil {
		if err.Error() != utils.ErrNoRow() {
			err = errors.New("查询最新的记录失败" + err.Error())
			return
		}
		err = nil
	}
	var before20 time.Time
	var lastDay time.Time
	if lastItem != nil {
		lastDay = lastItem.Day
		before20 = lastItem.Day.AddDate(0, 0, -20)
	}

	for {
		row, tErr := r.Read()
		if tErr != nil && tErr != io.EOF {
			err = errors.New("读取内容失败 " + tErr.Error())
			return
		}
		if tErr == io.EOF {
			break
		}
		if count >= 1000 {
			// flush the batch every 1000 rows
			count = 0
			tErr = models.AddBaseFromChangesVisitorsCovidMulti(list)
			if tErr != nil {
				err = errors.New("批量新增失败 " + tErr.Error())
				return
			}
			list = make([]*models.BaseFromChangesVisitorsCovid, 0)
		}
		if len(row) < 9 {
			continue
		}
		tmp := new(models.BaseFromChangesVisitorsCovid)
		tmp.Entity = row[0]
		if !filterCountry(tmp.Entity) {
			// only the five tracked countries are needed
			continue
		}
		tmp.Code = row[1]
		tmp.EdbCode = tmp.Code + "-TravelIndex"
		day, tErr := time.Parse(utils.FormatDate, row[2])
		if tErr != nil {
			continue
		}
		tmp.Day = day
		if day.Before(before20) && lastItem != nil {
			// drop rows more than 20 days older than the latest stored day; only the incremental window is processed
			continue
		}
		// parse errors are ignored below; blank cells contribute zero to the total
		tmp.RetailAndRecreation = row[3]
		retailAndRecreation, _ := decimal.NewFromString(tmp.RetailAndRecreation)
		tmp.GroceryAndPharmacy = row[4]
		groceryAndPharmacy, _ := decimal.NewFromString(tmp.GroceryAndPharmacy)
		tmp.Residential = row[5]
		residential, _ := decimal.NewFromString(tmp.Residential)
		tmp.TransitStations = row[6]
		transitStations, _ := decimal.NewFromString(tmp.TransitStations)
		tmp.Parks = row[7]
		parks, _ := decimal.NewFromString(tmp.Parks)
		tmp.Workplaces = row[8]
		workplaces, _ := decimal.NewFromString(tmp.Workplaces)
		total := retailAndRecreation.Add(groceryAndPharmacy).Add(residential).Add(transitStations).Add(parks).Add(workplaces)
		tmp.Total = total.String()
		tmp.CreateTime = now
		tmp.ModifyTime = now
		if day.Format(utils.FormatDate) <= lastDay.Format(utils.FormatDate) && lastItem != nil {
			// for rows on or before the latest stored day, check whether the record already exists
			_, tErr = models.GetBaseFromChangesVisitorsCovidByEntityDay(tmp.Entity, row[2])
			if tErr == nil {
				// record already exists, skip it
				continue
			}
		}
		list = append(list, tmp)
		count++
	}

	if len(list) > 0 {
		// insert the remaining rows
		tErr := models.AddBaseFromChangesVisitorsCovidMulti(list)
		if tErr != nil {
			err = errors.New("批量新增失败 " + tErr.Error())
			return
		}
	}

	// delete the downloaded file once it has been processed
	err = os.Remove(fileName)
	if err != nil {
		err = errors.New("删除文件失败 " + err.Error())
		return
	}
	utils.FileLog.Info("爬取谷歌出行记录 入库成功")
	return
}

// GetSourceChangesVisitorsCovid downloads the changes-visitors-covid CSV from
// ourworldindata.org with a headless Chrome session and returns the local file path.
func GetSourceChangesVisitorsCovid() (filePathStr string, err error) {
	options := []chromedp.ExecAllocatorOption{
		// emulate a desktop Chrome browser (window size here, user agent below)
		chromedp.WindowSize(1920, 1080),
		chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
	}
	options = append(chromedp.DefaultExecAllocatorOptions[:], options...)

	// create the Chrome exec allocator
	allocCtx, cancel := chromedp.NewExecAllocator(context.Background(), options...)
	defer cancel()

	// create the Chrome instance
	ctx, cancel := chromedp.NewContext(
		allocCtx,
	)
	defer cancel()

	// create a timeout
	ctx, cancel = context.WithTimeout(ctx, 200*time.Second)
	defer cancel()

	// set up channels so we can block later while we monitor the download progress
	done := make(chan string, 1)
	canceled := make(chan bool, 1)

	// set up a listener to watch the download events and close the channel
	// when complete; this could be expanded to handle multiple downloads
	// through creating a guid map, monitoring download urls via
	// EventDownloadWillBegin, etc.
	chromedp.ListenTarget(ctx, func(v interface{}) {
		if ev, ok := v.(*browser.EventDownloadProgress); ok {
			completed := "(unknown)"
			if ev.TotalBytes != 0 {
				completed = fmt.Sprintf("%0.2f%%", ev.ReceivedBytes/ev.TotalBytes*100.0)
			}
			utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 state: %s, completed: %s\n", ev.State.String(), completed))
			if ev.State == browser.DownloadProgressStateCompleted {
				done <- ev.GUID
				close(done)
				utils.FileLog.Info("爬取谷歌出行记录 download finished")
				return
			}
			if ev.State == browser.DownloadProgressStateCanceled {
				canceled <- true
				close(canceled)
				utils.FileLog.Info("爬取谷歌出行记录 download canceled")
				return
			}
		}
	})

	dirPath, err := os.Getwd()
	if err != nil {
		return
	}
	dirPath += "/download_file/changes_visitors_covid"

	// navigate to the page, wait for the chart, then trigger the download
	err = chromedp.Run(ctx,
		chromedp.Navigate(`https://ourworldindata.org/grapher/changes-visitors-covid`),
		// wait for the chart component to be visible (i.e. the page is loaded)
		chromedp.WaitVisible(`.GrapherComponent`),
		chromedp.ScrollIntoView(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`),
		chromedp.Click(`//figure[contains(@data-grapher-src,"changes-visitors-covid")]//ul//li//a[contains(@data-track-note, "chart_click_download")]`, chromedp.NodeReady),
		// configure headless browser downloads. note that
		// SetDownloadBehaviorBehaviorAllowAndName is preferred here over
		// SetDownloadBehaviorBehaviorAllow so that the file will be named as
		// the GUID. please note that it only works with 92.0.4498.0 or later
		// due to issue 1204880, see https://bugs.chromium.org/p/chromium/issues/detail?id=1204880
		browser.SetDownloadBehavior(browser.SetDownloadBehaviorBehaviorAllowAndName).
			WithDownloadPath(dirPath).
			WithEventsEnabled(true),
		chromedp.Click(`//div//button[contains(@data-track-note, "chart_download_csv")]`, chromedp.NodeVisible),
	)
	if err != nil {
		return
	}

	// block until the chromedp listener closes one of the channels
	var guid string
	select {
	case <-canceled:
		err = fmt.Errorf("download canceled")
		return
	case p := <-done:
		guid = p
	}

	// we can predict the exact file location and name here because of how we
	// configured SetDownloadBehavior and WithDownloadPath
	filePathStr = filepath.Join(dirPath, guid)
	utils.FileLog.Info(fmt.Sprintf("爬取谷歌出行记录 wrote %s", filePathStr))
	return
}

// filterCountry reports whether the entity is one of the five tracked countries.
func filterCountry(entity string) bool {
	switch entity {
	case "United States", "United Kingdom", "France", "Spain", "Italy":
		return true
	default:
		return false
	}
}
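
// The download path passed to browser.SetDownloadBehavior above is derived from
// the working directory but is never created in this file; on a fresh deployment
// where the directory is missing, the download may fail. Below is a minimal
// sketch of one way to guard against that. ensureDownloadDir is a hypothetical
// helper, not part of the original crawler; it would be called with dirPath
// before chromedp.Run.
func ensureDownloadDir(dirPath string) error {
	// MkdirAll is a no-op when the directory already exists.
	return os.MkdirAll(dirPath, 0755)
}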