Browse Source

Merge branch 'master' into xxy_custom/bug_fix_redis

xyxie 11 months ago
parent
commit
6d4ff69bab

+ 273 - 0
controllers/base_from_bloomberg.go

@@ -0,0 +1,273 @@
+package controllers
+
+import (
+	"encoding/json"
+	"eta/eta_index_lib/logic"
+	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/services"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"strconv"
+	"time"
+)
+
+// BloombergController Bloomberg
+type BloombergController struct {
+	BaseAuthController
+}
+
+// Add
+// @Title 新增彭博指标接口
+// @Description 新增彭博指标接口
+// @Success 200 {object} models.AddEdbInfoReq
+// @router /add [post]
+func (this *BloombergController) Add() {
+	br := new(models.BaseResponse).Init()
+	var cacheKey string
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		_ = utils.Rc.Delete(cacheKey)
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+	source := utils.DATA_SOURCE_BLOOMBERG
+	var req models.AddEdbInfoReq
+	err := json.Unmarshal(this.Ctx.Input.RequestBody, &req)
+	if err != nil {
+		br.Msg = "参数解析异常!"
+		br.ErrMsg = "参数解析失败,Err:" + err.Error()
+		return
+	}
+	if req.EdbCode == "" {
+		br.Msg = "请输入指标编码!"
+		br.ErrMsg = "请输入指标编码,指标编码为空"
+		return
+	}
+	if req.Source != source {
+		br.Msg = "数据源有误"
+		br.ErrMsg = fmt.Sprintf("数据源ID不匹配, Source: %d", req.Source)
+		return
+	}
+	cacheKey = utils.CACHE_EDB_DATA_ADD + strconv.Itoa(source) + "_" + req.EdbCode
+	if !utils.Rc.IsExist(cacheKey) {
+		utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
+		err = models.AddEdbDataFromBloomberg(req.EdbCode)
+		if err != nil {
+			br.Msg = "获取指标信息失败!"
+			br.ErrMsg = "获取指标信息失败 AddEdbDataFromBloomberg,Err:" + err.Error()
+			return
+		}
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "获取成功"
+	} else {
+		br.Ret = 501
+		br.Success = true
+		br.Msg = "系统处理中,请稍后重试"
+	}
+}
+
+// Refresh
+// @Title 刷新彭博指标接口
+// @Description 刷新彭博指标接口
+// @Success 200 {object} models.RefreshEdbInfoReq
+// @router /refresh [post]
+func (this *BloombergController) Refresh() {
+	br := new(models.BaseResponse).Init()
+	var cacheKey string
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		_ = utils.Rc.Delete(cacheKey)
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+	source := utils.DATA_SOURCE_BLOOMBERG
+	var req models.RefreshEdbInfoReq
+	err := json.Unmarshal(this.Ctx.Input.RequestBody, &req)
+	if err != nil {
+		br.Msg = "参数解析异常!"
+		br.ErrMsg = "参数解析失败,Err:" + err.Error()
+		return
+	}
+	if req.EdbCode == "" {
+		br.Msg = "请输入指标编码!"
+		br.ErrMsg = "请输入指标编码,指标编码为空"
+		return
+	}
+	if req.EdbInfoId <= 0 {
+		br.Msg = "请输入指标ID!"
+		br.ErrMsg = "请输入指标ID"
+		return
+	}
+
+	// 获取指标详情
+	edbInfo, err := models.GetEdbInfoByEdbCode(source, req.EdbCode)
+	if err != nil {
+		br.Msg = "指标不存在!"
+		br.ErrMsg = "指标不存在"
+		return
+	}
+	cacheKey = utils.CACHE_EDB_DATA_REFRESH + strconv.Itoa(source) + "_" + req.EdbCode
+	if utils.Rc.IsExist(cacheKey) {
+		br.Ret = 501
+		br.Success = true
+		br.Msg = "系统处理中,请稍后重试"
+		return
+	}
+	dataUpdateTime := time.Now().Format(utils.FormatDateTime)
+	utils.Rc.SetNX(cacheKey, 1, 1*time.Minute)
+	err = models.RefreshEdbDataFromBloomberg(req.EdbInfoId, req.EdbCode, req.StartDate)
+	if err != nil && err.Error() != utils.ErrNoRow() {
+		br.Msg = "刷新指标信息失败!"
+		br.ErrMsg = "刷新指标信息失败 RefreshEdbDataFromBloomberg,Err:" + err.Error()
+		return
+	}
+
+	// 更新指标最大最小值
+	erDataUpdateDate, err, errMsg := models.UnifiedModifyEdbInfoMaxAndMinInfoDataUpdate(edbInfo, dataUpdateTime)
+	if err != nil {
+		br.Msg = errMsg
+		br.ErrMsg = err.Error()
+		return
+	}
+	// 添加指标刷新成功日志
+	if erDataUpdateDate != "" {
+		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 1, "", 0, 0)
+	} else {
+		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 2, "未刷新到数据", 0, 0)
+	}
+
+	// 更新ES
+	go logic.UpdateEs(edbInfo.EdbInfoId)
+
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "获取成功"
+}
+
+// PCSGRefreshDaily
+// @Title 中石油新加坡-刷新日度指标
+// @Description  中石油新加坡-刷新日度指标
+// @Success 200 {object} models.AddEdbInfoReq
+// @router /pcsg/refresh_daily [post]
+func (this *BloombergController) PCSGRefreshDaily() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+
+	// 获取数据
+	indexes, e := services.GetPCSGBloombergDailyFromBridge()
+	if e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "Bridge-获取PCSG彭博日度指标失败, Err: " + e.Error()
+		return
+	}
+	if len(indexes) == 0 {
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "操作成功"
+		return
+	}
+
+	// 写入数据
+	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
+		return
+	}
+
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "操作成功"
+}
+
+// PCSGRefreshWeekly
+// @Title 中石油新加坡-刷新周度指标
+// @Description  中石油新加坡-刷新周度指标
+// @Success 200 {object} models.AddEdbInfoReq
+// @router /pcsg/refresh_weekly [post]
+func (this *BloombergController) PCSGRefreshWeekly() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+
+	// 获取数据
+	indexes, e := services.GetPCSGBloombergWeeklyFromBridge()
+	if e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "Bridge-获取PCSG彭博周度指标失败, Err: " + e.Error()
+		return
+	}
+	if len(indexes) == 0 {
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "操作成功"
+		return
+	}
+
+	// 写入数据
+	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
+		return
+	}
+
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "操作成功"
+}
+
+// PCSGRefreshMonthly
+// @Title 中石油新加坡-刷新月度指标
+// @Description  中石油新加坡-刷新周度指标
+// @Success 200 {object} models.AddEdbInfoReq
+// @router /pcsg/refresh_monthly [post]
+func (this *BloombergController) PCSGRefreshMonthly() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		if br.ErrMsg == "" {
+			br.IsSendEmail = false
+		}
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+
+	// 获取数据
+	indexes, e := services.GetPCSGBloombergMonthlyFromBridge()
+	if e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "Bridge-获取PCSG彭博月度指标失败, Err: " + e.Error()
+		return
+	}
+	if len(indexes) == 0 {
+		br.Ret = 200
+		br.Success = true
+		br.Msg = "操作成功"
+		return
+	}
+
+	// 写入数据
+	if e = services.PCSGWrite2BaseBloomberg(indexes); e != nil {
+		br.Msg = "刷新失败"
+		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
+		return
+	}
+
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "操作成功"
+}

+ 347 - 0
models/base_from_bloomberg.go

@@ -0,0 +1,347 @@
+package models
+
+import (
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// BaseFromBloombergIndex Bloomberg原始指标
+type BaseFromBloombergIndex struct {
+	BaseFromBloombergIndexId int       `orm:"column(base_from_bloomberg_index_id);pk"`
+	IndexCode                string    `description:"指标编码"`
+	IndexName                string    `description:"指标名称"`
+	Unit                     string    `description:"单位"`
+	Source                   int       `description:"来源"`
+	Frequency                string    `description:"频度"`
+	StartDate                time.Time `description:"开始时间"`
+	EndDate                  time.Time `description:"结束时间"`
+	Describe                 string    `description:"描述"`
+	Sort                     int       `description:"排序"`
+	IsStop                   int       `description:"是否停更:0-否;1-停更"`
+	EdbExist                 int       `description:"指标库是否已添加:0-否;1-是"`
+	TerminalCode             string    `description:"所属终端编码"`
+	FilePath                 string    `description:"文件存储路径"`
+	CreateTime               time.Time `description:"创建时间"`
+	ModifyTime               time.Time `description:"修改时间"`
+}
+
+func (m *BaseFromBloombergIndex) TableName() string {
+	return "base_from_bloomberg_index"
+}
+
+func (m *BaseFromBloombergIndex) Create() (err error) {
+	o := orm.NewOrm()
+	id, err := o.Insert(m)
+	if err != nil {
+		return
+	}
+	m.BaseFromBloombergIndexId = int(id)
+	return
+}
+
+func GetBaseFromBloombergIndexByCode(indexCode string) (item *BaseFromBloombergIndex, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_bloomberg_index WHERE index_code = ? LIMIT 1`
+	err = o.Raw(sql, indexCode).QueryRow(&item)
+	return
+}
+
+// BaseFromBloombergData Bloomberg原始数据
+type BaseFromBloombergData struct {
+	BaseFromBloombergDataId  int       `orm:"column(base_from_bloomberg_data_id);pk"`
+	BaseFromBloombergIndexId int       `description:"指标ID"`
+	IndexCode                string    `description:"指标编码"`
+	DataTime                 time.Time `description:"数据日期"`
+	Value                    float64   `description:"数据值"`
+	CreateTime               time.Time `description:"创建时间"`
+	ModifyTime               time.Time `description:"修改时间"`
+	DataTimestamp            int       `description:"数据日期时间戳"`
+}
+
+func (m *BaseFromBloombergData) TableName() string {
+	return "base_from_bloomberg_data"
+}
+
+func GetBaseFromBloombergDataByCondition(condition string, pars []interface{}) (items []*BaseFromBloombergData, err error) {
+	sub := `SELECT * FROM base_from_bloomberg_data WHERE 1=1  `
+	o := orm.NewOrm()
+	if condition != "" {
+		sub += condition
+	}
+	sql := `SELECT * FROM (` + sub + ` HAVING 1 ORDER BY modify_time DESC) tmp GROUP BY data_time ORDER BY data_time DESC `
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}
+
+func MultiInsertOrUpdateBaseFromBloombergData(inserts, updates []*BaseFromBloombergData) (err error) {
+	o := orm.NewOrm()
+	if len(inserts) > 0 {
+		_, e := o.InsertMulti(len(inserts), inserts)
+		if e != nil {
+			err = fmt.Errorf("insert multi err: %s", e.Error())
+			return
+		}
+	}
+	if len(updates) > 0 {
+		p, e := o.Raw("UPDATE base_from_bloomberg_data SET value = ?, modify_time = NOW() WHERE index_code = ? AND data_time = ?").Prepare()
+		if e != nil {
+			err = fmt.Errorf("prepare err: %s", e.Error())
+			return
+		}
+		defer func() {
+			_ = p.Close()
+		}()
+		for _, v := range updates {
+			_, e = p.Exec(v.Value, v.IndexCode, v.DataTime.Format(utils.FormatDate))
+			if e != nil {
+				err = fmt.Errorf("update err: %s", e.Error())
+				return
+			}
+		}
+	}
+	return
+}
+
+func GetBaseFromBloombergIndexMinMax(indexCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT MIN(data_time) AS min_date,MAX(data_time) AS max_date,MIN(value) AS min_value,MAX(value) AS max_value FROM base_from_bloomberg_data WHERE index_code = ? `
+	err = o.Raw(sql, indexCode).QueryRow(&item)
+	if err != nil {
+		return
+	}
+
+	// 获取最新值
+	var lastVal float64
+	sql = ` SELECT value AS latest_value FROM base_from_bloomberg_data WHERE index_code = ? ORDER BY data_time DESC LIMIT 1 `
+	err = o.Raw(sql, indexCode).QueryRow(&lastVal)
+	if err != nil {
+		return
+	}
+	item.LatestValue = lastVal
+	return
+}
+
+func ModifyBaseFromBloombergIndexMinMax(indexCode string, item *EdbInfoMaxAndMinInfo) (err error) {
+	o := orm.NewOrm()
+	sql := ` UPDATE base_from_bloomberg_index SET start_date = ?, end_date = ?, modify_time = NOW() WHERE index_code = ? `
+	_, err = o.Raw(sql, item.MinDate, item.MaxDate, indexCode).Exec()
+	return
+}
+
+type BloombergData struct {
+	InputValue float64 `orm:"column(value)" description:"值"`
+	DataTime   string  `orm:"column(data_time)" description:"日期"`
+}
+
+func GetBloombergDataByCondition(condition string, pars []interface{}) (item []*BloombergData, err error) {
+	sql1 := ` SELECT * FROM base_from_bloomberg_data WHERE 1=1  `
+	o := orm.NewOrm()
+	if condition != "" {
+		sql1 += condition
+	}
+	sql := `select * from (` + sql1 + ` having 1 order by modify_time DESC ) tmp GROUP BY data_time ORDER BY data_time DESC `
+	_, err = o.Raw(sql, pars).QueryRows(&item)
+	return
+}
+
+// AddEdbDataFromBloomberg 新增Bloomberg指标数据
+func AddEdbDataFromBloomberg(edbCode string) (err error) {
+	o := orm.NewOrm()
+
+	var condition string
+	var pars []interface{}
+
+	if edbCode != "" {
+		condition += " AND index_code = ? "
+		pars = append(pars, edbCode)
+	}
+
+	bloombergDataList, err := GetBloombergDataByCondition(condition, pars)
+	if err != nil {
+		return
+	}
+
+	dataLen := len(bloombergDataList)
+
+	existMap := make(map[string]string)
+	if dataLen > 0 {
+		var isAdd bool
+		addSql := ` INSERT INTO edb_data_bloomberg (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
+		for i := 0; i < dataLen; i++ {
+			item := bloombergDataList[i]
+			eDate := item.DataTime
+			sValue := utils.SubFloatToString(item.InputValue, 4)
+			if sValue != "" {
+				if _, ok := existMap[eDate]; !ok {
+					dataTime, err := time.ParseInLocation(utils.FormatDate, eDate, time.Local)
+					if err != nil {
+						return err
+					}
+					timestamp := dataTime.UnixNano() / 1e6
+					timeStr := fmt.Sprintf("%d", timestamp)
+					addSql += GetAddSql("0", edbCode, eDate, timeStr, sValue)
+					isAdd = true
+				}
+			}
+			existMap[eDate] = eDate
+		}
+		if isAdd {
+			addSql = strings.TrimRight(addSql, ",")
+			utils.FileLog.Info("addSql:" + addSql)
+			_, err = o.Raw(addSql).Exec()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return
+}
+
+// RefreshEdbDataFromBloomberg 刷新Bloomberg指标数据
+func RefreshEdbDataFromBloomberg(edbInfoId int, edbCode, startDate string) (err error) {
+	source := utils.DATA_SOURCE_BLOOMBERG
+	subSource := utils.DATA_SUB_SOURCE_EDB
+
+	o := orm.NewOrm()
+	if err != nil {
+		return
+	}
+	edbInfoIdStr := strconv.Itoa(edbInfoId)
+	//计算数据
+	var condition string
+	var pars []interface{}
+
+	if edbCode != "" {
+		condition += " AND index_code=? "
+		pars = append(pars, edbCode)
+	}
+
+	if startDate != "" {
+		condition += " AND data_time>=? "
+		pars = append(pars, startDate)
+	}
+
+	bloombergDataList, err := GetBloombergDataByCondition(condition, pars)
+	if err != nil {
+		return
+	}
+
+	// 真实数据的最大日期  , 插入规则配置的日期
+	var realDataMaxDate, edbDataInsertConfigDate time.Time
+	var edbDataInsertConfig *EdbDataInsertConfig
+	var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
+	{
+		edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			return
+		}
+		if edbDataInsertConfig != nil {
+			edbDataInsertConfigDate = edbDataInsertConfig.Date
+		}
+	}
+
+	var existCondition string
+	var existPars []interface{}
+
+	existCondition += " AND edb_info_id=? "
+	existPars = append(existPars, edbInfoId)
+	if startDate != "" {
+		existCondition += " AND data_time>=? "
+		existPars = append(existPars, startDate)
+	}
+	//获取指标所有数据
+	existList, err := GetEdbDataByCondition(source, subSource, existCondition, existPars)
+	if err != nil {
+		return err
+	}
+	existMap := make(map[string]*EdbInfoSearchData)
+	for _, v := range existList {
+		existMap[v.DataTime] = v
+	}
+
+	addSql := ` INSERT INTO edb_data_bloomberg(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
+	var isAdd bool
+	addMap := make(map[string]string)
+	for _, v := range bloombergDataList {
+		item := v
+		eDate := item.DataTime
+		sValue := utils.SubFloatToString(item.InputValue, 4)
+
+		dataTime, err := time.ParseInLocation(utils.FormatDate, eDate, time.Local)
+		if err != nil {
+			return err
+		}
+		if findItem, ok := existMap[v.DataTime]; !ok {
+			if sValue != "" {
+				timestamp := dataTime.UnixNano() / 1e6
+				timeStr := fmt.Sprintf("%d", timestamp)
+				saveValue := sValue
+
+				if _, addOk := addMap[eDate]; !addOk {
+					addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, saveValue)
+					isAdd = true
+				}
+			}
+		} else {
+			if findItem != nil && utils.SubFloatToString(findItem.Value, 4) != sValue {
+				err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, sValue)
+				if err != nil {
+					return err
+				}
+			}
+		}
+		addMap[v.DataTime] = v.DataTime
+
+		// 下面代码主要目的是处理掉手动插入的数据判断
+		{
+			if realDataMaxDate.IsZero() || dataTime.After(realDataMaxDate) {
+				realDataMaxDate = dataTime
+			}
+			if edbDataInsertConfigDate.IsZero() || dataTime.Equal(edbDataInsertConfigDate) {
+				isFindConfigDateRealData = true
+			}
+		}
+	}
+
+	// 处理手工数据补充的配置
+	HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
+
+	if isAdd {
+		addSql = strings.TrimRight(addSql, ",")
+		_, err = o.Raw(addSql).Exec()
+		if err != nil {
+			return err
+		}
+	}
+	return
+}
+
+// BridgePCSGBloombergResultData 中石油新加坡-指标接口响应体
+type BridgePCSGBloombergResultData struct {
+	Code int                                `json:"code" description:"状态码"`
+	Msg  string                             `json:"msg" description:"提示信息"`
+	Data []BaseFromBloombergApiIndexAndData `json:"data" description:"返回数据"`
+}
+
+// BaseFromBloombergApiIndexAndData Bloomberg原始指标及数据
+type BaseFromBloombergApiIndexAndData struct {
+	BaseFromBloombergIndexId int                             `description:"指标ID"`
+	IndexCode                string                          `description:"指标编码"`
+	IndexName                string                          `description:"指标名称"`
+	Unit                     string                          `description:"单位"`
+	Source                   string                          `description:"来源"`
+	Frequency                string                          `description:"频度"`
+	CreateTime               time.Time                       `description:"创建时间"`
+	ModifyTime               time.Time                       `description:"修改时间"`
+	Data                     []BaseFromBloombergApiIndexData `description:"数据列表"`
+}
+
+// BaseFromBloombergApiIndexData Bloomberg原始指标数据
+type BaseFromBloombergApiIndexData struct {
+	DataTime time.Time `description:"数据日期"`
+	Value    float64   `description:"数据值"`
+}

+ 2 - 0
models/db.go

@@ -129,6 +129,8 @@ func initBaseIndex() {
 		new(BaseFromMtjhIndex),
 		new(BaseFromFenweiIndex),
 		new(BaseFromFenweiData),
+		new(BaseFromBloombergIndex),
+		new(BaseFromBloombergData),
 	)
 }
 

+ 45 - 0
routers/commentsRouter.go

@@ -79,6 +79,51 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "Add",
+            Router: `/add`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "PCSGRefreshDaily",
+            Router: `/pcsg/refresh_daily`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "PCSGRefreshMonthly",
+            Router: `/pcsg/refresh_monthly`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "PCSGRefreshWeekly",
+            Router: `/pcsg/refresh_weekly`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
+    beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:BloombergController"],
+        beego.ControllerComments{
+            Method: "Refresh",
+            Router: `/refresh`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_index_lib/controllers:CalculateController"] = append(beego.GlobalControllerRouter["eta/eta_index_lib/controllers:CalculateController"],
         beego.ControllerComments{
             Method: "Add",

+ 5 - 0
routers/router.go

@@ -244,6 +244,11 @@ func init() {
 				&controllers.EdbRefreshController{},
 			),
 		),
+		beego.NSNamespace("/bloomberg",
+			beego.NSInclude(
+				&controllers.BloombergController{},
+			),
+		),
 	)
 	beego.AddNamespace(ns)
 }

+ 313 - 0
services/base_from_pcsg.go

@@ -0,0 +1,313 @@
+package services
+
+import (
+	"encoding/json"
+	"eta/eta_index_lib/logic"
+	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/services/alarm_msg"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"time"
+)
+
+var (
+	BridgeApiPCSGBloombergDailyUrl   = "/api/pcsg/bloomberg/daily_index"   // 日度指标API
+	BridgeApiPCSGBloombergWeeklyUrl  = "/api/pcsg/bloomberg/weekly_index"  // 周度指标API
+	BridgeApiPCSGBloombergMonthlyUrl = "/api/pcsg/bloomberg/monthly_index" // 月度指标API
+)
+
+// GetPCSGBloombergDailyFromBridge 获取彭博日度指标
+func GetPCSGBloombergDailyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("GetPCSGBloombergDailyFromBridge-获取彭博日度指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergDailyUrl)
+	body := ioutil.NopCloser(strings.NewReader(""))
+	client := &http.Client{}
+	req, e := http.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.BridgePCSGBloombergResultData)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+	indexes = result.Data
+	return
+}
+
+// GetPCSGBloombergWeeklyFromBridge 获取彭博周度指标
+func GetPCSGBloombergWeeklyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("GetPCSGBloombergWeeklyFromBridge-获取彭博周度指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergWeeklyUrl)
+	body := ioutil.NopCloser(strings.NewReader(""))
+	client := &http.Client{}
+	req, e := http.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.BridgePCSGBloombergResultData)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+	indexes = result.Data
+	return
+}
+
+// GetPCSGBloombergMonthlyFromBridge 获取彭博月度指标
+func GetPCSGBloombergMonthlyFromBridge() (indexes []models.BaseFromBloombergApiIndexAndData, err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("GetPCSGBloombergMonthlyFromBridge-获取彭博月度指标失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	url := fmt.Sprint(utils.EtaBridgeUrl, BridgeApiPCSGBloombergMonthlyUrl)
+	body := ioutil.NopCloser(strings.NewReader(""))
+	client := &http.Client{}
+	req, e := http.NewRequest("POST", url, body)
+	if e != nil {
+		err = fmt.Errorf("http create request err: %s", e.Error())
+		return
+	}
+
+	checkToken := utils.MD5(utils.EtaBridgeAppNameEn + utils.EtaBridgeMd5Key)
+	contentType := "application/json;charset=utf-8"
+	req.Header.Set("Content-Type", contentType)
+	req.Header.Set("Authorization", checkToken)
+	resp, e := client.Do(req)
+	if e != nil {
+		err = fmt.Errorf("http client do err: %s", e.Error())
+		return
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+	b, e := ioutil.ReadAll(resp.Body)
+	if e != nil {
+		err = fmt.Errorf("resp body read err: %s", e.Error())
+		return
+	}
+	if len(b) == 0 {
+		err = fmt.Errorf("resp body is empty")
+		return
+	}
+	// 生产环境解密
+	if utils.RunMode == "release" {
+		str := string(b)
+		str = strings.Trim(str, `"`)
+		b = utils.DesBase64Decrypt([]byte(str), utils.EtaBridgeDesKey)
+	}
+
+	result := new(models.BridgePCSGBloombergResultData)
+	if e = json.Unmarshal(b, &result); e != nil {
+		err = fmt.Errorf("result unmarshal err: %s\nresult: %s", e.Error(), string(b))
+		return
+	}
+	if result.Code != 200 {
+		err = fmt.Errorf("result: %s", string(b))
+		return
+	}
+	indexes = result.Data
+	return
+}
+
+// PCSGWrite2BaseBloomberg 写入彭博数据源
+func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	for _, v := range indexes {
+		if v.IndexCode == "" {
+			continue
+		}
+
+		// 指标是否存在
+		index, e := models.GetBaseFromBloombergIndexByCode(v.IndexCode)
+		if e != nil && e.Error() != utils.ErrNoRow() {
+			err = fmt.Errorf("获取Bloomberg原始指标失败, err: %s", e.Error())
+			return
+		}
+
+		// 新增指标
+		if index == nil {
+			newIndex := new(models.BaseFromBloombergIndex)
+			newIndex.IndexCode = v.IndexCode
+			newIndex.IndexName = v.IndexName
+			newIndex.Unit = v.Unit
+			newIndex.Source = utils.DATA_SOURCE_BLOOMBERG
+			newIndex.Frequency = v.Frequency
+			newIndex.CreateTime = time.Now().Local()
+			newIndex.ModifyTime = time.Now().Local()
+			if e = newIndex.Create(); e != nil {
+				err = fmt.Errorf("新增Bloomberg原始指标失败, err: %s", e.Error())
+				return
+			}
+			index = newIndex
+		}
+
+		// 更新指标数据
+		var cond string
+		var pars []interface{}
+		cond += ` AND index_code = ? `
+		pars = append(pars, v.IndexCode)
+		indexData, e := models.GetBaseFromBloombergDataByCondition(cond, pars)
+		if e != nil {
+			err = fmt.Errorf("获取Bloomberg历史数据失败, err: %s", e.Error())
+			return
+		}
+		dateExist := make(map[string]*models.BaseFromBloombergData)
+		if len(indexData) > 0 {
+			for _, d := range indexData {
+				strDate := d.DataTime.Format(utils.FormatDate)
+				dateExist[strDate] = d
+			}
+		}
+
+		// 筛选新增/更新数据
+		updateData := make([]*models.BaseFromBloombergData, 0)
+		insertData := make([]*models.BaseFromBloombergData, 0)
+		for _, d := range v.Data {
+			strDate := d.DataTime.Format(utils.FormatDate)
+			originData := dateExist[strDate]
+			if originData != nil {
+				if utils.FloatAlmostEqual(originData.Value, d.Value) {
+					continue
+				}
+				originData.Value = d.Value
+				originData.ModifyTime = time.Now().Local()
+				updateData = append(updateData, originData)
+			} else {
+				newData := new(models.BaseFromBloombergData)
+				newData.BaseFromBloombergIndexId = index.BaseFromBloombergIndexId
+				newData.IndexCode = index.IndexCode
+				newData.DataTime = d.DataTime
+				newData.Value = d.Value
+				newData.CreateTime = time.Now()
+				newData.ModifyTime = time.Now()
+				timestamp := d.DataTime.UnixNano() / 1e6
+				newData.DataTimestamp = int(timestamp)
+				insertData = append(insertData, newData)
+			}
+		}
+		if e = models.MultiInsertOrUpdateBaseFromBloombergData(insertData, updateData); e != nil {
+			err = fmt.Errorf("新增/更新Bloomberg指标数据失败, err: %s", e.Error())
+			return
+		}
+
+		// 更新指标开始结束时间
+		minMax, e := models.GetBaseFromBloombergIndexMinMax(index.IndexCode)
+		if e == nil && minMax != nil {
+			e = models.ModifyBaseFromBloombergIndexMinMax(index.IndexCode, minMax)
+			if e != nil {
+				err = fmt.Errorf("更新Bloomberg开始结束时间失败, err: %s", e.Error())
+				return
+			}
+		}
+
+		// 同步刷新指标库
+		go func() {
+			edb, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BLOOMBERG, index.IndexCode)
+			if e != nil && e.Error() != utils.ErrNoRow() {
+				utils.FileLog.Info("获取Bloomberg指标库信息失败, err: " + e.Error())
+				return
+			}
+			if edb != nil {
+				logic.RefreshBaseEdbInfo(edb, ``)
+			}
+		}()
+	}
+	return
+}

+ 6 - 0
utils/common.go

@@ -1218,3 +1218,9 @@ func DealExcelDate(date string) (newDate time.Time, err error) {
 	}
 	return
 }
+
+// FloatAlmostEqual 判断两个浮点数是否相等
+func FloatAlmostEqual(a, b float64) bool {
+	epsilon := 1e-9 // 容差值
+	return math.Abs(a-b) <= epsilon
+}

+ 1 - 0
utils/constants.go

@@ -108,6 +108,7 @@ const (
 	DATA_SOURCE_MTJH          = 80 // 煤炭江湖->80
 	DATA_SOURCE_CALCULATE_SUM = 81
 	DATA_SOURCE_CALCULATE_AVG = 82
+	DATA_SOURCE_BLOOMBERG     = 83 // bloomberg彭博数据
 )
 
 // 指标来源的中文展示