Browse Source

Merge branch 'feature/jiayue_index'

hsun 1 year ago
parent
commit
96b20e372e

+ 1 - 1
.gitignore

@@ -1,6 +1,6 @@
 /.idea
 /eta_task.exe
-/conf/app.conf
+/conf
 /binlog
 /rdlucklog
 /etalogs

+ 22 - 6
models/data_manage/edb_data_base.go

@@ -1,14 +1,32 @@
 package data_manage
 
 import (
-	"errors"
 	"eta/eta_task/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
-	"strconv"
 	"time"
 )
 
+var (
+	EdbDataTableNameMap     map[int]string // 指标来源对应数据表名
+	EdbDataRefreshMethodMap map[int]string // 指标来源对应的刷新指标方法
+)
+
+// InitEdbSourceVar 初始化时加载指标来源对应信息, 避免循环中查库, 注意edb_source表修改table_name的话需要重启服务
+func InitEdbSourceVar() {
+	EdbDataTableNameMap = make(map[int]string)
+	EdbDataRefreshMethodMap = make(map[int]string)
+	sources, e := GetEdbSourceItemsByCondition(``, make([]interface{}, 0), []string{}, "")
+	if e != nil {
+		utils.FileLog.Info("init source table err: %s", e.Error())
+		return
+	}
+	for _, v := range sources {
+		EdbDataTableNameMap[v.EdbSourceId] = v.TableName
+		EdbDataRefreshMethodMap[v.EdbSourceId] = v.EdbRefreshMethod
+	}
+}
+
 func GetEdbDataTableName(source int) (tableName string) {
 	switch source {
 	case utils.DATA_SOURCE_THS:
@@ -96,8 +114,7 @@ func GetEdbDataTableName(source int) (tableName string) {
 	case utils.DATA_SOURCE_PREDICT_CALCULATE_BP:
 		tableName = "edb_data_predict_calculate_bp"
 	default:
-		tableName = ""
-		errors.New("无效的渠道:" + strconv.Itoa(source))
+		tableName = EdbDataTableNameMap[source] // 没有对应的从edb_source中取
 		return
 	}
 	return
@@ -122,8 +139,7 @@ func GetEdbInfoCalculateTableName(source int) (tableName string) {
 	case utils.DATA_SOURCE_CALCULATE_BP:
 		tableName = "edb_info_calculate_bp"
 	default:
-		tableName = ""
-		errors.New("无效的渠道:" + strconv.Itoa(source))
+		tableName = EdbDataTableNameMap[source] // 没有对应的从edb_source中取
 		return
 	}
 	return

+ 44 - 0
models/data_manage/edb_source.go

@@ -0,0 +1,44 @@
+package data_manage
+
+import (
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"strings"
+)
+
+// EdbSource 指标来源表
+type EdbSource struct {
+	EdbSourceId      int    `orm:"column(edb_source_id);pk"`
+	SourceName       string `description:"指标来源名称"`
+	TableName        string `description:"数据表名"`
+	EdbAddMethod     string `description:"指标新增接口"`
+	EdbRefreshMethod string `description:"指标刷新接口"`
+	IsBase           int    `description:"是否为基础指标: 0-否; 1-是"`
+	FromBridge       int    `description:"是否来源于桥接服务: 0-否; 1-是"`
+	BridgeFlag       string `description:"桥接服务对象标识"`
+	SourceExtend     string `description:"扩展字段做查询用"`
+}
+
+// GetEdbSourceItemsByCondition 获取指标来源列表
+func GetEdbSourceItemsByCondition(condition string, pars []interface{}, fieldArr []string, orderRule string) (items []*EdbSource, err error) {
+	o := orm.NewOrmUsingDB("data")
+	fields := strings.Join(fieldArr, ",")
+	if len(fieldArr) == 0 {
+		fields = `*`
+	}
+	order := `ORDER BY edb_source_id ASC`
+	if orderRule != "" {
+		order = ` ORDER BY ` + orderRule
+	}
+	sql := fmt.Sprintf(`SELECT %s FROM edb_source WHERE 1=1 %s %s`, fields, condition, order)
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}
+
+// GetEdbSourceItemByCondition 获取指标来源
+func GetEdbSourceItemByCondition(condition string, pars []interface{}) (item *EdbSource, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := fmt.Sprintf(`SELECT * FROM edb_source WHERE 1=1 %s`, condition)
+	err = o.Raw(sql, pars).QueryRow(&item)
+	return
+}

+ 8 - 7
models/db.go

@@ -19,7 +19,7 @@ func init() {
 	db, _ := orm.GetDB("default")
 	db.SetConnMaxLifetime(10 * time.Minute)
 
-	if utils.MYSQL_URL_RDDP != ""{
+	if utils.MYSQL_URL_RDDP != "" {
 		_ = orm.RegisterDataBase("rddp", "mysql", utils.MYSQL_URL_RDDP)
 		orm.SetMaxIdleConns("rddp", 50)
 		orm.SetMaxOpenConns("rddp", 100)
@@ -28,7 +28,7 @@ func init() {
 		report_db.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-	if utils.MYSQL_URL_EDB != ""{
+	if utils.MYSQL_URL_EDB != "" {
 		_ = orm.RegisterDataBase("edb", "mysql", utils.MYSQL_URL_EDB)
 		orm.SetMaxIdleConns("edb", 50)
 		orm.SetMaxOpenConns("edb", 100)
@@ -37,8 +37,7 @@ func init() {
 		edb_db.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-
-	if utils.MYSQL_URL_DATA != ""{
+	if utils.MYSQL_URL_DATA != "" {
 		_ = orm.RegisterDataBase("data", "mysql", utils.MYSQL_URL_DATA)
 		orm.SetMaxIdleConns("data", 50)
 		orm.SetMaxOpenConns("data", 100)
@@ -48,7 +47,7 @@ func init() {
 
 	}
 
-	if utils.MYSQL_URL_GL != ""{
+	if utils.MYSQL_URL_GL != "" {
 		_ = orm.RegisterDataBase("gl", "mysql", utils.MYSQL_URL_GL)
 		orm.SetMaxIdleConns("gl", 50)
 		orm.SetMaxOpenConns("gl", 100)
@@ -57,7 +56,7 @@ func init() {
 		gl.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-	if utils.MYSQL_URL_ETA != ""{
+	if utils.MYSQL_URL_ETA != "" {
 		_ = orm.RegisterDataBase("eta", "mysql", utils.MYSQL_URL_ETA)
 		orm.SetMaxIdleConns("eta", 50)
 		orm.SetMaxOpenConns("eta", 100)
@@ -66,7 +65,6 @@ func init() {
 		etaDb.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-
 	orm.Debug = true
 	orm.DebugLog = orm.NewLog(utils.Binlog)
 
@@ -81,6 +79,9 @@ func init() {
 
 	// 智能研报数据表
 	initSmartReport()
+
+	// 初始化部分数据表变量(直接init会有顺序问题=_=!)
+	data_manage.InitEdbSourceVar()
 }
 
 // initEdbDataTable 注册Edb指标 数据表

+ 7 - 0
services/data/base_edb_lib.go

@@ -3,6 +3,7 @@ package data
 import (
 	"encoding/json"
 	"eta/eta_task/models"
+	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
 	"fmt"
 	"io/ioutil"
@@ -16,6 +17,7 @@ func RefreshEdbData(edbInfoId, source int, edbCode, startDate string) (resp *mod
 	param["EdbCode"] = edbCode
 	param["EdbInfoId"] = edbInfoId
 	param["StartDate"] = startDate
+	param["Source"] = source
 	urlStr := ``
 	switch source {
 	case utils.DATA_SOURCE_THS:
@@ -66,6 +68,8 @@ func RefreshEdbData(edbInfoId, source int, edbCode, startDate string) (resp *mod
 		urlStr = "baiinfo/refresh"
 	case utils.DATA_SOURCE_NATIONAL_STATISTICS:
 		urlStr = "national_statistics/refresh"
+	default:
+		urlStr = data_manage.EdbDataRefreshMethodMap[source] // 没有对应的从edb_source中取
 	}
 	if urlStr == "" {
 		err = fmt.Errorf(fmt.Sprint("source:", source, ";未实现该指标的刷新接口,请联系管理员"))
@@ -127,6 +131,9 @@ func HttpPost(url, postData string, params ...string) ([]byte, error) {
 	req.Header.Set("Content-Type", contentType)
 	req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY))
 	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
 	defer resp.Body.Close()
 	b, err := ioutil.ReadAll(resp.Body)
 	fmt.Println("HttpPost:" + string(b))

+ 32 - 0
services/data/base_from_jiayue.go

@@ -0,0 +1,32 @@
+package data
+
+import (
+	"context"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+)
+
+// SyncJiaYueNewIndex 同步嘉悦物产增量指标-每分钟
+func SyncJiaYueNewIndex(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			tips := "SyncJiaYueNewIndex-同步嘉悦物产增量指标失败, ErrMsg:\n" + err.Error()
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	param := make(map[string]interface{})
+	uri := "jiayue_index/sync_new_index"
+	res, e := postRefreshEdbData(param, uri)
+	if e != nil {
+		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
+		return
+	}
+	if res != nil && res.Ret != 200 {
+		err = fmt.Errorf("postRefreshEdbData fail")
+		return
+	}
+	return
+}

+ 69 - 0
services/data/edb_info.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_task/services/alarm_msg"
 	"eta/eta_task/utils"
 	"fmt"
+	"strconv"
 	"strings"
 	"sync"
 )
@@ -1028,3 +1029,71 @@ func RefreshDataFromEic(wg *sync.WaitGroup) (err error) {
 	}
 	return err
 }
+
+// RefreshDataFromBridge 刷新来自桥接服务的来源
+func RefreshDataFromBridge(wg *sync.WaitGroup) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info("RefreshDataFromBridge ErrMsg: %s", err.Error())
+			go alarm_msg.SendAlarmMsg("RefreshDataFromBridge ErrMsg: "+err.Error(), 3)
+		}
+		wg.Done()
+	}()
+
+	// 获取来自桥接服务的指标来源
+	cond := ` AND from_bridge = 1`
+	pars := make([]interface{}, 0)
+	sources, e := data_manage.GetEdbSourceItemsByCondition(cond, pars, []string{}, "")
+	if e != nil {
+		err = fmt.Errorf("获取来自桥接服务的数据源失败, err: %s", e.Error())
+		return
+	}
+	sourceArr := make([]string, 0)
+	for _, v := range sources {
+		if v.EdbSourceId <= 0 {
+			continue
+		}
+		sourceArr = append(sourceArr, strconv.Itoa(v.EdbSourceId))
+	}
+	if len(sourceArr) == 0 {
+		utils.FileLog.Info("RefreshDataFromBridge无来源")
+		return
+	}
+
+	// 获取指标
+	edbCond := fmt.Sprintf(` AND source IN (%s)`, utils.GetOrmInReplace(len(sourceArr)))
+	edbPars := make([]interface{}, 0)
+	edbPars = append(edbPars, sourceArr)
+	items, e := data_manage.GetEdbInfoByCondition(edbCond, edbPars, 0)
+	if e != nil {
+		err = fmt.Errorf("GetEdbInfoByCondition err: %s", e.Error())
+		return
+	}
+
+	for _, v := range items {
+		startDate := ""
+		if v.Frequency == "日度" {
+			startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+		} else if v.Frequency == "周度" {
+			startDate = v.EndDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7)).Format(utils.FormatDate)
+		} else if v.Frequency == "月度" {
+			startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "季度" {
+			startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH*3, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "年度" {
+			startDate = v.EndDate.AddDate(-utils.DATA_REFRESH, 0, 0).Format(utils.FormatDate)
+		} else {
+			startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+		}
+		resp, e := RefreshEdbData(v.EdbInfoId, v.Source, v.EdbCode, startDate)
+		if e != nil {
+			err = fmt.Errorf("RefreshEdbData err: %s", e.Error())
+			return
+		}
+		if resp.Ret != 200 {
+			err = fmt.Errorf("RefreshEdbData Err: %s; ErrMsg: %s", resp.Msg, resp.ErrMsg)
+			return
+		}
+	}
+	return
+}

+ 11 - 1
services/task.go

@@ -29,7 +29,7 @@ func Task() {
 // 生产环境需要走的任务
 func releaseTask() {
 	//同步指标
-	if utils.BusinessCode != "E2023080900" {
+	if utils.BusinessCode != utils.BusinessCodeRelease {
 		syncHzDataIndex := task.NewTask("syncHzDataIndex", "0 10,20,40,50 16,18 * * *", SyncHzDataIndex)
 		task.AddTask("syncHzDataIndex", syncHzDataIndex)
 	}
@@ -87,6 +87,12 @@ func releaseTask() {
 	// 每天清理三个月前的用户操作日志
 	clearAdminOperateLog := task.NewTask("clearAdminOperateLog", "0 20 23 * * *", ClearAdminOperateLog)
 	task.AddTask("定时清理用户操作日志", clearAdminOperateLog)
+
+	// 嘉悦物产-每分钟定时同步增量指标
+	if utils.BusinessCode == utils.BusinessCodeJiaYue {
+		syncJiaYueNewIndex := task.NewTask("syncJiaYueNewIndex", "0 */1 * * * *", data.SyncJiaYueNewIndex)
+		task.AddTask("定时同步嘉悦物产增量指标", syncJiaYueNewIndex)
+	}
 }
 
 func RefreshData(cont context.Context) (err error) {
@@ -127,6 +133,10 @@ func RefreshData(cont context.Context) (err error) {
 	//国家统计局指标
 	go data.RefreshDataFromNationalStatistics(&wg)
 
+	// 来自桥接服务的指标
+	wg.Add(1)
+	go data.RefreshDataFromBridge(&wg)
+
 	wg.Wait()
 	////计算指标
 	data.RefreshDataFromCalculateAll()

+ 6 - 0
utils/constants.go

@@ -121,3 +121,9 @@ const (
 const (
 	CACHE_CREATE_REPORT_IMGPDF_QUEUE = "eta_report:report_img_pdf_queue" // 生成报告长图PDF队列
 )
+
+// 商户号
+const (
+	BusinessCodeRelease = "E2023080900" // 生产环境
+	BusinessCodeJiaYue  = "E2023092201" // 嘉悦物产
+)