Browse Source

设置每天凌晨爬取谷歌出行记录

xiexiaoyuan 2 years ago
parent
commit
0723cb0138
3 changed files with 154 additions and 7 deletions
  1. 1 1
      models/db.go
  2. 146 6
      services/source_changes_visitors_covid.go
  3. 7 0
      services/task.go

+ 1 - 1
models/db.go

@@ -34,7 +34,7 @@ func init() {
 		new(BaseFromTradeEicIndex),
 		new(BaseFromTradeEicIndexV2),
 		new(BaseFromTradeMapping),
-		//
+		new(BaseFromChangesVisitorsCovid),
 		new(BaseFromTradeDalianIndex),
 		new(BaseFromCoalmineMapping),
 		new(BaseFromCoalmineJsmIndex),

+ 146 - 6
services/source_changes_visitors_covid.go

@@ -2,16 +2,155 @@ package services
 
 import (
 	"context"
+	"encoding/csv"
+	"errors"
 	"fmt"
 	"github.com/chromedp/cdproto/browser"
 	"github.com/chromedp/chromedp"
+	"hongze/hongze_data_crawler/models"
+	"hongze/hongze_data_crawler/services/alarm_msg"
+	"hongze/hongze_data_crawler/utils"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"time"
 )
 
-func GetSourceChangesVisitorsCovid(dirPath string) (filePathStr string, err error) {
+
+// AddSourceChangesVisitorsCovid crawls the Google mobility CSV and stores its
+// rows as BaseFromChangesVisitorsCovid records.
+//
+// The CSV is fetched with GetSourceChangesVisitorsCovid and read row by row.
+// Rows with fewer than 9 columns, or whose third column does not parse with
+// utils.FormatDate, are skipped. When a latest stored record exists, rows
+// dated more than 10 days before it are dropped, and rows dated on or before
+// it are inserted only if no record exists yet for the same entity and day.
+// Inserts are issued in batches of 1000 rows.
+//
+// A non-nil error is printed and also sent through alarm_msg.SendAlarmMsg.
+func AddSourceChangesVisitorsCovid() (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("爬取谷歌出行记录失败 Err:" + err.Error())
+			msg := "失败提醒" + "谷歌出行记录失败 ErrMsg:" + err.Error()
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+	}()
+
+	fileName, err := GetSourceChangesVisitorsCovid()
+	if err != nil {
+		err = errors.New("爬取谷歌出行记录失败" + err.Error())
+		return
+	}
+
+	fs, err := os.Open(fileName)
+	if err != nil {
+		err = errors.New("打开文件失败" + err.Error())
+		return
+	}
+	defer fs.Close()
+
+	lastItem, err := models.GetLatestBaseFromChangesVisitorsCovid()
+	if err != nil {
+		if err.Error() != utils.ErrNoRow() {
+			err = errors.New("查询最新的记录失败" + err.Error())
+			return
+		}
+		// Finding no previous record is not a failure.
+		err = nil
+	}
+
+	// Both bounds are only meaningful when a previous record exists.
+	hasLast := lastItem != nil
+	var before10 time.Time
+	var lastDayStr string
+	if hasLast {
+		before10 = lastItem.Day.AddDate(0, 0, -10)
+		// Formatted once here instead of once per CSV row.
+		lastDayStr = lastItem.Day.Format(utils.FormatDate)
+	}
+
+	const batchSize = 1000
+	now := time.Now()
+	list := make([]*models.BaseFromChangesVisitorsCovid, 0, batchSize)
+
+	// Read one row at a time so a large file is never held in memory.
+	r := csv.NewReader(fs)
+	for {
+		row, tErr := r.Read()
+		if tErr == io.EOF {
+			break
+		}
+		if tErr != nil {
+			err = errors.New("读取内容失败 " + tErr.Error())
+			return
+		}
+		if len(row) < 9 {
+			continue
+		}
+
+		day, tErr := time.Parse(utils.FormatDate, row[2])
+		if tErr != nil {
+			// Rows without a parseable date (e.g. a header line) are skipped.
+			continue
+		}
+
+		if hasLast {
+			if day.Before(before10) {
+				// Older than the 10-day window: only incremental data is processed.
+				continue
+			}
+			if day.Format(utils.FormatDate) <= lastDayStr {
+				// Inside the window: insert only when the row is not stored yet.
+				_, tErr = models.GetBaseFromChangesVisitorsCovidByEntityDay(row[0], row[2])
+				if tErr == nil {
+					// Record already exists.
+					continue
+				}
+				// A lookup failure must not be mistaken for "row missing",
+				// otherwise a transient DB error would insert duplicates.
+				// NOTE(review): assumes this lookup reports a missing row with
+				// the same utils.ErrNoRow() text as the latest-record query — confirm.
+				if tErr.Error() != utils.ErrNoRow() {
+					err = errors.New("查询已存在记录失败 " + tErr.Error())
+					return
+				}
+			}
+		}
+
+		tmp := new(models.BaseFromChangesVisitorsCovid)
+		tmp.Entity = row[0]
+		tmp.Code = row[1]
+		tmp.EdbCode = tmp.Code + "-TravelIndex"
+		tmp.Day = day
+		tmp.RetailAndRecreation = row[3]
+		tmp.GroceryAndPharmacy = row[4]
+		tmp.Residential = row[5]
+		tmp.TransitStations = row[6]
+		tmp.Parks = row[7]
+		tmp.Workplaces = row[8]
+
+		// Parse errors are ignored on purpose: a blank or non-numeric cell
+		// contributes 0 to the total.
+		retailAndRecreation, _ := strconv.ParseFloat(tmp.RetailAndRecreation, 32)
+		groceryAndPharmacy, _ := strconv.ParseFloat(tmp.GroceryAndPharmacy, 32)
+		residential, _ := strconv.ParseFloat(tmp.Residential, 32)
+		transitStations, _ := strconv.ParseFloat(tmp.TransitStations, 32)
+		parks, _ := strconv.ParseFloat(tmp.Parks, 32)
+		workplaces, _ := strconv.ParseFloat(tmp.Workplaces, 32)
+		total := retailAndRecreation + groceryAndPharmacy + residential + transitStations + parks + workplaces
+		tmp.Total = strconv.FormatFloat(total, 'f', 5, 32)
+		tmp.CreateTime = now
+		tmp.ModifyTime = now
+
+		list = append(list, tmp)
+		if len(list) >= batchSize {
+			// Flush a full batch.
+			tErr = models.AddBaseFromChangesVisitorsCovidMulti(list)
+			if tErr != nil {
+				err = errors.New("批量新增失败 " + tErr.Error())
+				return
+			}
+			list = make([]*models.BaseFromChangesVisitorsCovid, 0, batchSize)
+		}
+	}
+
+	// Flush whatever is left after the last full batch.
+	if len(list) > 0 {
+		tErr := models.AddBaseFromChangesVisitorsCovidMulti(list)
+		if tErr != nil {
+			err = errors.New("批量新增失败 " + tErr.Error())
+			return
+		}
+	}
+
+	// Removing the downloaded file after processing is currently disabled:
+	/*err = os.Remove(fileName)
+	if err != nil {
+		err = errors.New("删除文件失败 "+ err.Error())
+		return
+	}*/
+	return
+}
+
+// GetSourceChangesVisitorsCovid 爬取谷歌出行记录
+func GetSourceChangesVisitorsCovid() (filePathStr string, err error) {
 	options := []chromedp.ExecAllocatorOption{
 		chromedp.WindowSize(1920,1080),
 		chromedp.UserAgent(`Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.80 Safari/537.36`),
@@ -54,12 +193,12 @@ func GetSourceChangesVisitorsCovid(dirPath string) (filePathStr string, err erro
 		}
 	})
 
-	if dirPath == "" {
-		dirPath, err = os.Getwd()
-		if err != nil {
-			log.Fatal(err)
-		}
+	dirPath, err := os.Getwd()
+	if err != nil {
+		//log.Fatal(err)
+		return
 	}
+	dirPath += "/download_file/changes_visitors_covid"
 
 	//log.Print("start time"+time.Now().Format("2006-01-02_15:04:05.999"))
 
@@ -84,6 +223,7 @@ func GetSourceChangesVisitorsCovid(dirPath string) (filePathStr string, err erro
 	)
 	if err != nil {
 		log.Fatal(err)
+		return
 	}
 	//log.Print("end time"+time.Now().Format("2006-01-02_15:04:05.999"))
 

+ 7 - 0
services/task.go

@@ -11,9 +11,11 @@ func Task() {
 	refreshData := task.NewTask("refreshData", "0 0,30 16-20 * * *", RefreshData)
 	refreshEic := task.NewTask("refreshData", "0 0 2,6 * * *", RefreshEic)
 	refreshCoal := task.NewTask("refreshData", "0 0,30 17-23 * * *", RefreshCoal)
+	refreshVisitors := task.NewTask("RefreshChangesVisitorsCovid","0 30 2 * * *", RefreshChangesVisitorsCovid )
 	task.AddTask("数据爬取", refreshData)
 	task.AddTask("欧洲天然气爬取", refreshEic)
 	task.AddTask("中国煤炭网爬取", refreshCoal)
+	task.AddTask("谷歌出行指数爬取", refreshVisitors)
 	task.StartTask()
 	//FileCoalJsm()
 	//FileCoalFirm()
@@ -46,4 +48,9 @@ func RefreshCoal(cont context.Context) (err error) {
 	FileCoalCoastal()
 	FileCoalInland()
 	return
+}
+
+// RefreshChangesVisitorsCovid runs AddSourceChangesVisitorsCovid and returns its error; cont is not used.
+func RefreshChangesVisitorsCovid(cont context.Context) (err error) {
+	return AddSourceChangesVisitorsCovid()
 }