// Package fenwei // @Author gmy 2024/8/20 14:47:00 package fenwei import ( "bytes" "context" "encoding/json" "eta/eta_data_analysis/models" "eta/eta_data_analysis/utils" "fmt" "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "io" "io/ioutil" "log" "net/http" "strings" "sync" "time" ) type DataProcessor interface { FetchAndProcess(DataProcessor) error GenerateRequestParams(currentTime string) map[string]string ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error) } type BaseProcessor struct { URL string } var ( authorization string authLock sync.RWMutex ) func (p *BaseProcessor) FetchAndProcess(processor DataProcessor) error { // 获取当前时间 yyyy-MM-dd now := time.Now() currentTime := now.Format(utils.FormatDateUnSpace) // 请求参数 params := processor.GenerateRequestParams(currentTime) // 保存请求参数 originalRequestBody := params["params"] // 保存原始请求数据 requestBody := bytes.NewBufferString(originalRequestBody) req, err := http.NewRequest("POST", p.URL, requestBody) if err != nil { return err } // 设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") authLock.RLock() req.Header.Set("Authorization", authorization) authLock.RUnlock() client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return err } if checkResp(string(body)) { authLock.Lock() // 登录获取Authorization authorization, err = getAuthorizationByChrome() if err != nil { authLock.Unlock() return err } authLock.Unlock() // 重新创建请求对象 requestBody = bytes.NewBufferString(originalRequestBody) req, err = http.NewRequest("POST", p.URL, requestBody) if err != nil { return err } // 重新设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("accept-language", "zh-CN,zh;q=0.9") req.Header.Set("Authorization", authorization) // 重新请求 resp, err = client.Do(req) if err != nil { return err } defer resp.Body.Close() body, err = io.ReadAll(resp.Body) if err != nil { return err } } // 数据处理 response, err := processor.ProcessResponse(string(body)) if err != nil { return err } log.Printf("response size: %v", len(response)) utils.FileLog.Info(fmt.Sprintf("response: %v", response)) // 请求lib应用入库 paramsLib := make(map[string]interface{}) paramsLib["List"] = response paramsLib["TerminalCode"] = utils.TerminalCode postEdbLib, err := postEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE) if err != nil { // 有错误就不继续执行 log.Printf("postEdbLib err: %v", err) return err } log.Printf("postEdbLib size: %v", len(postEdbLib)) utils.FileLog.Info(fmt.Sprintf("postEdbLib: %v", string(postEdbLib))) return nil } // PostEdbLib 调用指标接口 func postEdbLib(param map[string]interface{}, method string) (result []byte, err error) { postUrl := utils.EDB_LIB_URL + method postData, err := json.Marshal(param) if err != nil { return } result, err = httpPost(postUrl, string(postData), "application/json") if err != nil { return } return } func httpPost(url, postData string, params ...string) ([]byte, error) { fmt.Println("HttpPost Url:" + url) body := ioutil.NopCloser(strings.NewReader(postData)) client := &http.Client{} req, err := http.NewRequest("POST", url, body) if err != nil { return nil, err } contentType := "application/x-www-form-urlencoded;charset=utf-8" if len(params) > 0 && params[0] != "" { contentType = params[0] } req.Header.Set("Content-Type", contentType) req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY)) resp, err := client.Do(req) if err != nil { fmt.Println("client.Do err:" + err.Error()) return nil, err } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println("HttpPost:" + string(b)) } return b, err } // resp响应参数检测 code or message 判断是否需要重新登录 func checkResp(resp string) bool { if resp == "" { return true } var responseObj models.Response err := json.Unmarshal([]byte(resp), &responseObj) if err != nil { return false } if responseObj.Code != 200 || responseObj.Message != "成功!" { return true } return false } // GenerateRequestParams 让子类来实现这个方法 func (p *BaseProcessor) GenerateRequestParams(currentTime string) map[string]string { return map[string]string{} } func (p *BaseProcessor) ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error) { return nil, nil } // GetAuthorizationByChrome 获取Authorization func getAuthorizationByChrome() (authorization string, err error) { // 检查账号和密码是否设置 if utils.FenweiNetUseName == "" { return "", fmt.Errorf("汾渭账号未设置") } if utils.FenweiNetPassword == "" { return "", fmt.Errorf("汾渭密码未设置") } opts := append( chromedp.DefaultExecAllocatorOptions[:], chromedp.Flag("headless", false), ) allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...) defer cancel1() // 创建chrome实例 ctx, cancel2 := chromedp.NewContext( allocCtx, chromedp.WithLogf(log.Printf), ) defer cancel2() // 提前设置监听器 authorizationChan := make(chan string, 1) // 使用channel来确保监听到的Authorization可以安全传递 chromedp.ListenTarget(ctx, func(ev interface{}) { if ev, ok := ev.(*network.EventRequestWillBeSent); ok { if authHeader, found := ev.Request.Headers["Authorization"]; found { if authStr, ok := authHeader.(string); ok { select { case authorizationChan <- authStr: // 将Authorization放入channel default: } utils.FileLog.Info("Authorization header found: " + authStr) } } } }) // 运行浏览器操作 err = chromedp.Run(ctx, chromedp.Navigate(`https://www.sxcoal.com/`), chromedp.Click(`.pc_content__jO_mq`, chromedp.ByQuery), chromedp.Sleep(2*time.Second), chromedp.SetValue(`div.Sign_username__7eYwE input[type="text"]`, utils.FenweiNetUseName, chromedp.ByQuery), chromedp.SetValue(`div.Sign_password__dwxMn input[type="password"]`, utils.FenweiNetPassword, chromedp.ByQuery), chromedp.Sleep(2*time.Second), // 定位并点击指定按钮(class属性为 'Button_btn__xbZjp Button_black__X_jwF Button_mediu__ZVHO_',并且 span 标签内容为 '登录') chromedp.Click(`//button[contains(@class, 'Button_btn__xbZjp') and contains(@class, 'Button_black__X_jwF') and contains(@class, 'Button_mediu__ZVHO_')]/span[text()='登录']`, chromedp.BySearch), // 添加延迟,确保捕获到请求头 chromedp.Sleep(8*time.Second), ) // 从channel中获取authorization select { case authorization = <-authorizationChan: case <-time.After(10 * time.Second): // 超时时间,可以根据实际情况调整 err = fmt.Errorf("未能获取到Authorization") } return }