// data_processor.go
  1. // Package fenwei
  2. // @Author gmy 2024/8/20 14:47:00
  3. package fenwei
  4. import (
  5. "bytes"
  6. "context"
  7. "encoding/json"
  8. "eta/eta_data_analysis/models"
  9. "eta/eta_data_analysis/utils"
  10. "fmt"
  11. "github.com/chromedp/cdproto/network"
  12. "github.com/chromedp/chromedp"
  13. "io"
  14. "io/ioutil"
  15. "log"
  16. "net/http"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. type DataProcessor interface {
  22. FetchAndProcess(DataProcessor) error
  23. GenerateRequestParams(currentTime string) map[string]string
  24. ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error)
  25. }
  // BaseProcessor carries the state shared by every processor and supplies
  // default implementations of the DataProcessor methods.
  type BaseProcessor struct {
  	// URL is the endpoint FetchAndProcess POSTs the request to.
  	URL string
  }
  // authorization caches the Authorization header value sent with fetch requests.
  var authorization string

  // authLock guards authorization.
  var authLock sync.RWMutex
  33. func (p *BaseProcessor) FetchAndProcess(processor DataProcessor) error {
  34. // 获取当前时间 yyyy-MM-dd
  35. now := time.Now()
  36. currentTime := now.Format(utils.FormatDateUnSpace)
  37. // 请求参数
  38. params := processor.GenerateRequestParams(currentTime)
  39. // 保存请求参数
  40. originalRequestBody := params["params"] // 保存原始请求数据
  41. requestBody := bytes.NewBufferString(originalRequestBody)
  42. req, err := http.NewRequest("POST", p.URL, requestBody)
  43. if err != nil {
  44. return err
  45. }
  46. // 设置请求头
  47. req.Header.Set("Content-Type", "application/json")
  48. req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
  49. authLock.RLock()
  50. req.Header.Set("Authorization", authorization)
  51. authLock.RUnlock()
  52. client := &http.Client{}
  53. resp, err := client.Do(req)
  54. if err != nil {
  55. return err
  56. }
  57. defer resp.Body.Close()
  58. body, err := io.ReadAll(resp.Body)
  59. if err != nil {
  60. return err
  61. }
  62. if checkResp(string(body)) {
  63. authLock.Lock()
  64. // 登录获取Authorization
  65. authorization, err = getAuthorizationByChrome()
  66. if err != nil {
  67. authLock.Unlock()
  68. return err
  69. }
  70. authLock.Unlock()
  71. // 重新创建请求对象
  72. requestBody = bytes.NewBufferString(originalRequestBody)
  73. req, err = http.NewRequest("POST", p.URL, requestBody)
  74. if err != nil {
  75. return err
  76. }
  77. // 重新设置请求头
  78. req.Header.Set("Content-Type", "application/json")
  79. req.Header.Set("accept-language", "zh-CN,zh;q=0.9")
  80. req.Header.Set("Authorization", authorization)
  81. // 重新请求
  82. resp, err = client.Do(req)
  83. if err != nil {
  84. return err
  85. }
  86. defer resp.Body.Close()
  87. body, err = io.ReadAll(resp.Body)
  88. if err != nil {
  89. return err
  90. }
  91. }
  92. // 数据处理
  93. response, err := processor.ProcessResponse(string(body))
  94. if err != nil {
  95. return err
  96. }
  97. log.Printf("response size: %v", len(response))
  98. utils.FileLog.Info(fmt.Sprintf("response: %v", response))
  99. // 请求lib应用入库
  100. paramsLib := make(map[string]interface{})
  101. paramsLib["List"] = response
  102. paramsLib["TerminalCode"] = utils.TerminalCode
  103. postEdbLib, err := postEdbLib(paramsLib, utils.LIB_ROUTE_FENWEI_NET_DATA_HANDLE)
  104. if err != nil {
  105. // 有错误就不继续执行
  106. log.Printf("postEdbLib err: %v", err)
  107. return err
  108. }
  109. log.Printf("postEdbLib size: %v", len(postEdbLib))
  110. utils.FileLog.Info(fmt.Sprintf("postEdbLib: %v", string(postEdbLib)))
  111. return nil
  112. }
  113. // PostEdbLib 调用指标接口
  114. func postEdbLib(param map[string]interface{}, method string) (result []byte, err error) {
  115. postUrl := utils.EDB_LIB_URL + method
  116. postData, err := json.Marshal(param)
  117. if err != nil {
  118. return
  119. }
  120. result, err = httpPost(postUrl, string(postData), "application/json")
  121. if err != nil {
  122. return
  123. }
  124. return
  125. }
  126. func httpPost(url, postData string, params ...string) ([]byte, error) {
  127. fmt.Println("HttpPost Url:" + url)
  128. body := ioutil.NopCloser(strings.NewReader(postData))
  129. client := &http.Client{}
  130. req, err := http.NewRequest("POST", url, body)
  131. if err != nil {
  132. return nil, err
  133. }
  134. contentType := "application/x-www-form-urlencoded;charset=utf-8"
  135. if len(params) > 0 && params[0] != "" {
  136. contentType = params[0]
  137. }
  138. req.Header.Set("Content-Type", contentType)
  139. req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY))
  140. resp, err := client.Do(req)
  141. if err != nil {
  142. fmt.Println("client.Do err:" + err.Error())
  143. return nil, err
  144. }
  145. defer resp.Body.Close()
  146. b, err := ioutil.ReadAll(resp.Body)
  147. if err != nil {
  148. fmt.Println("HttpPost:" + string(b))
  149. }
  150. return b, err
  151. }
  152. // resp响应参数检测 code or message 判断是否需要重新登录
  153. func checkResp(resp string) bool {
  154. if resp == "" {
  155. return true
  156. }
  157. var responseObj models.Response
  158. err := json.Unmarshal([]byte(resp), &responseObj)
  159. if err != nil {
  160. return false
  161. }
  162. if responseObj.Code != 200 || responseObj.Message != "成功!" {
  163. return true
  164. }
  165. return false
  166. }
  167. // GenerateRequestParams 让子类来实现这个方法
  168. func (p *BaseProcessor) GenerateRequestParams(currentTime string) map[string]string {
  169. return map[string]string{}
  170. }
  171. func (p *BaseProcessor) ProcessResponse(data string) ([]models.FenWeiNetIndexInfo, error) {
  172. return nil, nil
  173. }
  174. // GetAuthorizationByChrome 获取Authorization
  175. func getAuthorizationByChrome() (authorization string, err error) {
  176. // 检查账号和密码是否设置
  177. if utils.FenweiNetUseName == "" {
  178. return "", fmt.Errorf("汾渭账号未设置")
  179. }
  180. if utils.FenweiNetPassword == "" {
  181. return "", fmt.Errorf("汾渭密码未设置")
  182. }
  183. opts := append(
  184. chromedp.DefaultExecAllocatorOptions[:],
  185. chromedp.Flag("headless", false),
  186. )
  187. allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...)
  188. defer cancel1()
  189. // 创建chrome实例
  190. ctx, cancel2 := chromedp.NewContext(
  191. allocCtx,
  192. chromedp.WithLogf(log.Printf),
  193. )
  194. defer cancel2()
  195. // 提前设置监听器
  196. authorizationChan := make(chan string, 1) // 使用channel来确保监听到的Authorization可以安全传递
  197. chromedp.ListenTarget(ctx, func(ev interface{}) {
  198. if ev, ok := ev.(*network.EventRequestWillBeSent); ok {
  199. if authHeader, found := ev.Request.Headers["Authorization"]; found {
  200. if authStr, ok := authHeader.(string); ok {
  201. select {
  202. case authorizationChan <- authStr: // 将Authorization放入channel
  203. default:
  204. }
  205. utils.FileLog.Info("Authorization header found: " + authStr)
  206. }
  207. }
  208. }
  209. })
  210. // 运行浏览器操作
  211. err = chromedp.Run(ctx,
  212. chromedp.Navigate(`https://www.sxcoal.com/`),
  213. chromedp.Click(`.pc_content__jO_mq`, chromedp.ByQuery),
  214. chromedp.Sleep(2*time.Second),
  215. chromedp.SetValue(`div.Sign_username__7eYwE input[type="text"]`, utils.FenweiNetUseName, chromedp.ByQuery),
  216. chromedp.SetValue(`div.Sign_password__dwxMn input[type="password"]`, utils.FenweiNetPassword, chromedp.ByQuery),
  217. chromedp.Sleep(2*time.Second),
  218. // 定位并点击指定按钮(class属性为 'Button_btn__xbZjp Button_black__X_jwF Button_mediu__ZVHO_',并且 span 标签内容为 '登录')
  219. chromedp.Click(`//button[contains(@class, 'Button_btn__xbZjp') and contains(@class, 'Button_black__X_jwF') and contains(@class, 'Button_mediu__ZVHO_')]/span[text()='登录']`, chromedp.BySearch),
  220. // 添加延迟,确保捕获到请求头
  221. chromedp.Sleep(8*time.Second),
  222. )
  223. // 从channel中获取authorization
  224. select {
  225. case authorization = <-authorizationChan:
  226. case <-time.After(10 * time.Second): // 超时时间,可以根据实际情况调整
  227. err = fmt.Errorf("未能获取到Authorization")
  228. }
  229. return
  230. }