data_processor.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Package fenwei
  2. // @Author gmy 2024/8/20 14:47:00
  3. package fenwei
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "eta/eta_data_analysis/models"
  9. "eta/eta_data_analysis/services"
  10. "eta/eta_data_analysis/utils"
  11. "fmt"
  12. "github.com/chromedp/cdproto/network"
  13. "github.com/chromedp/chromedp"
  14. "io"
  15. "log"
  16. "net/http"
  17. "sync"
  18. "time"
  19. )
  20. type DataProcessor interface {
  21. FetchAndProcess() error
  22. GenerateRequestParams(string) map[string]string
  23. }
  24. type BaseProcessor struct {
  25. URL string
  26. }
  27. var (
  28. authorization string
  29. authLock sync.RWMutex
  30. )
  31. func (p *BaseProcessor) FetchAndProcess() error {
  32. // 获取当前时间 yyyy-MM-dd
  33. now := time.Now()
  34. currentTime := now.Format(utils.FormatDateUnSpace)
  35. // 请求参数
  36. params := p.GenerateRequestParams(currentTime)
  37. req, err := http.NewRequest("POST", p.URL, bytes.NewBufferString(params["params"]))
  38. if err != nil {
  39. return err
  40. }
  41. // 设置请求头
  42. req.Header.Set("Content-Type", "application/json")
  43. req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
  44. authLock.RLock()
  45. req.Header.Set("Authorization", authorization)
  46. authLock.RUnlock()
  47. client := &http.Client{}
  48. resp, err := client.Do(req)
  49. if err != nil {
  50. return err
  51. }
  52. defer resp.Body.Close()
  53. body, err := io.ReadAll(resp.Body)
  54. if err != nil {
  55. return err
  56. }
  57. if checkResp(string(body)) {
  58. authLock.Lock()
  59. // 登录获取Authorization
  60. authorization, err = getAuthorizationByChrome()
  61. if err != nil {
  62. authLock.Unlock()
  63. return err
  64. }
  65. authLock.Unlock()
  66. // 重新请求
  67. req.Header.Set("Authorization", authorization)
  68. resp, err = client.Do(req)
  69. if err != nil {
  70. return err
  71. }
  72. }
  73. // 数据处理
  74. response, err := p.ProcessResponse(string(body))
  75. if err != nil {
  76. return err
  77. }
  78. utils.FileLog.Info(fmt.Sprintf("response: %v", response))
  79. // 请求lib应用入库
  80. paramsLib := make(map[string]interface{})
  81. paramsLib["List"] = response
  82. paramsLib["TerminalCode"] = utils.TerminalCode
  83. services.PostEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE)
  84. return nil
  85. }
  86. // resp响应参数检测 code or message 判断是否需要重新登录
  87. func checkResp(resp string) bool {
  88. if resp == "" {
  89. return true
  90. }
  91. var responseObj models.Response
  92. err := json.Unmarshal([]byte(resp), &responseObj)
  93. if err != nil {
  94. return false
  95. }
  96. if responseObj.Code != 200 || responseObj.Message != "成功!" {
  97. return true
  98. }
  99. return false
  100. }
  101. // GenerateRequestParams 让子类来实现这个方法
  102. func (p *BaseProcessor) GenerateRequestParams(string) map[string]string {
  103. return map[string]string{}
  104. }
  105. func (p *BaseProcessor) ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error) {
  106. return nil, nil
  107. }
  108. // GetAuthorizationByChrome 获取Authorization
  109. func getAuthorizationByChrome() (authorization string, err error) {
  110. // 读取Cookie
  111. if utils.FenweiNetUseName == "" {
  112. err = fmt.Errorf("汾渭账号未设置")
  113. return
  114. }
  115. if utils.FenweiNetPassword == "" {
  116. err = fmt.Errorf("汾渭密码未设置")
  117. return
  118. }
  119. opts := append(
  120. chromedp.DefaultExecAllocatorOptions[:],
  121. chromedp.Flag("headless", false),
  122. )
  123. allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...)
  124. defer cancel1()
  125. // 创建chrome实例
  126. ctx, cancel2 := chromedp.NewContext(
  127. allocCtx,
  128. chromedp.WithLogf(log.Printf),
  129. )
  130. defer cancel2()
  131. err = chromedp.Run(ctx,
  132. chromedp.Navigate(`https://www.sxcoal.com/`),
  133. chromedp.Click(`.pc_content__jO_mq`, chromedp.ByQuery),
  134. chromedp.Sleep(2*time.Second),
  135. chromedp.SetValue(`div.Sign_username__7eYwE input[type="text"]`, utils.FenweiNetUseName, chromedp.ByQuery),
  136. chromedp.SetValue(`div.Sign_password__dwxMn input[type="password"]`, utils.FenweiNetPassword, chromedp.ByQuery),
  137. chromedp.Sleep(2*time.Second),
  138. // 登录
  139. chromedp.Click(`.Button_text__D25sy`, chromedp.ByQuery),
  140. chromedp.ActionFunc(func(ctx context.Context) error {
  141. /*cookies, err := network.GetCookies().Do(ctx)
  142. if err != nil {
  143. return err
  144. }
  145. for _, v := range cookies {
  146. cookieStr = cookieStr + v.Name + "=" + v.Value + ";"
  147. }
  148. utils.FileLog.Info("header cookie:" + cookieStr)*/
  149. // 监听并处理请求事件
  150. chromedp.ListenTarget(ctx, func(ev interface{}) {
  151. if ev, ok := ev.(*network.EventRequestWillBeSent); ok {
  152. for _, header := range ev.Request.Headers {
  153. if authHeader, ok := header.(string); ok && authHeader == "Authorization" {
  154. authorization = authHeader
  155. utils.FileLog.Info("Authorization header found: " + authorization)
  156. }
  157. }
  158. }
  159. })
  160. return nil
  161. }),
  162. )
  163. return
  164. }