Sfoglia il codice sorgente

汾渭数据源-接口数据爬取

gmy 6 mesi fa
parent
commit
c9d2bcdde4

+ 12 - 5
services/fenwei/base_from_fenwei_service.go

@@ -1,6 +1,6 @@
 // Package fenwei
 // @Author gmy 2024/8/20 15:06:00
-package fenwei
+package main
 
 import (
 	"encoding/json"
@@ -11,13 +11,14 @@ import (
 )
 
 // FenWeiNetDataDeal 汾渭网络数据处理
-func FenWeiNetDataDeal(err error) {
-	defer func() {
+// func FenWeiNetDataDeal(context.Context) (err error) {
+func main() {
+	/*defer func() {
 		if err != nil {
 			fmt.Println("FenWeiNetDataDeal Err:" + err.Error())
 			utils.FileLog.Info(fmt.Sprintf("FenWeiNetDataDeal Err: %s", err.Error()))
 		}
-	}()
+	}()*/
 	utils.FileLog.Info("FenWeiNetDataDeal start")
 	// 读取配置
 	configFile, err := os.ReadFile(utils.FenweiNetJsonPath)
@@ -39,6 +40,12 @@ func FenWeiNetDataDeal(err error) {
 	factory := ProcessorFactory{}
 	// 遍历调用对应的处理方法
 	for _, v := range config.Data {
-		factory.CreateProcessor(v).FetchAndProcess()
+		processor := factory.CreateProcessor(v)
+		err = processor.FetchAndProcess(processor)
+		if err != nil {
+			utils.FileLog.Info(fmt.Sprintf("处理数据错误: %v", err))
+			return
+		}
 	}
+	utils.FileLog.Info("FenWeiNetDataDeal end")
 }

+ 72 - 42
services/fenwei/data_processor.go

@@ -1,6 +1,6 @@
 // Package fenwei
 // @Author gmy 2024/8/20 14:47:00
-package fenwei
+package main
 
 import (
 	"bytes"
@@ -20,8 +20,9 @@ import (
 )
 
 type DataProcessor interface {
-	FetchAndProcess() error
-	GenerateRequestParams(string) map[string]string
+	FetchAndProcess(DataProcessor) error
+	GenerateRequestParams(currentTime string) map[string]string
+	ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error)
 }
 
 type BaseProcessor struct {
@@ -33,16 +34,19 @@ var (
 	authLock      sync.RWMutex
 )
 
-func (p *BaseProcessor) FetchAndProcess() error {
+func (p *BaseProcessor) FetchAndProcess(processor DataProcessor) error {
 
 	// 获取当前时间 yyyy-MM-dd
 	now := time.Now()
 	currentTime := now.Format(utils.FormatDateUnSpace)
 
 	// 请求参数
-	params := p.GenerateRequestParams(currentTime)
+	params := processor.GenerateRequestParams(currentTime)
+	// 保存请求参数
+	originalRequestBody := params["params"] // 保存原始请求数据
 
-	req, err := http.NewRequest("POST", p.URL, bytes.NewBufferString(params["params"]))
+	requestBody := bytes.NewBufferString(originalRequestBody)
+	req, err := http.NewRequest("POST", p.URL, requestBody)
 	if err != nil {
 		return err
 	}
@@ -77,26 +81,51 @@ func (p *BaseProcessor) FetchAndProcess() error {
 		}
 		authLock.Unlock()
 
-		// 重新请求
+		// 重新创建请求对象
+		requestBody = bytes.NewBufferString(originalRequestBody)
+		req, err = http.NewRequest("POST", p.URL, requestBody)
+		if err != nil {
+			return err
+		}
+
+		// 重新设置请求头
+		req.Header.Set("Content-Type", "application/json")
+		req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
 		req.Header.Set("Authorization", authorization)
+
+		// 重新请求
 		resp, err = client.Do(req)
 		if err != nil {
 			return err
 		}
+		defer resp.Body.Close()
+
+		body, err = io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
 	}
 
 	// 数据处理
-	response, err := p.ProcessResponse(string(body))
+	response, err := processor.ProcessResponse(string(body))
 	if err != nil {
 		return err
 	}
+	log.Printf("response size: %v", len(response))
 	utils.FileLog.Info(fmt.Sprintf("response: %v", response))
 
 	// 请求lib应用入库
 	paramsLib := make(map[string]interface{})
 	paramsLib["List"] = response
 	paramsLib["TerminalCode"] = utils.TerminalCode
-	services.PostEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE)
+	postEdbLib, err := services.PostEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE)
+	if err != nil {
+		// 有错误就不继续执行
+		log.Printf("postEdbLib err: %v", err)
+		return err
+	}
+	log.Printf("postEdbLib size: %v", len(postEdbLib))
+	utils.FileLog.Info(fmt.Sprintf("postEdbLib: %v", string(postEdbLib)))
 	return nil
 }
 
@@ -118,7 +147,7 @@ func checkResp(resp string) bool {
 }
 
 // GenerateRequestParams 让子类来实现这个方法
-func (p *BaseProcessor) GenerateRequestParams(string) map[string]string {
+func (p *BaseProcessor) GenerateRequestParams(currentTime string) map[string]string {
 	return map[string]string{}
 }
 
@@ -129,15 +158,14 @@ func (p *BaseProcessor) ProcessResponse(data string) ([]models.FenWeiNetIndexInf
 
 // GetAuthorizationByChrome 获取Authorization
 func getAuthorizationByChrome() (authorization string, err error) {
-	// 读取Cookie
+	// 检查账号和密码是否设置
 	if utils.FenweiNetUseName == "" {
-		err = fmt.Errorf("汾渭账号未设置")
-		return
+		return "", fmt.Errorf("汾渭账号未设置")
 	}
 	if utils.FenweiNetPassword == "" {
-		err = fmt.Errorf("汾渭密码未设置")
-		return
+		return "", fmt.Errorf("汾渭密码未设置")
 	}
+
 	opts := append(
 		chromedp.DefaultExecAllocatorOptions[:],
 		chromedp.Flag("headless", false),
@@ -151,6 +179,24 @@ func getAuthorizationByChrome() (authorization string, err error) {
 		chromedp.WithLogf(log.Printf),
 	)
 	defer cancel2()
+
+	// 提前设置监听器
+	authorizationChan := make(chan string, 1) // 使用channel来确保监听到的Authorization可以安全传递
+	chromedp.ListenTarget(ctx, func(ev interface{}) {
+		if ev, ok := ev.(*network.EventRequestWillBeSent); ok {
+			if authHeader, found := ev.Request.Headers["Authorization"]; found {
+				if authStr, ok := authHeader.(string); ok {
+					select {
+					case authorizationChan <- authStr: // 将Authorization放入channel
+					default:
+					}
+					utils.FileLog.Info("Authorization header found: " + authStr)
+				}
+			}
+		}
+	})
+
+	// 运行浏览器操作
 	err = chromedp.Run(ctx,
 		chromedp.Navigate(`https://www.sxcoal.com/`),
 		chromedp.Click(`.pc_content__jO_mq`, chromedp.ByQuery),
@@ -160,35 +206,19 @@ func getAuthorizationByChrome() (authorization string, err error) {
 		chromedp.SetValue(`div.Sign_password__dwxMn input[type="password"]`, utils.FenweiNetPassword, chromedp.ByQuery),
 		chromedp.Sleep(2*time.Second),
 
-		// 登录
-		chromedp.Click(`.Button_text__D25sy`, chromedp.ByQuery),
-
-		chromedp.ActionFunc(func(ctx context.Context) error {
-			/*cookies, err := network.GetCookies().Do(ctx)
-			if err != nil {
-				return err
-			}
+		// 定位并点击指定按钮(class属性为 'Button_btn__xbZjp Button_black__X_jwF Button_mediu__ZVHO_',并且 span 标签内容为 '登录')
+		chromedp.Click(`//button[contains(@class, 'Button_btn__xbZjp') and contains(@class, 'Button_black__X_jwF') and contains(@class, 'Button_mediu__ZVHO_')]/span[text()='登录']`, chromedp.BySearch),
 
-			for _, v := range cookies {
-				cookieStr = cookieStr + v.Name + "=" + v.Value + ";"
-			}
-			utils.FileLog.Info("header cookie:" + cookieStr)*/
-
-			// 监听并处理请求事件
-			chromedp.ListenTarget(ctx, func(ev interface{}) {
-				if ev, ok := ev.(*network.EventRequestWillBeSent); ok {
-					for _, header := range ev.Request.Headers {
-						if authHeader, ok := header.(string); ok && authHeader == "Authorization" {
-							authorization = authHeader
-							utils.FileLog.Info("Authorization header found: " + authorization)
-						}
-					}
-				}
-			})
-
-			return nil
-		}),
+		// 添加延迟,确保捕获到请求头
+		chromedp.Sleep(8*time.Second),
 	)
 
+	// 从channel中获取authorization
+	select {
+	case authorization = <-authorizationChan:
+	case <-time.After(10 * time.Second): // 超时时间,可以根据实际情况调整
+		err = fmt.Errorf("未能获取到Authorization")
+	}
+
 	return
 }

+ 1 - 1
services/fenwei/processor_business_logic.go

@@ -1,6 +1,6 @@
 // Package fenwei
 // @Author gmy 2024/8/20 14:47:00
-package fenwei
+package main
 
 import (
 	"encoding/json"

+ 1 - 1
services/fenwei/processor_factory.go

@@ -1,6 +1,6 @@
 // Package fenwei
 // @Author gmy 2024/8/20 14:50:00
-package fenwei
+package main
 
 const (
 	url   = "https://www.sxcoal.com/api/coalresource-adhoc/queryV1/data"

+ 5 - 0
services/task.go

@@ -45,6 +45,11 @@ func Task() {
 		// 每5分钟检测一次目录是否有新文件
 		fenWeiReadWatchIndexFile := task.NewTask("fenWeiReadWatchIndexFile", "0 */5 * * * *", FenweiReadWatchIndexFile)
 		task.AddTask("汾渭数据指标文件检测", fenWeiReadWatchIndexFile)
+
+		// 汾渭网络数据处理 todo 上线放开
+		/*fenWeiNetDataDeal := task.NewTask("fenWeiNetDataDeal", "0 0 1 * * *", fenwei.FenWeiNetDataDeal)
+		task.AddTask("汾渭网络数据处理", fenWeiNetDataDeal)*/
+
 	}
 
 	if utils.MtjhOpen == "1" {