Просмотр исходного кода

Merge branch 'feature/eta_2.5.6_pb_data_deal' into debug

kobe6258 1 месяц назад
Родитель
Сommit
fb2001b0ad

+ 6 - 0
global/initDb.go

@@ -47,6 +47,12 @@ func InitDb() {
 	connectDb(utils.MYSQL_URL_DATA, utils.DbNameIndex, newLogger, dbMap, false)
 	// 钢联库
 	connectDb(utils.MYSQL_URL_GL, utils.DbNameGL, newLogger, dbMap, false)
+
+	// AI库
+	if utils.MYSQL_URL_AI != `` {
+		connectDb(utils.MYSQL_URL_AI, utils.DbNameAI, newLogger, dbMap, false)
+	}
+
 	// master库
 	connectDb(utils.MYSQL_URL_ETA, utils.DbNameMaster, newLogger, dbMap, false)
 	// 用户主库

+ 7 - 0
models/data_manage/edb_info.go

@@ -398,6 +398,13 @@ func ModifyEdbUpdateStatus(edbIdList []int, indexCodeList []string, calculateEdb
 	}
 	return
 }
+func StopRefreshCalculateEdbIndex(edbInfoId int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	// 更改指标的更新状态
+	sql := ` UPDATE edb_info SET no_update = 1,set_update_time=? WHERE edb_type =2 AND edb_info_id = ? AND  no_update = 0`
+	err = o.Exec(sql, time.Now(), edbInfoId).Error
+	return
+}
 
 // GetEdbInfoByIdList 根据指标id集合 获取 指标列表
 func GetEdbInfoByIdList(edbInfoIdList []int) (items []*EdbInfo, err error) {

+ 18 - 3
models/data_manage/trade_position_analysis.go

@@ -218,14 +218,14 @@ func DeletePositionTopByDataTimeClassify(exchange string, dataTime string, dealT
 	return
 }
 
-func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []TradePositionTop, err error) {
+func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []*TradePositionTop, err error) {
 	o := global.DbMap[utils.DbNameIndex]
 	sql := "select * from trade_position_" + exchange + "_top WHERE data_time=? and deal_type=?"
 	err = o.Raw(sql, dataTime, dealType).Find(&list).Error
 	return
 }
 
-func GetTradePositionTopByExchangeDataTimeTypeClassify(exchange string, dataTime string, dealType int, classifyNames, classifyTypes []string) (list []TradePositionTop, err error) {
+func GetTradePositionTopByExchangeDataTimeTypeClassify(exchange string, dataTime string, dealType int, classifyNames, classifyTypes []string) (list []*TradePositionTop, err error) {
 	o := global.DbMap[utils.DbNameIndex]
 	sql := `select * from trade_position_` + exchange + `_top WHERE data_time=? and deal_type=? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)`
 	err = o.Raw(sql, dataTime, dealType, classifyNames, classifyTypes).Find(&list).Error
@@ -267,7 +267,18 @@ func GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err er
 	o := global.DbMap[utils.DbNameIndex]
 	sql := `SELECT DISTINCT data_time FROM base_from_trade_%s_index ORDER BY data_time ASC`
 	sql = fmt.Sprintf(sql, exchange)
-	err = o.Raw(sql).Scan(&dates).Error
+	var originDates []string
+	err = o.Raw(sql).Scan(&originDates).Error
+	if err != nil {
+		return
+	}
+	for _, v := range originDates {
+		if v == "" {
+			continue
+		}
+		v = utils.GormDateStrToDateStr(v)
+		dates = append(dates, v)
+	}
 	return
 }
 
@@ -441,6 +452,10 @@ func GetFirstBaseFromTradeIndexByDate(exchange string) (item *GetFirstBaseFromTr
 	o := global.DbMap[utils.DbNameIndex]
 	sql := "SELECT * FROM base_from_trade_" + exchange + "_index where rank < 50 order by data_time asc"
 	err = o.Raw(sql).First(&item).Error
+	if err != nil {
+		return
+	}
+	item.DataTime = utils.GormDateStrToDateStr(item.DataTime)
 	return
 }
 

+ 14 - 2
models/data_manage/trade_position_analysis_guangzhou.go

@@ -21,7 +21,7 @@ FROM
 WHERE
 	a.data_time between ? and ?
 	and c.base_from_trade_guangzhou_classify_id in (7,8)
-	and c.index_name like '%持单量%'
+	and c.index_name like '%持单量%'
 	and c.index_name not like '%日成交持仓排名%'`
 	err = o.Exec(sql1, now, now, startDate, endDate).Error
 	if err != nil {
@@ -118,7 +118,15 @@ WHERE
 	AND c.index_name NOT LIKE '%日成交持仓排名%' 
 ORDER BY
 	a.data_time asc`
-	err = o.Raw(sql).Scan(&dates).Error
+	var originDates []string
+	err = o.Raw(sql).Scan(&originDates).Error
+	if err != nil {
+		return
+	}
+	for _, v := range originDates {
+		v = utils.GormDateStrToDateStr(v)
+		dates = append(dates, v)
+	}
 	return
 }
 
@@ -176,6 +184,10 @@ WHERE
 ORDER BY
 	a.data_time asc`
 	err = o.Raw(sql).First(&item).Error
+	if err != nil {
+		return
+	}
+	item.DataTime = utils.GormDateStrToDateStr(item.DataTime)
 	return
 }
 

+ 130 - 0
models/rag/wechat_platform.go

@@ -0,0 +1,130 @@
+package rag
+
+import (
+	"database/sql"
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+type WechatPlatform struct {
+	WechatPlatformId int       `gorm:"column:wechat_platform_id;type:int(10) UNSIGNED;primaryKey;not null;" description:"wechat_platform_id"`
+	FakeId           string    `gorm:"column:fake_id;type:varchar(255);comment:公众号唯一id;" description:"fake_id"`                             // 公众号唯一id
+	Nickname         string    `gorm:"column:nickname;type:varchar(255);comment:公众号名称;" description:"nickname"`                             // 公众号名称
+	Alias            string    `gorm:"column:alias;type:varchar(255);comment:别名;" description:"alias"`                                      // 别名
+	RoundHeadImg     string    `gorm:"column:round_head_img;type:varchar(255);comment:头像;" description:"round_head_img"`                    // 头像
+	ServiceType      int       `gorm:"column:service_type;type:int(11);comment:类型;default:0;" description:"service_type"`                   // 类型
+	Signature        string    `gorm:"column:signature;type:varchar(255);comment:签名;" description:"signature"`                              // 签名
+	Verified         int       `gorm:"column:verified;type:int(11);comment:是否认证,0:未认证,1:已认证;这个我不确定,再核实下;default:0;" description:"verified"` // 是否认证,0:未认证,1:已认证;这个我不确定,再核实下
+	ArticleLink      string    `gorm:"column:article_link;type:varchar(255);comment:添加公众时的文章链接;" description:"article_link"`                // 添加公众时的文章链接
+	Enabled          int       `gorm:"column:enabled;type:tinyint(9);comment:是否启用,0:禁用,1:启用;default:1;" description:"enabled"`              // 是否启用,0:禁用,1:启用
+	SysUserId        int       `gorm:"column:sys_user_id;type:int(9) UNSIGNED;comment:用户id;default:0;" description:"sys_user_id"`           // 用户id
+	ModifyTime       time.Time `gorm:"column:modify_time;type:datetime;comment:最后一次修改时间;default:NULL;" description:"modify_time"`           // 最后一次修改时间
+	CreateTime       time.Time `gorm:"column:create_time;type:datetime;comment:添加时间;default:NULL;" description:"create_time"`               // 添加时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *WechatPlatform) TableName() string {
+	return "wechat_platform"
+}
+
+// WechatPlatformColumns get sql column name.获取数据库列名
+var WechatPlatformColumns = struct {
+	WechatPlatformID string
+	FakeID           string
+	Nickname         string
+	Alias            string
+	RoundHeadImg     string
+	ServiceType      string
+	Signature        string
+	Verified         string
+	ArticleLink      string
+	Enabled          string
+	SysUserID        string
+	ModifyTime       string
+	CreateTime       string
+}{
+	WechatPlatformID: "wechat_platform_id",
+	FakeID:           "fake_id",
+	Nickname:         "nickname",
+	Alias:            "alias",
+	RoundHeadImg:     "round_head_img",
+	ServiceType:      "service_type",
+	Signature:        "signature",
+	Verified:         "verified",
+	ArticleLink:      "article_link",
+	Enabled:          "enabled",
+	SysUserID:        "sys_user_id",
+	ModifyTime:       "modify_time",
+	CreateTime:       "create_time",
+}
+
+func (m *WechatPlatform) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *WechatPlatform) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *WechatPlatform) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *WechatPlatform) GetById(wechatPlatformId int) (item *WechatPlatform, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", WechatPlatformColumns.WechatPlatformID), wechatPlatformId).First(&item).Error
+
+	return
+}
+
+func (m *WechatPlatform) GetByCondition(condition string, pars []interface{}) (item *WechatPlatform, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *WechatPlatform) GetListByCondition(condition string, pars []interface{}, startSize, pageSize int) (items []*WechatPlatform, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s LIMIT ?,?`, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *WechatPlatform) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+func (m *WechatPlatform) GetPageListByCondition(condition string, pars []interface{}, startSize, pageSize int) (total int, items []*WechatPlatform, err error) {
+
+	total, err = m.GetCountByCondition(condition, pars)
+	if err != nil {
+		return
+	}
+	if total > 0 {
+		items, err = m.GetListByCondition(condition, pars, startSize, pageSize)
+	}
+
+	return
+}
+
+func (m *WechatPlatform) GetByFakeID(fakeId string) (item *WechatPlatform, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", WechatPlatformColumns.FakeID), fakeId).First(&item).Error
+
+	return
+}

+ 16 - 1
services/data/edb_info.go

@@ -297,7 +297,22 @@ func RefreshDataFromCalculateAll() (err error) {
 			continue
 		}
 		fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, source)
-
+		//对依赖指标是基础指标的且基础指标停用,停用当前计算指标,因为item是按照edb_Info_id升序,所以默认层级关系是由低而上,所以不需要考虑嵌套,只要考虑当前的下一级依赖即可
+		mappings, err := data_manage.GetEdbInfoCalculateMappingListByEdbInfoId(v.EdbInfoId)
+		if err != nil {
+			utils.FileLog.Error("[获取计算指标的依赖指标失败,不刷新指标,CODE:" + v.EdbCode + "],err:" + err.Error())
+			continue
+		}
+		for _, mapping := range mappings {
+			if mapping.FromEdbInfoType == 0 && mapping.FromEdbType == 1 && mapping.FromEdbInfoId > 0 && mapping.NoUpdate == 1 {
+				utils.FileLog.Info("基础指标停用,基础指标ID:" + mapping.FromEdbCode + ",停用当前计算指标,CODE:" + v.EdbCode)
+				stopErr := data_manage.StopRefreshCalculateEdbIndex(v.EdbInfoId)
+				if stopErr != nil {
+					utils.FileLog.Error(fmt.Sprintf("停用计算指标失败,ID:%d;指标编码:%s;err:%s", mapping.EdbInfoId, mapping.EdbCode, stopErr.Error()))
+				}
+				continue
+			}
+		}
 		fmt.Println("RefreshEdbCalculateData", v.EdbInfoId, v.EdbCode, startDate)
 		result, tmpErr := RefreshEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
 		if tmpErr != nil {

+ 62 - 5
services/edb_refresh.go

@@ -362,6 +362,61 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 	fmt.Println("Get ConfigRefreshData End")
 	return
 }
+func needForUpdate(date time.Time, frequency string) bool {
+	//如果当前已经更新最新的数据则停止更新
+	today := time.Now()
+	switch frequency {
+	case "日度":
+		return !date.Equal(today)
+	case "周度":
+		// 获取本周的开始日期(周一)
+		startOfWeek := today.AddDate(0, 0, int(time.Monday-today.Weekday()))
+		return date.Before(startOfWeek)
+	case "旬度":
+		day := today.Day()
+		var beginOfTenDays time.Time
+		if day <= 10 {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		} else if day <= 20 {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 11, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		} else {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 21, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		}
+	case "月度":
+		beginOfMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.Local)
+		return date.Before(beginOfMonth)
+	case "季度":
+		month := today.Month()
+		var beginOfQuarter time.Time
+		if month <= 3 {
+			beginOfQuarter = time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		} else if month <= 6 {
+			beginOfQuarter = time.Date(today.Year(), 4, 1, 0, 0, 0, 0, time.Local)
+		} else if month <= 9 {
+			beginOfQuarter = time.Date(today.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+		} else {
+			beginOfQuarter = time.Date(today.Year(), 10, 1, 0, 0, 0, 0, time.Local)
+		}
+		return date.Before(beginOfQuarter)
+	case "半年度":
+		month := today.Month()
+		var beginOfHalfYear time.Time
+		if month <= 6 {
+			beginOfHalfYear = time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		} else {
+			beginOfHalfYear = time.Date(today.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+		}
+		return date.Before(beginOfHalfYear)
+	case "年度":
+		startOfYear := time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		return date.Before(startOfYear)
+	default:
+		return true
+	}
+}
 
 // BaseRefreshData
 // @Description: 基础数据刷新
@@ -395,6 +450,9 @@ func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_ref
 			if v.NoUpdate == 1 {
 				continue
 			}
+			if !needForUpdate(v.EndDate, v.Frequency) {
+				continue
+			}
 			if v.DataRefreshNum > 0 {
 				dataRefreshNum = v.DataRefreshNum
 			}
@@ -422,7 +480,6 @@ func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_ref
 				}
 			}
 			fmt.Println(startDate)
-
 			// 数据更新
 			resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
 			if tmpErr != nil {
@@ -678,7 +735,7 @@ func DisableEdbRefresh(cont context.Context) (err error) {
 			utils.Rc.Delete(cacheKey)
 		}
 		if err != nil {
-			tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
+			tips := "DisableEdbRefresh-钢联化工、wind、彭博、彭博财务、同花顺指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -787,12 +844,12 @@ func DisableEdbRefresh(cont context.Context) (err error) {
 	}
 
 	if rule.EdbStopDays > 0 {
-		// 查询钢联和wind来源的指标
+		// 查询钢联和wind、彭博、彭博财务、同花顺来源的指标
 		edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
 
-		condition := ` AND no_update=0 AND source in (?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )`
+		condition := ` AND no_update=0 AND source in (?,?,?,?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )`
 		var pars []interface{}
-		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate, edbEndDate)
+		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, utils.DATA_SOURCE_PB_FINANCE, utils.DATA_SOURCE_PB, utils.DATA_SOURCE_THS, edbEndDate, edbEndDate)
 		// 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
 		totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
 		if e != nil {

+ 10 - 3
services/task.go

@@ -60,6 +60,13 @@ func releaseTask() {
 		syncRankingFromDalian := task.NewTask("syncRankingFromDalian", "0 30,40,50 16,18 * * *", SyncRankingFromDalian)
 		task.AddTask("syncRankingFromDalian", syncRankingFromDalian)
 	}
+
+	if utils.BusinessCode == utils.BusinessCodeRelease {
+		// 定时刷新公众号文章
+		refreshWechatPlatform := task.NewTask("refreshWechatPlatform", "0 0 14 * * *", RefreshWechatPlatform)
+		task.AddTask("定时刷新公众号文章", refreshWechatPlatform)
+	}
+
 	// 定时统计交易所的持仓分析数据
 	initPositionTask := task.NewTask("initPositionTask", "0 20,40 16-19 * * *", data.InitPositionTask)
 	task.AddTask("initPositionTask", initPositionTask)
@@ -165,14 +172,14 @@ func releaseTask() {
 
 func RefreshData(cont context.Context) (err error) {
 	wg := sync.WaitGroup{}
-	wg.Add(14)
+	wg.Add(12)
 	//hour := time.Now().Hour()
 	//if hour != 0 {
 	//}
 	//彭博
-	go data.RefreshDataFromPb(&wg)
+	//go data.RefreshDataFromPb(&wg)
 	//彭博财务
-	go data.RefreshDataFromPbFinance(&wg)
+	//go data.RefreshDataFromPbFinance(&wg)
 	//手工数据
 	go data.RefreshDataFromManual(&wg)
 	//隆众数据

+ 58 - 0
services/wechat_platform.go

@@ -0,0 +1,58 @@
+package services
+
+import (
+	"context"
+	"eta/eta_task/models/rag"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+type WechatArticleOp struct {
+	Source           string
+	WechatPlatformId int
+}
+
+// AddWechatArticleOpToCache
+// @Description: 将公众号文章操作加入缓存
+// @param wechatPlatformId
+// @param source
+// @return bool
+func AddWechatArticleOpToCache(wechatPlatformId int, source string) bool {
+	record := new(WechatArticleOp)
+	record.Source = source
+	record.WechatPlatformId = wechatPlatformId
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_WECHAT_PLATFORM_ARTICLE, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将公众号文章操作 加入缓存 AddWechatArticleOpToCache LPush: 操作类型:%s,公众号id:%d", source, wechatPlatformId))
+		if err != nil {
+			fmt.Println("AddWechatArticleOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}
+
+// RefreshWechatPlatform
+// @Description: 刷新公众号
+// @author: Roc
+// @datetime 2025-03-14 18:46:05
+// @param cont context.Context
+// @return err error
+func RefreshWechatPlatform(cont context.Context) (err error) {
+	utils.FileLog.Debug("RefreshWechatPlatform:", time.Now().Format(utils.FormatDateTime))
+	obj := new(rag.WechatPlatform)
+	platformList, tmpErr := obj.GetListByCondition(` AND enabled = 1 `, []interface{}{}, 0, 100000)
+	if tmpErr != nil {
+		err = tmpErr
+		return
+	}
+
+	for _, v := range platformList {
+		AddWechatArticleOpToCache(v.WechatPlatformId, "refresh")
+		time.Sleep(1 * time.Minute)
+	}
+
+	return
+}

+ 3 - 0
utils/config.go

@@ -15,6 +15,7 @@ var (
 	MYSQL_URL_EDB    string
 	MYSQL_URL_DATA   string
 	MYSQL_URL_GL     string
+	MYSQL_URL_AI     string
 	MYSQL_URL_ETA    string
 	MYSQL_WEEKLY_URL string //用户主库
 	DbDriverName     string // 数据库驱动名称
@@ -196,6 +197,8 @@ func init() {
 	MYSQL_URL_DATA = config["mysql_url_data"]
 	MYSQL_URL_GL = config["mysql_url_gl"]
 	MYSQL_URL_ETA = config["mysql_url_eta"]
+	MYSQL_URL_AI = config["mysql_url_ai"]
+
 	// 用户主库
 	MYSQL_WEEKLY_URL = config["mysql_url_weekly"]
 

+ 4 - 0
utils/constants.go

@@ -178,6 +178,10 @@ const (
 // 已经处理了的变更id
 const CACHE_EDB_UPDATE_LOG_ID = "eta:edb_update_log:id"
 
+const (
+	CACHE_WECHAT_PLATFORM_ARTICLE = "wechat_platform:article:op" //微信文章处理
+)
+
 // 指标引用对象
 const (
 	EDB_RELATION_CHART       = 1 // 图表