Browse Source

Merge branch 'bug/python_mongo' into debug

Roc 9 tháng trước cách đây
mục cha
commit
fced45c72d

+ 45 - 4
controllers/base_from_bloomberg.go

@@ -179,7 +179,7 @@ func (this *BloombergController) PCSGRefreshDaily() {
 	}
 
 	// 写入数据
-	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+	if e = services.PCSGWrite2BaseBloomberg(indexes, false); e != nil {
 		br.Msg = "刷新失败"
 		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 		return
@@ -220,7 +220,7 @@ func (this *BloombergController) PCSGRefreshWeekly() {
 	}
 
 	// 写入数据
-	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+	if e = services.PCSGWrite2BaseBloomberg(indexes, false); e != nil {
 		br.Msg = "刷新失败"
 		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 		return
@@ -261,7 +261,48 @@ func (this *BloombergController) PCSGRefreshMonthly() {
 	}
 
 	// 写入数据
-	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+	if e = services.PCSGWrite2BaseBloomberg(indexes, false); e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
+		return
+	}
+
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "操作成功"
+}
+
+// PCSGRefreshDailyRun3
+// @Title 中石油新加坡-刷新日度指标
+// @Description  中石油新加坡-刷新日度指标
+// @Success 200 {object} models.AddEdbInfoReq
+// @router /pcsg/refresh_daily_run3 [post]
+func (this *BloombergController) PCSGRefreshDailyRun3() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+
+	// 获取数据
+	indexes, e := services.GetPCSGBloombergDailyFromBridgeRun3()
+	if e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "Bridge-获取PCSG彭博日度指标失败, Err: " + e.Error()
+		return
+	}
+	if len(indexes) == 0 {
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "操作成功"
+		return
+	}
+
+	// 写入数据
+	if e = services.PCSGWrite2BaseBloomberg(indexes, true); e != nil {
 		br.Msg = "刷新失败"
 		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 		return
@@ -307,7 +348,7 @@ func (this *BloombergController) PCSGImportHistoryData() {
 	indexes = append(indexes, index)
 
 	// 写入数据
-	if e := services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+	if e := services.PCSGWrite2BaseBloomberg(indexes, req.IsVCode); e != nil {
 		br.Msg = "刷新失败"
 		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 		return

+ 1 - 0
models/base_from_bloomberg.go

@@ -356,4 +356,5 @@ type BaseFromBloombergApiIndexData struct {
 type PCSGImportHistoryDataReq struct {
 	IndexCode string                `description:"指标编码"`
 	DataMap   map[time.Time]float64 `description:"数据日期/值"`
+	IsVCode   bool                  `description:"是否指标编码中间加V"`
 }

+ 9 - 0
routers/commentsRouter.go

@@ -106,6 +106,15 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "PCSGRefreshDailyRun3",
+            Router: `/pcsg/refresh_daily_run3`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
         beego.ControllerComments{
             Method: "PCSGRefreshMonthly",

+ 68 - 4
services/base_from_pcsg.go

@@ -14,9 +14,10 @@ import (
 )
 
 var (
-	BridgeApiPCSGBloombergDailyUrl   = "/api/pcsg/bloomberg/daily_index"   // 日度指标API
-	BridgeApiPCSGBloombergWeeklyUrl  = "/api/pcsg/bloomberg/weekly_index"  // 周度指标API
-	BridgeApiPCSGBloombergMonthlyUrl = "/api/pcsg/bloomberg/monthly_index" // 月度指标API
+	BridgeApiPCSGBloombergDailyUrl     = "/api/pcsg/bloomberg/daily_index"      // 日度指标API
+	BridgeApiPCSGBloombergWeeklyUrl    = "/api/pcsg/bloomberg/weekly_index"     // 周度指标API
+	BridgeApiPCSGBloombergMonthlyUrl   = "/api/pcsg/bloomberg/monthly_index"    // 月度指标API
+	BridgeApiPCSGBloombergDailyRun3Url = "/api/pcsg/bloomberg/daily_index_run3" // 月度指标API
 )
 
 // GetPCSGBloombergDailyFromBridge 获取彭博日度指标
@@ -199,8 +200,68 @@ func GetPCSGBloombergMonthlyFromBridge() (indexes []models.BaseFromBloombergApiI
 	return
 }
 
+// GetPCSGBloombergDailyFromBridgeRun3 获取彭博日度指标
+func GetPCSGBloombergDailyFromBridgeRun3() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("GetPCSGBloombergDailyFromBridgeRun3-获取彭博日度指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergDailyRun3Url)
+	body := ioutil.NopCloser(strings.NewReader(""))
+	client := &http.Client{}
+	req, e := http.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.BridgePCSGBloombergResultData)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+	indexes = result.Data
+	return
+}
+
 // PCSGWrite2BaseBloomberg 写入彭博数据源
-func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData) (err error) {
+func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool) (err error) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
@@ -217,6 +278,9 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData)
 		if len(v.Data) == 0 {
 			continue
 		}
+		if isVCode {
+			v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, "V")
+		}
 
 		// 指标是否存在
 		index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode)

+ 2 - 2
services/base_from_python.go

@@ -172,7 +172,7 @@ func getPythonFileAbsolutePath(edbCode string) (pythonFile string, err error) {
 // getPythonFrontStr 获取python前面的代码
 func getPythonFrontStr() string {
 	//return "#!/usr/bin/python\n# -*- coding: UTF-8 -*-\nimport json\n\nimport pymysql\nimport pandas as pd\n\nsql_config = {\n    'host': 'rm-uf67kg347rhjfep5c1o.mysql.rds.aliyuncs.com',\n    'port': 3306,#主机号\n    'user': 'hz_technology',#账户名\n    'passwd': 'hongze@2021',#密码\n    'db': 'test_hz_data',\n    'charset': 'utf8mb4',\n    'cursorclass': pymysql.cursors.DictCursor\n}\n\ndb = pymysql.connect(**sql_config)\ndb.autocommit(1)\ncursor = db.cursor()\npandas_fetch_all = pd.read_sql\n\n# 返回数据\nresult = {}\n\n# 格式化返回数据\ndef format_data(data: pd.DataFrame,\n                index_str: str = \"data_time\",\n                value_str: str = \"value\"\n                ) -> pd.DataFrame:\n    \"\"\"\n        Parameters\n        ----------\n        data : pandas的DataFrame数据结构.\n        index_str : 对象下标字符串,在pandas的DataFrame中的列名.\n        value_str : 对象值字符串,在pandas的DataFrame中的列名\n\n        Returns\n        -------\n        DataFrame or Iterator[DataFrame]\n        例子:{'2007-01-09': 3220.0, '2007-01-10': 3230.0}\n\n        \"\"\"\n    index_list = []  # 空列表\n    value_list = []  # 空列表\n\n    for num in range(1, data.index.size):  # 迭代 所有的指标\n        index_list.append(data[index_str][num])\n        value_list.append(data[value_str][num])\n\n    tmp_data = {\n        \"date\": index_list,\n        \"value\": value_list\n    }\n    pd_data = pd.DataFrame(tmp_data)\n    # print(pd_data)\n    return pd_data\n"
-	str := fmt.Sprintf("#!/usr/bin/python\n# -*- coding: UTF-8 -*-\nimport json\n\nimport pymysql\nimport pandas as pd\nfrom pymongo import MongoClient\nimport pytz\n\nsql_config = {\n    'host': '%s',\n    'port': 3306,#主机号\n    'user': '%s',#账户名\n    'passwd': '%s',#密码\n    'db': '%s',\n    'charset': 'utf8mb4',\n    'cursorclass': pymysql.cursors.DictCursor\n}\n\ndb = pymysql.connect(**sql_config)\ndb.autocommit(1)\ncursor = db.cursor()\npandas_fetch_all = pd.read_sql\n\n", utils.PYTHON_MYSQL_HOST, utils.PYTHON_MYSQL_USER, utils.PYTHON_MYSQL_PASSWD, utils.PYTHON_MYSQL_DB)
+	str := fmt.Sprintf("#!/usr/bin/python\n# -*- coding: UTF-8 -*-\nimport json\n\nimport pymysql\nimport pandas as pd\n\nsql_config = {\n    'host': '%s',\n    'port': 3306,#主机号\n    'user': '%s',#账户名\n    'passwd': '%s',#密码\n    'db': '%s',\n    'charset': 'utf8mb4',\n    'cursorclass': pymysql.cursors.DictCursor\n}\n\ndb = pymysql.connect(**sql_config)\ndb.autocommit(1)\ncursor = db.cursor()\npandas_fetch_all = pd.read_sql\n\n", utils.PYTHON_MYSQL_HOST, utils.PYTHON_MYSQL_USER, utils.PYTHON_MYSQL_PASSWD, utils.PYTHON_MYSQL_DB)
 
 	// mongo部分
 	if utils.PYTHON_MONGO_HOST != `` {
@@ -191,7 +191,7 @@ func getPythonFrontStr() string {
 // @datetime 2024-05-07 18:38:03
 // @return string
 func getPythonFront2Str() string {
-	str := fmt.Sprintf("\nfrom pymongo import MongoClient\nfrom dateutil.tz import tzlocal\n# MongoDB 连接配置\nmongo_config = {\n    'host': '%s',  # 替换为你的 MongoDB 连接字符串\n    'database': '%s',  # 替换为你的数据库名\n    'collection': '%s',  # 替换为你的集合名\n    'auth_mechanism': '%s'  # 替换为你的认证机制\n}\n\n# 创建 MongoClient 并连接到数据库\nclient = MongoClient(mongo_config['host'], authMechanism=mongo_config['auth_mechanism'])\nmgo_db = client[mongo_config['database']]\ncollection = mgo_db[mongo_config['collection']]\n\n# 定义时区\nutc_tz = pytz.utc\nlocal_tz = tzlocal()  # 本地时区", utils.PYTHON_MONGO_HOST, utils.PYTHON_MONGO_DATABASE, "edb_data_business", utils.PYTHON_MONGO_AUTH_MECHANISM)
+	str := fmt.Sprintf("\nfrom pymongo import MongoClient\nimport pytz\nfrom dateutil.tz import tzlocal\n# MongoDB 连接配置\nmongo_config = {\n    'host': '%s',  # 替换为你的 MongoDB 连接字符串\n    'database': '%s',  # 替换为你的数据库名\n    'collection': '%s',  # 替换为你的集合名\n    'auth_mechanism': '%s'  # 替换为你的认证机制\n}\n\n# 创建 MongoClient 并连接到数据库\nclient = MongoClient(mongo_config['host'], authMechanism=mongo_config['auth_mechanism'])\nmgo_db = client[mongo_config['database']]\ncollection = mgo_db[mongo_config['collection']]\n\n# 定义时区\nutc_tz = pytz.utc\nlocal_tz = tzlocal()  # 本地时区", utils.PYTHON_MONGO_HOST, utils.PYTHON_MONGO_DATABASE, "edb_data_business", utils.PYTHON_MONGO_AUTH_MECHANISM)
 	return str
 }
 

+ 21 - 0
utils/common.go

@@ -1289,3 +1289,24 @@ func GenerateEdbCode(num int, pre string) (edbCode string, err error) {
 
 	return
 }
+
+// InsertStr2StrIdx 可分隔的字符串中插入指定字符串, 例如: CO1 Comdty插入V => CO1 V Comdty
+func InsertStr2StrIdx(str, sep string, idx int, value string) string {
+	str = strings.TrimSpace(str)
+	// 默认以空格作为分隔符
+	if sep == "" {
+		sep = " "
+	}
+	slice := strings.Split(str, sep)
+	if len(slice) < 2 {
+		return str
+	}
+
+	// 如果idx不在切片的有效范围内,直接返回原字符串
+	if idx < 0 || idx > len(slice) {
+		return str
+	}
+
+	slice = append(slice[:idx], append([]string{value}, slice[idx:]...)...)
+	return strings.Join(slice, sep)
+}