package kpler import ( "eta/eta_data_analysis/models" "eta/eta_data_analysis/services/alarm_msg" "eta/eta_data_analysis/utils" "fmt" "time" "github.com/patrickmn/go-cache" ) func GetProducts(req models.KplerProductLibReq) (products []models.KplerProduct, err error) { token, err := GetKplerAccessToken(false) if err != nil { return nil, err } products, err = GetProductsByApi(req, token) if err != nil { if err.Error() == "Unauthorized" { token, err = GetKplerAccessToken(true) if err != nil { return } products, err = GetProductsByApi(req, token) if err != nil { return } return } return } return } // token := "" func GetKplerData(req models.KplerFlowDataLibReq) (ret *models.KplerFlowDataResp, err error) { // token := "" // flowDirection := "import" // granularity := "monthly" // split := "Destination%20Countries" // withIntraRegion := "true" // startDate := "2024-01-01" // endDate := "2025-06-30" // unit := "kbd" // products := "CPC%20Russia,Eastern%20Russia%20Crude,Western%20Russia%20Crude" // fromZones := "" // toZones := "" // onlyRealized := "true" // req = models.KplerFlowDataLibReq{ // Granularity: granularity, // Split: split, // Unit: unit, // FlowDirection: flowDirection, // FromZones: fromZones, // ToZones: toZones, // OnlyRealized: onlyRealized, // WithIntraRegion: withIntraRegion, // StartDate: startDate, // EndDate: endDate, // Products: products, // } token, err := GetKplerAccessToken(false) if err != nil { return nil, err } ret, err = GetKplerDataByApi(req, token) if err != nil { fmt.Println("GetKplerDataByApi error", err) if err.Error() == "Unauthorized" { token, err = GetKplerAccessToken(true) if err != nil { err = fmt.Errorf("获取开普勒API-AccessToken失败, %v", err) return } ret, err = GetKplerDataByApi(req, token) if err != nil { fmt.Println("GetKplerDataByApi error", err) return nil, err } return } return nil, err } return } // GetKplerAccessToken 获取登录凭证 func GetKplerAccessToken(forceRefresh bool) (token string, err error) { defer func() { if err != nil { go alarm_msg.SendAlarmMsg("获取开普勒的登录凭证失败,ERR:"+err.Error(), 3) } }() redisKey := "kpler_access_token" cacheClient := utils.CacheClient tokenTmp, ok := cacheClient.Get(redisKey) //如果从redis中accessToken 获取失败或者token为空了,再或者需要强制刷新了,那么重新获取accessToken if !ok || forceRefresh { token, err = refreshKplerAccessToken(cacheClient,redisKey) return } fmt.Println("tokenTmp", tokenTmp) if tokenTmp == nil { token, err = refreshKplerAccessToken(cacheClient,redisKey) return } token = tokenTmp.(string) return } // refreshKplerAccessToken 强制刷新获取登录凭证 func refreshKplerAccessToken(cacheClient *cache.Cache, redisKey string) (token string, err error) { defer func() { if err != nil { go alarm_msg.SendAlarmMsg("获取开普勒的登录凭证失败;ERR:"+err.Error(), 3) } }() token, tmpErr := login() if tmpErr != nil { err = tmpErr return } expireTime := time.Now().Add(time.Hour * 24 * 30) //token存入redis //err = utils.Rc.Put(tokenRedisKey, token, time.Duration(expireTime.Unix()-600)*time.Second) // 本来是要设置下600s的过期时间,但因为不是强制刷新token,就不获取了 cacheClient.Set(redisKey, token, time.Duration(expireTime.Unix())*time.Second) if err != nil { err = fmt.Errorf("获取开普勒的登录凭证成功;开普勒登录凭证存入redis失败,ERR:%s", err.Error()) return } return }