Pārlūkot izejas kodu

Merge branch 'master' into feature/eta2.5.9_inspection_stat

xyxie 1 dienu atpakaļ
vecāks
revīzija
25a8e262da

+ 6 - 0
models/data_manage/base_from_trade_guangzhou.go

@@ -160,6 +160,12 @@ func GetAllBaseFromTradeGuangzhouDataList(startDate string) (list []*BaseFromTra
 	return
 }
 
+// 设置after find函数
+func (m *BaseFromTradeGuangzhouData) AfterFind(db *gorm.DB) (err error) {
+	m.DataTime = utils.GormDateStrToDateStr(m.DataTime)
+	return
+}
+
 type BaseFromTradeGuangzhouDataResp struct {
 	Ret     int
 	Msg     string

+ 7 - 7
models/data_manage/base_from_usda_fas.go

@@ -17,8 +17,8 @@ type BaseFromUsdaFasIndex struct {
 	Frequency              string
 	Unit                   string
 	Sort                   int
-	StartDate              string `description:"开始日期"`
-	EndDate                string `description:"结束日期"`
+	StartDate              time.Time `description:"开始日期"`
+	EndDate                time.Time `description:"结束日期"`
 	EndValue               float64
 	Country                string `description:"国家"`
 	Commodity              string `description:"属性"`
@@ -26,11 +26,11 @@ type BaseFromUsdaFasIndex struct {
 	ModifyTime             time.Time
 }
 
-func (m *BaseFromUsdaFasIndex) AfterFind(db *gorm.DB) (err error) {
-	m.StartDate = utils.GormDateStrToDateStr(m.StartDate)
-	m.EndDate = utils.GormDateStrToDateStr(m.EndDate)
-	return
-}
+// func (m *BaseFromUsdaFasIndex) AfterFind(db *gorm.DB) (err error) {
+// 	m.StartDate = utils.GormDateStrToDateStr(m.StartDate)
+// 	m.EndDate = utils.GormDateStrToDateStr(m.EndDate)
+// 	return
+// }
 
 // BaseFromUsdaFasClassify UsdaFas原始数据分类表
 type BaseFromUsdaFasClassify struct {

+ 10 - 2
models/data_manage/edb_info.go

@@ -373,8 +373,8 @@ func ModifyEdbUpdateStatus(edbIdList []int, indexCodeList []string, calculateEdb
 	}()
 
 	// 更改指标的更新状态
-	sql := ` UPDATE edb_info SET no_update = 1 WHERE source in (?, ?) AND edb_info_id IN (` + utils.GetOrmInReplace(idNum) + `) AND  no_update = 0`
-	err = tx.Exec(sql, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbIdList).Error
+	sql := ` UPDATE edb_info SET no_update = 1 WHERE source in (?,?,?,?,?) AND edb_info_id IN (` + utils.GetOrmInReplace(idNum) + `) AND  no_update = 0`
+	err = tx.Exec(sql, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, utils.DATA_SOURCE_PB_FINANCE, utils.DATA_SOURCE_PB, utils.DATA_SOURCE_THS, edbIdList).Error
 	if err != nil {
 		return
 	}
@@ -401,6 +401,14 @@ func ModifyEdbUpdateStatus(edbIdList []int, indexCodeList []string, calculateEdb
 	return
 }
 
+//func StopRefreshCalculateEdbIndex(edbInfoId int) (err error) {
+//	o := global.DbMap[utils.DbNameIndex]
+//	// 更改指标的更新状态
+//	sql := ` UPDATE edb_info SET no_update = 1,set_update_time=? WHERE edb_type =2 AND edb_info_id = ? AND  no_update = 0`
+//	err = o.Exec(sql, time.Now(), edbInfoId).Error
+//	return
+//}
+
 // GetEdbInfoByIdList 根据指标id集合 获取 指标列表
 func GetEdbInfoByIdList(edbInfoIdList []int) (items []*EdbInfo, err error) {
 	num := len(edbInfoIdList)

+ 164 - 0
models/rag/ai_task.go

@@ -0,0 +1,164 @@
+package rag
+
+import (
+	"database/sql"
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// AiTask ai这边的任务表
+type AiTask struct {
+	AiTaskID                int       `gorm:"primaryKey;column:ai_task_id" description:"-"`
+	TaskName                string    `gorm:"column:task_name" description:"任务名称"`
+	TaskType                string    `gorm:"column:task_type" description:"任务类型"`
+	Status                  string    `gorm:"column:status" description:"任务状态"`
+	StartTime               time.Time `gorm:"column:start_time" description:"开始时间"`
+	EndTime                 time.Time `gorm:"column:end_time" description:"结束时间"`
+	CreateTime              time.Time `gorm:"column:create_time" description:"创建时间"`
+	UpdateTime              time.Time `gorm:"column:update_time" description:"更新时间"`
+	Parameters              string    `gorm:"column:parameters" description:"执行参数"`
+	Logs                    string    `gorm:"column:logs" description:"日志"`
+	Errormessage            string    `gorm:"column:ErrorMessage" description:"错误信息"`
+	Priority                int       `gorm:"column:priority" description:"优先级"`
+	RetryCount              int       `gorm:"column:retry_count" description:"重试次数"`
+	EstimatedCompletionTime time.Time `gorm:"column:estimated_completion_time" description:"预计完成时间"`
+	ActualCompletitonTime   time.Time `gorm:"column:actual_completiton_time" description:"实际完成时间"`
+	Remark                  string    `gorm:"column:remark" description:"备注"`
+	SysUserID               int       `gorm:"column:sys_user_id" description:"任务创建人id"`
+	SysUserRealName         string    `gorm:"column:sys_user_real_name" description:"任务创建人名称"`
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *AiTask) TableName() string {
+	return "ai_task"
+}
+
+// AiTaskColumns get sql column name.获取数据库列名
+var AiTaskColumns = struct {
+	AiTaskID                string
+	TaskName                string
+	TaskType                string
+	Status                  string
+	StartTime               string
+	EndTime                 string
+	CreateTime              string
+	UpdateTime              string
+	Parameters              string
+	Logs                    string
+	Errormessage            string
+	Priority                string
+	RetryCount              string
+	EstimatedCompletionTime string
+	ActualCompletitonTime   string
+	Remark                  string
+	SysUserID               string
+	SysUserRealName         string
+}{
+	AiTaskID:                "ai_task_id",
+	TaskName:                "task_name",
+	TaskType:                "task_type",
+	Status:                  "status",
+	StartTime:               "start_time",
+	EndTime:                 "end_time",
+	CreateTime:              "create_time",
+	UpdateTime:              "update_time",
+	Parameters:              "parameters",
+	Logs:                    "logs",
+	Errormessage:            "ErrorMessage",
+	Priority:                "priority",
+	RetryCount:              "retry_count",
+	EstimatedCompletionTime: "estimated_completion_time",
+	ActualCompletitonTime:   "actual_completiton_time",
+	Remark:                  "remark",
+	SysUserID:               "sys_user_id",
+	SysUserRealName:         "sys_user_real_name",
+}
+
+func (m *AiTask) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *AiTask) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *AiTask) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *AiTask) GetByID(id int) (item *AiTask, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", AiTaskColumns.AiTaskID), id).First(&item).Error
+
+	return
+}
+
+func (m *AiTask) GetByCondition(condition string, pars []interface{}) (item *AiTask, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *AiTask) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*AiTask, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTask) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// AddAiTask
+// @Description: 添加Ai模块的任务
+// @author: Roc
+// @datetime 2025-04-16 16:55:36
+// @param aiTask *AiTask
+// @param aiRecordList []*AiTaskRecord
+// @return err error
+func AddAiTask(aiTask *AiTask, aiRecordList []*AiTaskRecord) (err error) {
+	to := global.DbMap[utils.DbNameAI].Begin()
+	defer func() {
+		if err != nil {
+			_ = to.Rollback()
+		} else {
+			_ = to.Commit()
+		}
+	}()
+
+	err = to.Create(aiTask).Error
+	if err != nil {
+		return
+	}
+
+	for _, aiTaskRecord := range aiRecordList {
+		aiTaskRecord.AiTaskID = aiTask.AiTaskID
+	}
+
+	err = to.CreateInBatches(aiRecordList, utils.MultiAddNum).Error
+	if err != nil {
+		return
+	}
+
+	return
+}

+ 115 - 0
models/rag/ai_task_record.go

@@ -0,0 +1,115 @@
+package rag
+
+import (
+	"database/sql"
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// AiTaskRecord AI任务的子记录
+type AiTaskRecord struct {
+	AiTaskRecordID int       `gorm:"primaryKey;column:ai_task_record_id" json:"-"` // 任务记录id
+	AiTaskID       int       `gorm:"column:ai_task_id" json:"aiTaskId"`            // 任务id
+	Parameters     string    `gorm:"column:parameters" json:"parameters"`          // 子任务参数
+	Status         string    `gorm:"column:status" json:"status"`                  // 状态
+	Remark         string    `gorm:"column:remark" json:"remark"`                  // 备注
+	ModifyTime     time.Time `gorm:"column:modify_time" json:"modifyTime"`         // 最后一次修改时间
+	CreateTime     time.Time `gorm:"column:create_time" json:"createTime"`         // 任务创建时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *AiTaskRecord) TableName() string {
+	return "ai_task_record"
+}
+
+// AiTaskRecordColumns get sql column name.获取数据库列名
+var AiTaskRecordColumns = struct {
+	AiTaskRecordID string
+	AiTaskID       string
+	Parameters     string
+	Status         string
+	Remark         string
+	ModifyTime     string
+	CreateTime     string
+}{
+	AiTaskRecordID: "ai_task_record_id",
+	AiTaskID:       "ai_task_id",
+	Parameters:     "parameters",
+	Status:         "status",
+	Remark:         "remark",
+	ModifyTime:     "modify_time",
+	CreateTime:     "create_time",
+}
+
+func (m *AiTaskRecord) Create() (err error) {
+	err = global.DbMap[utils.DbNameAI].Create(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) Update(updateCols []string) (err error) {
+	err = global.DbMap[utils.DbNameAI].Select(updateCols).Updates(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) Del() (err error) {
+	err = global.DbMap[utils.DbNameAI].Delete(&m).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetByID(id int) (item *AiTaskRecord, err error) {
+	err = global.DbMap[utils.DbNameAI].Where(fmt.Sprintf("%s = ?", AiTaskRecordColumns.AiTaskRecordID), id).First(&item).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetByCondition(condition string, pars []interface{}) (item *AiTaskRecord, err error) {
+	sqlStr := fmt.Sprintf(`SELECT * FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).First(&item).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetAllListByCondition(field, condition string, pars []interface{}) (items []*AiTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_record_id desc `, field, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetListByCondition(field, condition string, pars []interface{}, startSize, pageSize int) (items []*AiTaskRecord, err error) {
+	if field == "" {
+		field = "*"
+	}
+	sqlStr := fmt.Sprintf(`SELECT %s FROM %s WHERE 1=1 %s order by ai_task_record_id desc LIMIT ?,?`, field, m.TableName(), condition)
+	pars = append(pars, startSize, pageSize)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Find(&items).Error
+
+	return
+}
+
+func (m *AiTaskRecord) GetCountByCondition(condition string, pars []interface{}) (total int, err error) {
+	var intNull sql.NullInt64
+	sqlStr := fmt.Sprintf(`SELECT COUNT(1) total FROM %s WHERE 1=1 %s`, m.TableName(), condition)
+	err = global.DbMap[utils.DbNameAI].Raw(sqlStr, pars...).Scan(&intNull).Error
+	if err == nil && intNull.Valid {
+		total = int(intNull.Int64)
+	}
+
+	return
+}
+
+// QuestionGenerateAbstractParam
+// @Description:
+type QuestionGenerateAbstractParam struct {
+	QuestionId  int    `json:"questionId"`
+	ArticleType string `json:"articleType"`
+	ArticleId   int    `json:"articleId"`
+}

+ 2 - 2
services/changes_visitors_covid.go

@@ -24,8 +24,8 @@ func SyncChangesVisitorsCovid() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncChangesVisitorsCovid", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.ChangesVisitorsCovidIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)

+ 2 - 2
services/com_trade.go

@@ -124,8 +124,8 @@ func SyncComTradeData() (err error) {
 
 		var result string
 		result, err = HttpPost("SyncComTradeData", method, data)
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
 		respObj := new(data_manage.ComTradeIndexDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)

+ 16 - 1
services/data/edb_info.go

@@ -297,7 +297,22 @@ func RefreshDataFromCalculateAll() (err error) {
 			continue
 		}
 		fmt.Println(v.EdbInfoId, v.EdbCode, v.EdbName, v.SourceName, source)
-
+		////对依赖指标是基础指标的且基础指标停用,停用当前计算指标,因为item是按照edb_Info_id升序,所以默认层级关系是由低而上,所以不需要考虑嵌套,只要考虑当前的下一级依赖即可
+		//mappings, err := data_manage.GetEdbInfoCalculateMappingListByEdbInfoId(v.EdbInfoId)
+		//if err != nil {
+		//	utils.FileLog.Error("[获取计算指标的依赖指标失败,不刷新指标,CODE:" + v.EdbCode + "],err:" + err.Error())
+		//	continue
+		//}
+		//for _, mapping := range mappings {
+		//	if mapping.FromEdbInfoId > 0 && mapping.NoUpdate == 1 {
+		//		utils.FileLog.Info("依赖指标停用,依赖指标ID:" + mapping.FromEdbCode + ",停用当前计算指标,CODE:" + v.EdbCode)
+		//		stopErr := data_manage.StopRefreshCalculateEdbIndex(v.EdbInfoId)
+		//		if stopErr != nil {
+		//			utils.FileLog.Error(fmt.Sprintf("停用计算指标失败,ID:%d;指标编码:%s;err:%s", mapping.EdbInfoId, mapping.EdbCode, stopErr.Error()))
+		//		}
+		//		continue
+		//	}
+		//}
 		fmt.Println("RefreshEdbCalculateData", v.EdbInfoId, v.EdbCode, startDate)
 		result, tmpErr := RefreshEdbCalculateData(v.EdbInfoId, v.EdbCode, startDate)
 		if tmpErr != nil {

+ 62 - 5
services/edb_refresh.go

@@ -366,6 +366,61 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 	fmt.Println("Get ConfigRefreshData End")
 	return
 }
+func needForUpdate(date time.Time, frequency string) bool {
+	//如果当前已经更新最新的数据则停止更新
+	today := time.Now()
+	switch frequency {
+	case "日度":
+		return !date.Equal(today)
+	case "周度":
+		// 获取本周的开始日期(周一)
+		startOfWeek := today.AddDate(0, 0, int(time.Monday-today.Weekday()))
+		return date.Before(startOfWeek)
+	case "旬度":
+		day := today.Day()
+		var beginOfTenDays time.Time
+		if day <= 10 {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		} else if day <= 20 {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 11, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		} else {
+			beginOfTenDays = time.Date(today.Year(), today.Month(), 21, 0, 0, 0, 0, time.Local)
+			return date.Before(beginOfTenDays)
+		}
+	case "月度":
+		beginOfMonth := time.Date(today.Year(), today.Month(), 1, 0, 0, 0, 0, time.Local)
+		return date.Before(beginOfMonth)
+	case "季度":
+		month := today.Month()
+		var beginOfQuarter time.Time
+		if month <= 3 {
+			beginOfQuarter = time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		} else if month <= 6 {
+			beginOfQuarter = time.Date(today.Year(), 4, 1, 0, 0, 0, 0, time.Local)
+		} else if month <= 9 {
+			beginOfQuarter = time.Date(today.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+		} else {
+			beginOfQuarter = time.Date(today.Year(), 10, 1, 0, 0, 0, 0, time.Local)
+		}
+		return date.Before(beginOfQuarter)
+	case "半年度":
+		month := today.Month()
+		var beginOfHalfYear time.Time
+		if month <= 6 {
+			beginOfHalfYear = time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		} else {
+			beginOfHalfYear = time.Date(today.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+		}
+		return date.Before(beginOfHalfYear)
+	case "年度":
+		startOfYear := time.Date(today.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		return date.Before(startOfYear)
+	default:
+		return true
+	}
+}
 
 // BaseRefreshData
 // @Description: 基础数据刷新
@@ -399,6 +454,9 @@ func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_ref
 			if v.NoUpdate == 1 {
 				continue
 			}
+			if !needForUpdate(v.EndDate, v.Frequency) {
+				continue
+			}
 			if v.DataRefreshNum > 0 {
 				dataRefreshNum = v.DataRefreshNum
 			}
@@ -426,7 +484,6 @@ func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_ref
 				}
 			}
 			fmt.Println(startDate)
-
 			// 数据更新
 			resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
 			if tmpErr != nil {
@@ -682,7 +739,7 @@ func DisableEdbRefresh(cont context.Context) (err error) {
 			utils.Rc.Delete(cacheKey)
 		}
 		if err != nil {
-			tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
+			tips := "DisableEdbRefresh-钢联化工、wind、彭博、彭博财务、同花顺指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -791,12 +848,12 @@ func DisableEdbRefresh(cont context.Context) (err error) {
 	}
 
 	if rule.EdbStopDays > 0 {
-		// 查询钢联和wind来源的指标
+		// 查询钢联和wind、彭博、彭博财务、同花顺来源的指标
 		edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
 
-		condition := ` AND no_update=0 AND source in (?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )`
+		condition := ` AND no_update=0 AND source in (?,?,?,?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )`
 		var pars []interface{}
-		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate, edbEndDate)
+		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, utils.DATA_SOURCE_PB_FINANCE, utils.DATA_SOURCE_PB, utils.DATA_SOURCE_THS, edbEndDate, edbEndDate)
 		// 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
 		totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
 		if e != nil {

+ 2 - 2
services/eia_steo.go

@@ -92,8 +92,8 @@ func SyncEiaSteoIndexData() (err error) {
 			utils.FileLog.Info("HttpPost err:", err)
 			return
 		}
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
 		respObj := new(data_manage.EiaSteoIndexDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)

+ 2 - 2
services/eic.go

@@ -24,8 +24,8 @@ func SyncRankingFromEic() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromEic", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.EicIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)

+ 6 - 6
services/gpr_risk.go

@@ -25,8 +25,8 @@ func SyncBaseFromGprRisk() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromGprRisk", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.GprRiskIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -57,8 +57,8 @@ func SyncBaseFromGprRiskClassify() (err error) {
 	if err != nil {
 		fmt.Println("SyncBaseFromGprRiskClassify HttpPost Err:", err.Error())
 	}
-	utils.FileLog.Info(result)
-	fmt.Println("SyncBaseFromGprRiskClassify result:", result)
+	//utils.FileLog.Info(result)
+	//fmt.Println("SyncBaseFromGprRiskClassify result:", result)
 
 	respObj := new(data_manage.GprRiskClassifyResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -124,8 +124,8 @@ func SyncBaseFromGprRiskData() (err error) {
 
 		var result string
 		result, err = HttpPost("SyncBaseFromGprRiskData", method, data)
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
 		respObj := new(data_manage.BaseFromGprRiskDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)

+ 2 - 2
services/icpi.go

@@ -25,8 +25,8 @@ func SyncBaseFromIcpi() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromIcpi", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.IcpiIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)

+ 87 - 58
services/sync_hz_data.go

@@ -46,135 +46,172 @@ func SyncHzDataIndex(cont context.Context) (err error) {
 	//}
 
 	//欧洲天然气
-	err = SyncRankingFromEic()
-	if err != nil {
-		fmt.Println("SyncRankingFromEic Err:" + err.Error())
-		return
-	}
+	go func() {
+		err = SyncRankingFromEic()
+		if err != nil {
+			utils.FileLog.Info("SyncRankingFromEic Err:" + err.Error())
+			fmt.Println("SyncRankingFromEic Err:" + err.Error())
+			return
+		}
+	}()
 
 	//谷歌出行数据
-	err = SyncChangesVisitorsCovid()
-	if err != nil {
-		fmt.Println("SyncChangesVisitorsCovid Err:" + err.Error())
-		return
-	}
+	go func() {
+		err = SyncChangesVisitorsCovid()
+		if err != nil {
+			utils.FileLog.Info("SyncChangesVisitorsCovid Err:" + err.Error())
+			fmt.Println("SyncChangesVisitorsCovid Err:" + err.Error())
+			return
+		}
+	}()
 
 	// EiaSteo
-	err = SyncEiaSteoClassify()
-	if err != nil {
-		fmt.Println("SyncEiaSteoClassify Err:" + err.Error())
-		return
-	}
-	err = SyncEiaSteoIndex()
-	if err != nil {
-		fmt.Println("SyncEiaSteoIndex Err:" + err.Error())
-		return
-	}
+	go func() {
+		err = SyncEiaSteoClassify()
+		if err != nil {
+			utils.FileLog.Info("SyncEiaSteoClassify Err:" + err.Error())
+			fmt.Println("SyncEiaSteoClassify Err:" + err.Error())
+			//return
+		}
+		err = SyncEiaSteoIndex()
+		if err != nil {
+			utils.FileLog.Info("SyncEiaSteoIndex Err:" + err.Error())
+			fmt.Println("SyncEiaSteoIndex Err:" + err.Error())
+			//return
+		}
+
+		//EiaSteo
+		err = SyncEiaSteoIndexDataV2()
+		if err != nil {
+			utils.FileLog.Info("SyncEiaSteoIndexDataV2 Err:" + err.Error())
+			fmt.Println("SyncEiaSteoIndexDataV2 Err:" + err.Error())
+			return
+		}
+	}()
 
 	// UN联合国数据
-	err = SyncComTradeIndex()
-	if err != nil {
-		fmt.Println("SyncComTradeIndexAndData Err:" + err.Error())
-		return
-	}
+	go func() {
+		err = SyncComTradeIndex()
+		if err != nil {
+			utils.FileLog.Info("SyncComTradeIndexAndData Err:" + err.Error())
+			fmt.Println("SyncComTradeIndexAndData Err:" + err.Error())
+			return
+		}
+	}()
 
 	// 美联储加息概率
-	err = SyncMeetingProbabilities()
-	if err != nil {
-		fmt.Println("SyncMeetingProbabilities Err:" + err.Error())
-		return
-	}
+	go func() {
+		err = SyncMeetingProbabilities()
+		if err != nil {
+			utils.FileLog.Info("SyncMeetingProbabilities Err:" + err.Error())
+			fmt.Println("SyncMeetingProbabilities Err:" + err.Error())
+			return
+		}
+	}()
 
 	//广期所
-	{
+	go func() {
 		//分类信息
 		err = SyncFromGuangzhouClassify()
 		if err != nil {
+			utils.FileLog.Info("SyncFromGuangzhouClassify Err:" + err.Error())
 			fmt.Println("SyncFromGuangzhouClassify Err:" + err.Error())
-			return
+			//return
 		}
 		//合约信息
 		err = SyncFromGuangzhouContract()
 		if err != nil {
+			utils.FileLog.Info("SyncFromGuangzhouContract Err:" + err.Error())
 			fmt.Println("SyncFromGuangzhouContract Err:" + err.Error())
-			return
+			//return
 		}
 		//指标信息
 		err = SyncFromGuangzhouIndex()
 		if err != nil {
+			utils.FileLog.Info("SyncFromGuangzhouIndex Err:" + err.Error())
 			fmt.Println("SyncFromGuangzhouIndex Err:" + err.Error())
-			return
+			//return
 		}
 		//数据
 		err = SyncFromGuangzhouTradeData()
 		if err != nil {
+			utils.FileLog.Info("SyncFromGuangzhouTradeData Err:" + err.Error())
 			fmt.Println("SyncFromGuangzhouTradeData Err:" + err.Error())
 			return
 		}
-	}
+	}()
 
 	//ICPI
-	{
+	go func() {
 		//分类信息
 		err = SyncBaseFromIcpiClassify()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromIcpiClassify Err:" + err.Error())
 			fmt.Println("SyncBaseFromIcpiClassify Err:" + err.Error())
-			return
+			//return
 		}
 		//指标信息
 		err = SyncBaseFromIcpi()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromIcpi Err:" + err.Error())
 			fmt.Println("SyncBaseFromIcpi Err:" + err.Error())
-			return
+			//return
 		}
 		err = SyncBaseFromIcpiData()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromIcpiData Err:" + err.Error())
 			fmt.Println("SyncBaseFromIcpiData Err:" + err.Error())
 			return
 		}
-	}
+	}()
 
 	//美国农业部
-	{
+	go func() {
 		//分类信息
 		err = SyncBaseFromUsdaFasClassify()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromUsdaFasClassify Err:" + err.Error())
 			fmt.Println("SyncBaseFromUsdaFasClassify Err:" + err.Error())
-			return
+			//return
 		}
 		//指标信息
 		err = SyncBaseFromUsdaFas()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromUsdaFas Err:" + err.Error())
 			fmt.Println("SyncBaseFromUsdaFas Err:" + err.Error())
-			return
+			//return
 		}
 		err = SyncBaseFromUsdaFasData()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromUsdaFasData Err:" + err.Error())
 			fmt.Println("SyncBaseFromUsdaFasData Err:" + err.Error())
-			return
+			//return
 		}
-	}
+	}()
 
 	//GPR地缘风险指数
-	{
+	go func() {
 		//分类信息
 		err = SyncBaseFromGprRiskClassify()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromGprRiskClassify Err:" + err.Error())
 			fmt.Println("SyncBaseFromGprRiskClassify Err:" + err.Error())
-			return
+			//return
 		}
 		//指标信息
 		err = SyncBaseFromGprRisk()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromGprRisk Err:" + err.Error())
 			fmt.Println("SyncBaseFromGprRisk Err:" + err.Error())
-			return
+			//return
 		}
 		err = SyncBaseFromGprRiskData()
 		if err != nil {
+			utils.FileLog.Info("SyncBaseFromGprRiskData Err:" + err.Error())
 			fmt.Println("SyncBaseFromGprRiskData Err:" + err.Error())
-			return
+			//return
 		}
-	}
+	}()
 
 	// 同步指标数据
 	SyncHzDataIndexData()
@@ -186,21 +223,13 @@ func SyncHzDataIndex(cont context.Context) (err error) {
 // SyncHzDataIndexData 同步指标数据
 func SyncHzDataIndexData() {
 	var err error
-	//EiaSteo
-	err = SyncEiaSteoIndexDataV2()
-	if err != nil {
-		fmt.Println("SyncEiaSteoIndexDataV2 Err:" + err.Error())
-		utils.FileLog.Info("SyncEiaSteoIndexDataV2 Err:" + err.Error())
-		return
-	}
-
 	// 同步un数据
 	err = SyncComTradeData()
 	if err != nil {
+		utils.FileLog.Info("SyncComTradeData Err:" + err.Error())
 		fmt.Println("SyncComTradeData Err:" + err.Error())
 		return
 	}
-
 	//
 	fmt.Println(err)
 }

+ 6 - 0
services/task.go

@@ -31,6 +31,12 @@ func Task() {
 	// 定时汇总数据源终端指标更新情况
 	setEdbSourceStatTask := task.NewTask("setEdbSourceStatTask", "0 20 19,23 * * *", data_stat.SetEdbSourceStatTask)
 	task.AddTask("数据源统计表", setEdbSourceStatTask)
+	
+	// 定时监测AI任务调度情况
+	if utils.BusinessCode == utils.BusinessCodeRelease || utils.BusinessCode == utils.BusinessCodeDebug {
+		checkAiTask := task.NewTask("checkAiTask", "0 */5 * * * *", CheckAiTask)
+		task.AddTask("定时定时监测AI任务调度情况", checkAiTask)
+	}
 
 	// 根据配置对指标的刷新做巡检
 	addEdbInspectionRecord := task.NewTask("addEdbInspectionRecord", "0 */30 * * * * ", AddEdbInspectionRecord)

+ 2 - 2
services/trade_cffex.go

@@ -24,8 +24,8 @@ func SyncRankingFromCffex() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromCffex", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.CffexIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)

+ 10 - 8
services/trade_guangzhou.go

@@ -19,8 +19,8 @@ func SyncFromGuangzhouClassify() (err error) {
 		fmt.Println("SyncFromGuangzhouClassify HttpPost Err:", err.Error())
 		return err
 	}
-	utils.FileLog.Info("SyncFromGuangzhouClassify:" + result)
-	fmt.Println("SyncFromGuangzhouClassify result:", result)
+	//utils.FileLog.Info("SyncFromGuangzhouClassify:" + result)
+	//fmt.Println("SyncFromGuangzhouClassify result:", result)
 
 	respObj := new(data_manage.GuangzhouClassifyResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -59,8 +59,8 @@ func SyncFromGuangzhouIndex() (err error) {
 	if err != nil {
 		fmt.Println("SyncFromGuangzhou HttpPost Err:", err.Error())
 	}
-	utils.FileLog.Info(result)
-	fmt.Println("SyncFromGuangzhouIndex result:", result)
+	//utils.FileLog.Info(result)
+	//fmt.Println("SyncFromGuangzhouIndex result:", result)
 
 	respObj := new(data_manage.GuangzhouIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -100,8 +100,8 @@ func SyncFromGuangzhouContract() (err error) {
 	if err != nil {
 		fmt.Println("SyncFromGuangzhouContract HttpPost Err:", err.Error())
 	}
-	utils.FileLog.Info(result)
-	fmt.Println("SyncFromGuangzhouContract result:", result)
+	//utils.FileLog.Info(result)
+	//fmt.Println("SyncFromGuangzhouContract result:", result)
 
 	respObj := new(data_manage.GuangzhouContractResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -154,6 +154,7 @@ func SyncFromGuangzhouTradeData() (err error) {
 	}
 	for _, dv := range allData {
 		tmpKey := dv.IndexCode + "_" + dv.DataTime
+		//fmt.Println("GetAllBaseFromTradeGuangzhouDataList tmpKey:", tmpKey)
 		existDataMap[tmpKey] = dv
 	}
 
@@ -169,8 +170,8 @@ func SyncFromGuangzhouTradeData() (err error) {
 
 		var result string
 		result, err = HttpPost("SyncFromGuangzhouTradeData", method, data)
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
 		respObj := new(data_manage.BaseFromTradeGuangzhouDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)
@@ -189,6 +190,7 @@ func SyncFromGuangzhouTradeData() (err error) {
 		if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
 			for _, dv := range respObj.Data.List {
 				tmpKey := dv.IndexCode + "_" + dv.DataTime
+				//fmt.Println("SyncFromGuangzhouTradeData2 tmpKey:", tmpKey)
 				if _, ok := existDataMap[tmpKey]; !ok {
 					addDataList = append(addDataList, dv)
 					existDataMap[tmpKey] = dv

+ 2 - 2
services/trade_shanghai.go

@@ -24,8 +24,8 @@ func SyncRankingFromShanghai() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromShanghai", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.ShanghaiIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)

+ 6 - 6
services/usda_fas.go

@@ -25,8 +25,8 @@ func SyncBaseFromUsdaFas() (err error) {
 	data["StartDate"] = startDate
 	//data["EndDate"] = endDate
 	result, err := HttpPost("SyncRankingFromUsdaFas", method, data)
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.UsdaFasIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -39,11 +39,11 @@ func SyncBaseFromUsdaFas() (err error) {
 	UsdaFasObj := new(data_manage.BaseFromUsdaFasIndex)
 
 	for _, zv := range respObj.Data {
-		newID, err := UsdaFasObj.InsertOrUpdateBaseFromUsdaFasIndex(zv)
+		_, err := UsdaFasObj.InsertOrUpdateBaseFromUsdaFasIndex(zv)
 		if err != nil {
 			fmt.Println("InsertOrUpdateBaseFromUsdaFasIndex error:", err)
 		}
-		fmt.Println("InsertOrUpdateBaseFromUsdaFasIndex new indexID:", newID)
+		//fmt.Println("InsertOrUpdateBaseFromUsdaFasIndex new indexID:", newID)
 	}
 	return err
 }
@@ -124,8 +124,8 @@ func SyncBaseFromUsdaFasData() (err error) {
 
 		var result string
 		result, err = HttpPost("SyncBaseFromUsdaFasData", method, data)
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
 		respObj := new(data_manage.BaseFromUsdaFasDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)

+ 75 - 0
services/wechat_platform.go

@@ -56,3 +56,78 @@ func RefreshWechatPlatform(cont context.Context) (err error) {
 
 	return
 }
+
+// AiTaskRecordOp
+// @Description: AI模块任务操作记录
+type AiTaskRecordOp struct {
+	AiTaskRecordId int
+}
+
+// AddAiTaskRecordOpToCache
+// @Description: AI任务操作调度入队列
+// @author: Roc
+// @datetime 2025-04-24 09:41:11
+// @param aiTaskRecordId int
+// @return bool
+func AddAiTaskRecordOpToCache(aiTaskRecordId int) bool {
+	// 如果不在发布和调试模式,那么就不加入缓存
+	if !utils.InArrayByStr([]string{utils.BusinessCodeRelease, utils.BusinessCodeDebug}, utils.BusinessCode) {
+		return true
+	}
+	record := new(AiTaskRecordOp)
+	record.AiTaskRecordId = aiTaskRecordId
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将AI任务操作调度入队列 加入缓存 AddAiTaskRecordOpToCache LPush: 记录id:%d", aiTaskRecordId))
+		if err != nil {
+			fmt.Println("AddAiTaskRecordOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}
+
+func CheckAiTask(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("CheckAiTask err:", err)
+		}
+	}()
+	count, err := utils.Rc.LLen(utils.CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK)
+	if err != nil {
+		fmt.Println("CheckAiTask err:", err)
+	}
+	if count > 0 {
+		return
+	}
+
+	// 队列没数据了,那么需要重新检查下任务状态,并将状态为待处理且未完成的任务加入到队列中
+	aiTaskObj := rag.AiTask{}
+	taskList, err := aiTaskObj.GetListByCondition(``, ` AND status = ? `, []interface{}{`processing`}, 0, 10)
+	if err != nil {
+		fmt.Println("CheckAiTask err:", err)
+		return
+	}
+
+	if len(taskList) <= 0 {
+		return
+	}
+
+	aiTaskRecordObj := rag.AiTaskRecord{}
+	for _, v := range taskList {
+		// 查找具体记录
+		recordList, tmpErr := aiTaskRecordObj.GetAllListByCondition(``, ` AND ai_task_id = ? AND status = ? `, []interface{}{v.AiTaskID, `待处理`})
+		if tmpErr != nil {
+			fmt.Println(v.AiTaskID, "获取待处理记录失败; err:", tmpErr)
+			continue
+		}
+
+		// 将具体记录加入到队列中
+		for _, record := range recordList {
+			AddAiTaskRecordOpToCache(record.AiTaskRecordID)
+		}
+	}
+
+	return
+}

+ 2 - 1
utils/constants.go

@@ -181,7 +181,8 @@ const (
 const CACHE_EDB_UPDATE_LOG_ID = "eta:edb_update_log:id"
 
 const (
-	CACHE_WECHAT_PLATFORM_ARTICLE = "wechat_platform:article:op:" //微信文章处理
+	CACHE_WECHAT_PLATFORM_ARTICLE      = "wechat_platform:article:op:"          //微信文章处理
+	CACHE_AI_ARTICLE_ABSTRACT_LLM_TASK = "eta:ai:article:abstract:llm:task:op:" //微信文章/eta报告的摘要重新生成处理(任务调度)
 )
 
 // 指标引用对象

+ 1 - 0
utils/redis.go

@@ -19,6 +19,7 @@ type RedisClient interface {
 	IsExist(key string) bool
 	LPush(key string, val interface{}) error
 	Brpop(key string, callback func([]byte))
+	LLen(key string) (int64, error)
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)

+ 12 - 0
utils/redis/cluster_redis.go

@@ -254,6 +254,18 @@ func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *ClusterRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc

+ 12 - 0
utils/redis/standalone_redis.go

@@ -246,6 +246,18 @@ func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
 
 }
 
+// LLen
+// @Description: 获取list中剩余的数据数
+// @author: Roc
+// @receiver rc
+// @datetime 2025-04-25 10:58:25
+// @param key string
+// @return int64
+// @return error
+func (rc *StandaloneRedisClient) LLen(key string) (int64, error) {
+	return rc.redisClient.LLen(context.TODO(), key).Result()
+}
+
 // GetRedisTTL
 // @Description: 获取key的过期时间
 // @receiver rc