data_processor.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Package fenwei
  2. // @Author gmy 2024/8/20 14:47:00
  3. package main
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "eta/eta_data_analysis/models"
  9. "eta/eta_data_analysis/services"
  10. "eta/eta_data_analysis/utils"
  11. "fmt"
  12. "github.com/chromedp/cdproto/network"
  13. "github.com/chromedp/chromedp"
  14. "io"
  15. "log"
  16. "net/http"
  17. "sync"
  18. "time"
  19. )
// DataProcessor is the contract for one data source handled by this file's
// fetch pipeline: it knows how to build the request parameters and how to turn
// the raw response into index records.
type DataProcessor interface {
	// FetchAndProcess runs the whole fetch/parse/upload flow, delegating
	// request building and response parsing to the DataProcessor passed in.
	FetchAndProcess(DataProcessor) error
	// GenerateRequestParams returns the request parameters for currentTime.
	// BaseProcessor.FetchAndProcess sends the value stored under the "params"
	// key as the POST body.
	GenerateRequestParams(currentTime string) map[string]string
	// ProcessResponse converts the raw response body into index records.
	ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error)
}
// BaseProcessor holds the shared fetch logic (FetchAndProcess) together with
// default, empty implementations of the other DataProcessor methods.
type BaseProcessor struct {
	// URL is the endpoint FetchAndProcess POSTs the request body to.
	URL string
}
var (
	// authorization is the value sent in the Authorization request header.
	// It is package-level state shared by every BaseProcessor and is replaced
	// after a browser login when a response indicates a new login is needed.
	authorization string
	// authLock guards reads and writes of authorization.
	authLock sync.RWMutex
)
  32. func (p *BaseProcessor) FetchAndProcess(processor DataProcessor) error {
  33. // 获取当前时间 yyyy-MM-dd
  34. now := time.Now()
  35. currentTime := now.Format(utils.FormatDateUnSpace)
  36. // 请求参数
  37. params := processor.GenerateRequestParams(currentTime)
  38. // 保存请求参数
  39. originalRequestBody := params["params"] // 保存原始请求数据
  40. requestBody := bytes.NewBufferString(originalRequestBody)
  41. req, err := http.NewRequest("POST", p.URL, requestBody)
  42. if err != nil {
  43. return err
  44. }
  45. // 设置请求头
  46. req.Header.Set("Content-Type", "application/json")
  47. req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
  48. authLock.RLock()
  49. req.Header.Set("Authorization", authorization)
  50. authLock.RUnlock()
  51. client := &http.Client{}
  52. resp, err := client.Do(req)
  53. if err != nil {
  54. return err
  55. }
  56. defer resp.Body.Close()
  57. body, err := io.ReadAll(resp.Body)
  58. if err != nil {
  59. return err
  60. }
  61. if checkResp(string(body)) {
  62. authLock.Lock()
  63. // 登录获取Authorization
  64. authorization, err = getAuthorizationByChrome()
  65. if err != nil {
  66. authLock.Unlock()
  67. return err
  68. }
  69. authLock.Unlock()
  70. // 重新创建请求对象
  71. requestBody = bytes.NewBufferString(originalRequestBody)
  72. req, err = http.NewRequest("POST", p.URL, requestBody)
  73. if err != nil {
  74. return err
  75. }
  76. // 重新设置请求头
  77. req.Header.Set("Content-Type", "application/json")
  78. req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
  79. req.Header.Set("Authorization", authorization)
  80. // 重新请求
  81. resp, err = client.Do(req)
  82. if err != nil {
  83. return err
  84. }
  85. defer resp.Body.Close()
  86. body, err = io.ReadAll(resp.Body)
  87. if err != nil {
  88. return err
  89. }
  90. }
  91. // 数据处理
  92. response, err := processor.ProcessResponse(string(body))
  93. if err != nil {
  94. return err
  95. }
  96. log.Printf("response size: %v", len(response))
  97. utils.FileLog.Info(fmt.Sprintf("response: %v", response))
  98. // 请求lib应用入库
  99. paramsLib := make(map[string]interface{})
  100. paramsLib["List"] = response
  101. paramsLib["TerminalCode"] = utils.TerminalCode
  102. postEdbLib, err := services.PostEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE)
  103. if err != nil {
  104. // 有错误就不继续执行
  105. log.Printf("postEdbLib err: %v", err)
  106. return err
  107. }
  108. log.Printf("postEdbLib size: %v", len(postEdbLib))
  109. utils.FileLog.Info(fmt.Sprintf("postEdbLib: %v", string(postEdbLib)))
  110. return nil
  111. }
  112. // resp响应参数检测 code or message 判断是否需要重新登录
  113. func checkResp(resp string) bool {
  114. if resp == "" {
  115. return true
  116. }
  117. var responseObj models.Response
  118. err := json.Unmarshal([]byte(resp), &responseObj)
  119. if err != nil {
  120. return false
  121. }
  122. if responseObj.Code != 200 || responseObj.Message != "成功!" {
  123. return true
  124. }
  125. return false
  126. }
  127. // GenerateRequestParams 让子类来实现这个方法
  128. func (p *BaseProcessor) GenerateRequestParams(currentTime string) map[string]string {
  129. return map[string]string{}
  130. }
  131. func (p *BaseProcessor) ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error) {
  132. return nil, nil
  133. }
  134. // GetAuthorizationByChrome 获取Authorization
  135. func getAuthorizationByChrome() (authorization string, err error) {
  136. // 检查账号和密码是否设置
  137. if utils.FenweiNetUseName == "" {
  138. return "", fmt.Errorf("汾渭账号未设置")
  139. }
  140. if utils.FenweiNetPassword == "" {
  141. return "", fmt.Errorf("汾渭密码未设置")
  142. }
  143. opts := append(
  144. chromedp.DefaultExecAllocatorOptions[:],
  145. chromedp.Flag("headless", false),
  146. )
  147. allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...)
  148. defer cancel1()
  149. // 创建chrome实例
  150. ctx, cancel2 := chromedp.NewContext(
  151. allocCtx,
  152. chromedp.WithLogf(log.Printf),
  153. )
  154. defer cancel2()
  155. // 提前设置监听器
  156. authorizationChan := make(chan string, 1) // 使用channel来确保监听到的Authorization可以安全传递
  157. chromedp.ListenTarget(ctx, func(ev interface{}) {
  158. if ev, ok := ev.(*network.EventRequestWillBeSent); ok {
  159. if authHeader, found := ev.Request.Headers["Authorization"]; found {
  160. if authStr, ok := authHeader.(string); ok {
  161. select {
  162. case authorizationChan <- authStr: // 将Authorization放入channel
  163. default:
  164. }
  165. utils.FileLog.Info("Authorization header found: " + authStr)
  166. }
  167. }
  168. }
  169. })
  170. // 运行浏览器操作
  171. err = chromedp.Run(ctx,
  172. chromedp.Navigate(`https://www.sxcoal.com/`),
  173. chromedp.Click(`.pc_content__jO_mq`, chromedp.ByQuery),
  174. chromedp.Sleep(2*time.Second),
  175. chromedp.SetValue(`div.Sign_username__7eYwE input[type="text"]`, utils.FenweiNetUseName, chromedp.ByQuery),
  176. chromedp.SetValue(`div.Sign_password__dwxMn input[type="password"]`, utils.FenweiNetPassword, chromedp.ByQuery),
  177. chromedp.Sleep(2*time.Second),
  178. // 定位并点击指定按钮(class属性为 'Button_btn__xbZjp Button_black__X_jwF Button_mediu__ZVHO_',并且 span 标签内容为 '登录')
  179. chromedp.Click(`//button[contains(@class, 'Button_btn__xbZjp') and contains(@class, 'Button_black__X_jwF') and contains(@class, 'Button_mediu__ZVHO_')]/span[text()='登录']`, chromedp.BySearch),
  180. // 添加延迟,确保捕获到请求头
  181. chromedp.Sleep(8*time.Second),
  182. )
  183. // 从channel中获取authorization
  184. select {
  185. case authorization = <-authorizationChan:
  186. case <-time.After(10 * time.Second): // 超时时间,可以根据实际情况调整
  187. err = fmt.Errorf("未能获取到Authorization")
  188. }
  189. return
  190. }