Explorar o código

Merge branch 'master' into feature/change_log

xyxie hai 1 ano
pai
achega
63ec885581

+ 1 - 1
.gitignore

@@ -1,6 +1,6 @@
 /.idea
 /eta_task.exe
-/conf/app.conf
+/conf
 /binlog
 /rdlucklog
 /etalogs

+ 1 - 0
go.mod

@@ -23,6 +23,7 @@ require (
 	github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
+	github.com/garyburd/redigo v1.6.3 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/hashicorp/golang-lru v0.5.4 // indirect
 	github.com/josharian/intern v1.0.0 // indirect

+ 1 - 0
go.sum

@@ -100,6 +100,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
 github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/garyburd/redigo v1.6.3 h1:HCeeRluvAgMusMomi1+6Y5dmFOdYV/JzoRrrbFlkGIc=
 github.com/garyburd/redigo v1.6.3/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c/go.mod h1:Gja1A+xZ9BoviGJNA2E9vFkPjjsl+CoJxSXiQM1UXtw=

+ 11 - 0
models/admin_operate_record.go

@@ -0,0 +1,11 @@
+package models
+
+import "github.com/beego/beego/v2/client/orm"
+
+// ClearAdminOperateRecord 清理用户操作日志
+func ClearAdminOperateRecord(date string) (err error) {
+	o := orm.NewOrmUsingDB("eta")
+	sql := `DELETE FROM admin_operate_record WHERE create_time <= ?`
+	_, err = o.Raw(sql, date).Exec()
+	return
+}

+ 22 - 6
models/data_manage/edb_data_base.go

@@ -1,14 +1,32 @@
 package data_manage
 
 import (
-	"errors"
 	"eta/eta_task/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
-	"strconv"
 	"time"
 )
 
+var (
+	EdbDataTableNameMap     map[int]string // 指标来源对应数据表名
+	EdbDataRefreshMethodMap map[int]string // 指标来源对应的刷新指标方法
+)
+
+// InitEdbSourceVar 初始化时加载指标来源对应信息, 避免循环中查库, 注意edb_source表修改table_name的话需要重启服务
+func InitEdbSourceVar() {
+	EdbDataTableNameMap = make(map[int]string)
+	EdbDataRefreshMethodMap = make(map[int]string)
+	sources, e := GetEdbSourceItemsByCondition(``, make([]interface{}, 0), []string{}, "")
+	if e != nil {
+		utils.FileLog.Info("init source table err: %s", e.Error())
+		return
+	}
+	for _, v := range sources {
+		EdbDataTableNameMap[v.EdbSourceId] = v.TableName
+		EdbDataRefreshMethodMap[v.EdbSourceId] = v.EdbRefreshMethod
+	}
+}
+
 func GetEdbDataTableName(source int) (tableName string) {
 	switch source {
 	case utils.DATA_SOURCE_THS:
@@ -96,8 +114,7 @@ func GetEdbDataTableName(source int) (tableName string) {
 	case utils.DATA_SOURCE_PREDICT_CALCULATE_BP:
 		tableName = "edb_data_predict_calculate_bp"
 	default:
-		tableName = ""
-		errors.New("无效的渠道:" + strconv.Itoa(source))
+		tableName = EdbDataTableNameMap[source] // 没有对应的从edb_source中取
 		return
 	}
 	return
@@ -122,8 +139,7 @@ func GetEdbInfoCalculateTableName(source int) (tableName string) {
 	case utils.DATA_SOURCE_CALCULATE_BP:
 		tableName = "edb_info_calculate_bp"
 	default:
-		tableName = ""
-		errors.New("无效的渠道:" + strconv.Itoa(source))
+		tableName = EdbDataTableNameMap[source] // 没有对应的从edb_source中取
 		return
 	}
 	return

+ 44 - 0
models/data_manage/edb_source.go

@@ -0,0 +1,44 @@
+package data_manage
+
+import (
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"strings"
+)
+
+// EdbSource 指标来源表
+type EdbSource struct {
+	EdbSourceId      int    `orm:"column(edb_source_id);pk"`
+	SourceName       string `description:"指标来源名称"`
+	TableName        string `description:"数据表名"`
+	EdbAddMethod     string `description:"指标新增接口"`
+	EdbRefreshMethod string `description:"指标刷新接口"`
+	IsBase           int    `description:"是否为基础指标: 0-否; 1-是"`
+	FromBridge       int    `description:"是否来源于桥接服务: 0-否; 1-是"`
+	BridgeFlag       string `description:"桥接服务对象标识"`
+	SourceExtend     string `description:"扩展字段做查询用"`
+}
+
+// GetEdbSourceItemsByCondition 获取指标来源列表
+func GetEdbSourceItemsByCondition(condition string, pars []interface{}, fieldArr []string, orderRule string) (items []*EdbSource, err error) {
+	o := orm.NewOrmUsingDB("data")
+	fields := strings.Join(fieldArr, ",")
+	if len(fieldArr) == 0 {
+		fields = `*`
+	}
+	order := `ORDER BY edb_source_id ASC`
+	if orderRule != "" {
+		order = ` ORDER BY ` + orderRule
+	}
+	sql := fmt.Sprintf(`SELECT %s FROM edb_source WHERE 1=1 %s %s`, fields, condition, order)
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}
+
+// GetEdbSourceItemByCondition 获取指标来源
+func GetEdbSourceItemByCondition(condition string, pars []interface{}) (item *EdbSource, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := fmt.Sprintf(`SELECT * FROM edb_source WHERE 1=1 %s`, condition)
+	err = o.Raw(sql, pars).QueryRow(&item)
+	return
+}

+ 19 - 7
models/db.go

@@ -19,7 +19,7 @@ func init() {
 	db, _ := orm.GetDB("default")
 	db.SetConnMaxLifetime(10 * time.Minute)
 
-	if utils.MYSQL_URL_RDDP != ""{
+	if utils.MYSQL_URL_RDDP != "" {
 		_ = orm.RegisterDataBase("rddp", "mysql", utils.MYSQL_URL_RDDP)
 		orm.SetMaxIdleConns("rddp", 50)
 		orm.SetMaxOpenConns("rddp", 100)
@@ -28,7 +28,7 @@ func init() {
 		report_db.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-	if utils.MYSQL_URL_EDB != ""{
+	if utils.MYSQL_URL_EDB != "" {
 		_ = orm.RegisterDataBase("edb", "mysql", utils.MYSQL_URL_EDB)
 		orm.SetMaxIdleConns("edb", 50)
 		orm.SetMaxOpenConns("edb", 100)
@@ -37,8 +37,7 @@ func init() {
 		edb_db.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-
-	if utils.MYSQL_URL_DATA != ""{
+	if utils.MYSQL_URL_DATA != "" {
 		_ = orm.RegisterDataBase("data", "mysql", utils.MYSQL_URL_DATA)
 		orm.SetMaxIdleConns("data", 50)
 		orm.SetMaxOpenConns("data", 100)
@@ -48,7 +47,7 @@ func init() {
 
 	}
 
-	if utils.MYSQL_URL_GL != ""{
+	if utils.MYSQL_URL_GL != "" {
 		_ = orm.RegisterDataBase("gl", "mysql", utils.MYSQL_URL_GL)
 		orm.SetMaxIdleConns("gl", 50)
 		orm.SetMaxOpenConns("gl", 100)
@@ -57,7 +56,7 @@ func init() {
 		gl.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-	if utils.MYSQL_URL_ETA != ""{
+	if utils.MYSQL_URL_ETA != "" {
 		_ = orm.RegisterDataBase("eta", "mysql", utils.MYSQL_URL_ETA)
 		orm.SetMaxIdleConns("eta", 50)
 		orm.SetMaxOpenConns("eta", 100)
@@ -66,7 +65,6 @@ func init() {
 		etaDb.SetConnMaxLifetime(10 * time.Minute)
 	}
 
-
 	orm.Debug = true
 	orm.DebugLog = orm.NewLog(utils.Binlog)
 
@@ -78,6 +76,12 @@ func init() {
 
 	//注册持仓分析 数据表
 	initTradePositionTop()
+
+	// 智能研报数据表
+	initSmartReport()
+
+	// 初始化部分数据表变量(直接init会有顺序问题=_=!)
+	data_manage.InitEdbSourceVar()
 }
 
 // initEdbDataTable 注册Edb指标 数据表
@@ -130,3 +134,11 @@ func initTradePositionTop() {
 		new(data_manage.BaseFromTradeClassify), // 交易所分类
 	)
 }
+
+// initSmartReport 注册智能研报数据表
+func initSmartReport() {
+	orm.RegisterModel(
+		new(SmartReport),
+		new(ReportStateRecord),
+	)
+}

+ 2 - 0
models/report.go

@@ -33,6 +33,8 @@ type Report struct {
 	ChapterType        string    `description:"章节类型 day-晨报 week-周报"`
 	OldReportId        int       `description:"research_report表ID(后续一两个版本过渡需要,之后可移除)"`
 	PreMsgSend         int       `description:"定时发布成功后是否立即推送模版消息:0否,1是"`
+	AdminId            int
+	AdminName          string `description:"系统用户名称"`
 }
 
 func GetReportById(reportId int) (item *ReportDetail, err error) {

+ 22 - 0
models/report_state_record.go

@@ -0,0 +1,22 @@
+package models
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+type ReportStateRecord struct {
+	Id         int       `orm:"column(id)" description:"Id"`
+	ReportId   int       // 研报id
+	ReportType int       // 报告类型'报告类型:1中文研报2智能研报'
+	State      int       // 状态:1-未提交 2-待审核 3-驳回 4-审核
+	AdminId    int       // 操作人id
+	AdminName  string    // 操作人姓名
+	CreateTime time.Time // 创建时间
+}
+
+func AddReportStateRecord(item *ReportStateRecord) (lastId int64, err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	lastId, err = o.Insert(item)
+	return
+}

+ 100 - 0
models/smart_report.go

@@ -0,0 +1,100 @@
+package models
+
+import (
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// SmartReport 智能研报
+type SmartReport struct {
+	SmartReportId      int       `orm:"column(smart_report_id)" description:"智能研报ID"`
+	ReportCode         string    `description:"报告唯一编码"`
+	ClassifyIdFirst    int       `description:"一级分类ID"`
+	ClassifyNameFirst  string    `description:"一级分类名称"`
+	ClassifyIdSecond   int       `description:"二级分类ID"`
+	ClassifyNameSecond string    `description:"二级分类名称"`
+	AddType            int       `description:"新增方式:1-新增报告;2-继承报告"`
+	Title              string    `description:"标题"`
+	Abstract           string    `description:"摘要"`
+	Author             string    `description:"作者"`
+	Frequency          string    `description:"频度"`
+	Stage              int       `description:"期数"`
+	Content            string    `description:"内容"`
+	ContentSub         string    `description:"内容前两个章节"`
+	ContentStruct      string    `description:"内容组件"`
+	VideoUrl           string    `description:"音频文件URL"`
+	VideoName          string    `description:"音频文件名称"`
+	VideoPlaySeconds   string    `description:"音频播放时长"`
+	VideoSize          string    `description:"音频文件大小,单位M"`
+	AdminId            int       `description:"创建者ID"`
+	AdminRealName      string    `description:"创建者姓名"`
+	State              int       `description:"发布状态:1-待发布;2-已发布"`
+	PublishTime        time.Time `description:"发布时间"`
+	PrePublishTime     time.Time `description:"预发布时间"`
+	PreMsgSend         int       `description:"定时发布后是否推送模版消息:0-否;1-是"`
+	MsgIsSend          int       `description:"消息是否已发送:0-否;1-是"`
+	MsgSendTime        time.Time `description:"模版消息发送时间"`
+	CreateTime         time.Time `description:"创建时间"`
+	ModifyTime         time.Time `description:"修改时间"`
+}
+
+func (m *SmartReport) TableName() string {
+	return "smart_report"
+}
+
+func (m *SmartReport) PrimaryId() string {
+	return "smart_report_id"
+}
+
+func (m *SmartReport) Update(cols []string) (err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	_, err = o.Update(m, cols...)
+	return
+}
+
+func (m *SmartReport) GetItemById(id int) (item *SmartReport, err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	sql := fmt.Sprintf(`SELECT * FROM %s WHERE %s = ? LIMIT 1`, m.TableName(), m.PrimaryId())
+	err = o.Raw(sql, id).QueryRow(&item)
+	return
+}
+
+// GetPrePublishSmartReports 获取定时发布时间为当前时间的未发布的报告列表
+func GetPrePublishSmartReports(startTime, endTime string) (list []*SmartReport, err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	sql := `SELECT * FROM smart_report WHERE state = 1 AND pre_publish_time >= ? AND pre_publish_time <= ?`
+	_, err = o.Raw(sql, startTime, endTime).QueryRows(&list)
+	return
+}
+
+// PublishSmartReportById 发布智能报告
+func PublishSmartReportById(reportId int, publishTime time.Time) (err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	sql := `UPDATE smart_report SET state = 2, publish_time = ?, msg_is_send = 1, msg_send_time = NOW(), modify_time = NOW() WHERE smart_report_id = ?`
+	_, err = o.Raw(sql, publishTime, reportId).Exec()
+	return
+}
+
+// ElasticSmartReport 智能研报es
+type ElasticSmartReport struct {
+	SmartReportId      int    `description:"智能研报ID"`
+	Title              string `description:"标题"`
+	Abstract           string `description:"摘要"`
+	BodyContent        string `description:"内容"`
+	PublishTime        string `description:"发布时间"`
+	PublishState       int    `description:"发布状态 1-未发布 2-已发布"`
+	Author             string `description:"作者"`
+	ClassifyIdFirst    int    `description:"一级分类ID"`
+	ClassifyNameFirst  string `description:"一级分类名称"`
+	ClassifyIdSecond   int    `description:"二级分类ID"`
+	ClassifyNameSecond string `description:"二级分类名称"`
+	StageStr           string `description:"报告期数"`
+	Frequency          string `description:"频度"`
+}
+
+// Report2ImgQueueReq 报告详情生成长图队列请求体
+type Report2ImgQueueReq struct {
+	ReportType int    `description:"报告类型: 1-研报; 2-智能研报"`
+	ReportCode string `description:"报告唯一编码"`
+}

+ 28 - 0
services/admin.go

@@ -0,0 +1,28 @@
+package services
+
+import (
+	"context"
+	"eta/eta_task/models"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// ClearAdminOperateLog 清理三个月前的用户操作日志
+func ClearAdminOperateLog(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("清理用户操作日志, ClearAdminOperateLog error: %s", err.Error())
+			fmt.Println(tips)
+			go alarm_msg.SendAlarmMsg(tips, 2)
+		}
+	}()
+
+	preTime := time.Now().Local().AddDate(0, -3, 0).Format(utils.FormatDateTime)
+	e := models.ClearAdminOperateRecord(preTime)
+	if e != nil {
+		err = fmt.Errorf("ClearAdminOperateRecord: %s", e.Error())
+	}
+	return
+}

+ 7 - 0
services/data/base_edb_lib.go

@@ -3,6 +3,7 @@ package data
 import (
 	"encoding/json"
 	"eta/eta_task/models"
+	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
 	"fmt"
 	"io/ioutil"
@@ -16,6 +17,7 @@ func RefreshEdbData(edbInfoId, source int, edbCode, startDate string) (resp *mod
 	param["EdbCode"] = edbCode
 	param["EdbInfoId"] = edbInfoId
 	param["StartDate"] = startDate
+	param["Source"] = source
 	urlStr := ``
 	switch source {
 	case utils.DATA_SOURCE_THS:
@@ -66,6 +68,8 @@ func RefreshEdbData(edbInfoId, source int, edbCode, startDate string) (resp *mod
 		urlStr = "baiinfo/refresh"
 	case utils.DATA_SOURCE_NATIONAL_STATISTICS:
 		urlStr = "national_statistics/refresh"
+	default:
+		urlStr = data_manage.EdbDataRefreshMethodMap[source] // 没有对应的从edb_source中取
 	}
 	if urlStr == "" {
 		err = fmt.Errorf(fmt.Sprint("source:", source, ";未实现该指标的刷新接口,请联系管理员"))
@@ -127,6 +131,9 @@ func HttpPost(url, postData string, params ...string) ([]byte, error) {
 	req.Header.Set("Content-Type", contentType)
 	req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY))
 	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
 	defer resp.Body.Close()
 	b, err := ioutil.ReadAll(resp.Body)
 	fmt.Println("HttpPost:" + string(b))

+ 32 - 0
services/data/base_from_jiayue.go

@@ -0,0 +1,32 @@
+package data
+
+import (
+	"context"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+)
+
+// SyncJiaYueNewIndex 同步嘉悦物产增量指标-每分钟
+func SyncJiaYueNewIndex(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			tips := "SyncJiaYueNewIndex-同步嘉悦物产增量指标失败, ErrMsg:\n" + err.Error()
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	param := make(map[string]interface{})
+	uri := "jiayue_index/sync_new_index"
+	res, e := postRefreshEdbData(param, uri)
+	if e != nil {
+		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
+		return
+	}
+	if res != nil && res.Ret != 200 {
+		err = fmt.Errorf("postRefreshEdbData fail")
+		return
+	}
+	return
+}

+ 70 - 1
services/data/edb_info.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_task/services/alarm_msg"
 	"eta/eta_task/utils"
 	"fmt"
+	"strconv"
 	"strings"
 	"sync"
 )
@@ -121,7 +122,7 @@ func RefreshDataFromPb(wg *sync.WaitGroup) (err error) {
 	}()
 	var condition string
 	var pars []interface{}
-	condition += " AND source=? "
+	condition += " AND source=? AND frequency in ('日度','周度') "
 	pars = append(pars, utils.DATA_SOURCE_PB)
 
 	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
@@ -1028,3 +1029,71 @@ func RefreshDataFromEic(wg *sync.WaitGroup) (err error) {
 	}
 	return err
 }
+
+// RefreshDataFromBridge 刷新来自桥接服务的来源
+func RefreshDataFromBridge(wg *sync.WaitGroup) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info("RefreshDataFromBridge ErrMsg: %s", err.Error())
+			go alarm_msg.SendAlarmMsg("RefreshDataFromBridge ErrMsg: "+err.Error(), 3)
+		}
+		wg.Done()
+	}()
+
+	// 获取来自桥接服务的指标来源
+	cond := ` AND from_bridge = 1`
+	pars := make([]interface{}, 0)
+	sources, e := data_manage.GetEdbSourceItemsByCondition(cond, pars, []string{}, "")
+	if e != nil {
+		err = fmt.Errorf("获取来自桥接服务的数据源失败, err: %s", e.Error())
+		return
+	}
+	sourceArr := make([]string, 0)
+	for _, v := range sources {
+		if v.EdbSourceId <= 0 {
+			continue
+		}
+		sourceArr = append(sourceArr, strconv.Itoa(v.EdbSourceId))
+	}
+	if len(sourceArr) == 0 {
+		utils.FileLog.Info("RefreshDataFromBridge无来源")
+		return
+	}
+
+	// 获取指标
+	edbCond := fmt.Sprintf(` AND source IN (%s)`, utils.GetOrmInReplace(len(sourceArr)))
+	edbPars := make([]interface{}, 0)
+	edbPars = append(edbPars, sourceArr)
+	items, e := data_manage.GetEdbInfoByCondition(edbCond, edbPars, 0)
+	if e != nil {
+		err = fmt.Errorf("GetEdbInfoByCondition err: %s", e.Error())
+		return
+	}
+
+	for _, v := range items {
+		startDate := ""
+		if v.Frequency == "日度" {
+			startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+		} else if v.Frequency == "周度" {
+			startDate = v.EndDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7)).Format(utils.FormatDate)
+		} else if v.Frequency == "月度" {
+			startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "季度" {
+			startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH*3, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "年度" {
+			startDate = v.EndDate.AddDate(-utils.DATA_REFRESH, 0, 0).Format(utils.FormatDate)
+		} else {
+			startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+		}
+		resp, e := RefreshEdbData(v.EdbInfoId, v.Source, v.EdbCode, startDate)
+		if e != nil {
+			err = fmt.Errorf("RefreshEdbData err: %s", e.Error())
+			return
+		}
+		if resp.Ret != 200 {
+			err = fmt.Errorf("RefreshEdbData Err: %s; ErrMsg: %s", resp.Msg, resp.ErrMsg)
+			return
+		}
+	}
+	return
+}

+ 18 - 15
services/data/edb_info_sync.go

@@ -3,6 +3,7 @@ package data
 import (
 	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
+	"strings"
 	"sync"
 )
 
@@ -47,15 +48,18 @@ func SyncGlDataBase() {
 
 var manualLock sync.Mutex
 
-// 同步手工数据
+// SyncManualDataBase 同步手工数据
 func SyncManualDataBase() {
 	var err error
+	errMsgList := make([]string, 0)
+
 	manualLock.Lock()
 	defer func() {
 		manualLock.Unlock()
-		if err != nil {
-			utils.FileLog.Info("SyncManualDataBase Err:" + err.Error())
-			go utils.SendEmailByHongze("同步手工数据失败", "同步手工数据失败 Err:"+err.Error(), utils.RefreshEdbInfoEmailSendToUsers, "", "")
+		if len(errMsgList) > 0 {
+			errMsg := strings.Join(errMsgList, "\n")
+			utils.FileLog.Info("SyncManualDataBase Err:" + errMsg)
+			go utils.SendEmailByHongze("同步手工数据失败", "同步手工数据失败 Err:"+errMsg, utils.RefreshEdbInfoEmailSendToUsers, "", "")
 		}
 	}()
 	var condition string
@@ -66,22 +70,21 @@ func SyncManualDataBase() {
 	pars = append(pars, utils.DATA_SOURCE_MANUAL)
 	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)
 	if err != nil {
+		errMsgList = append(errMsgList, "获取指标失败 ;Err:"+err.Error())
 		return
 	}
 	for _, v := range items {
-		err = data_manage.RefreshManual(v)
-		if err != nil {
-			return
-		}
-		item, err := data_manage.GetEdbInfoMaxAndMinInfo(v.Source, v.EdbCode)
+		startDate := v.StartDate.Format(utils.FormatDate)
+		resp, err := RefreshEdbData(v.EdbInfoId, v.Source, v.EdbCode, startDate)
 		if err != nil {
-			return
+			//return errors.New("RefreshEdbData Err:" + err.Error())
+			errMsgList = append(errMsgList, "RefreshEdbData Err:"+err.Error())
+			continue
 		}
-		if item != nil {
-			err = data_manage.ModifyEdbInfoMaxAndMinInfo(v.EdbInfoId, item)
-			if err != nil {
-				return
-			}
+		if resp.Ret != 200 {
+			//return errors.New("RefreshEdbData Err:" + resp.Msg + ";ErrMsg:" + resp.ErrMsg)
+			errMsgList = append(errMsgList, "RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
+			continue
 		}
 	}
 }

+ 59 - 0
services/elastic.go

@@ -150,3 +150,62 @@ func EsAddOrEditEnglishReport(indexName, docId string, item *models.ElasticEngli
 	}
 	return
 }
+
+// EsAddOrEditSmartReport 新增编辑es智能研报
+func EsAddOrEditSmartReport(indexName, docId string, item *models.ElasticSmartReport) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("EsAddOrEditSmartReport Err:", err.Error())
+		}
+	}()
+	client, err := NewClient()
+	if err != nil {
+		return
+	}
+	// docId为报告ID
+	searchById, err := client.Get().Index(indexName).Id(docId).Do(context.Background())
+	if err != nil && !strings.Contains(err.Error(), "404") {
+		fmt.Println("Get Err" + err.Error())
+		return
+	}
+	if searchById != nil && searchById.Found {
+		resp, err := client.Update().Index(indexName).Id(docId).Doc(map[string]interface{}{
+			"SmartReportId":      item.SmartReportId,
+			"Title":              item.Title,
+			"Abstract":           item.Abstract,
+			"BodyContent":        item.BodyContent,
+			"PublishTime":        item.PublishTime,
+			"PublishState":       item.PublishState,
+			"Author":             item.Author,
+			"ClassifyIdFirst":    item.ClassifyIdFirst,
+			"ClassifyNameFirst":  item.ClassifyNameFirst,
+			"ClassifyIdSecond":   item.ClassifyIdSecond,
+			"ClassifyNameSecond": item.ClassifyNameSecond,
+			"StageStr":           item.StageStr,
+			"Frequency":          item.Frequency,
+		}).Do(context.Background())
+		if err != nil {
+			return err
+		}
+		//fmt.Println(resp.Status, resp.Result)
+		if resp.Status == 0 {
+			fmt.Println("修改成功" + docId)
+			err = nil
+		} else {
+			fmt.Println("EditData", resp.Status, resp.Result)
+		}
+	} else {
+		resp, err := client.Index().Index(indexName).Id(docId).BodyJson(item).Do(context.Background())
+		if err != nil {
+			fmt.Println("新增失败:", err.Error())
+			return err
+		}
+		if resp.Status == 0 && resp.Result == "created" {
+			fmt.Println("新增成功" + docId)
+			return nil
+		} else {
+			fmt.Println("AddData", resp.Status, resp.Result)
+		}
+	}
+	return
+}

+ 7 - 5
services/hz_data_api.go

@@ -39,16 +39,18 @@ func HttpPost(funcName, method string, postDataMap map[string]interface{}) (resu
 		return "", err
 	}
 	req.Header.Set("content-type", "application/json")
-	utils.FileLog.Info(fmt.Sprintf("请求函数:%s ;请求体:%s", funcName, req))
+	utils.FileLog.Info(fmt.Sprintf("请求函数:%s ;请求地址:%s;请求体:%s", funcName, utils.HzDataApi+method, string(bytesData)))
 	resp, err := client.Do(req)
 	if err != nil {
 		return "", err
 	}
-	body, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return "", err
+	if resp.Body != nil {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return "", err
+		}
+		result = string(body)
 	}
-	result = string(body)
 	utils.FileLog.Info(fmt.Sprintf("请求函数:%s ;返回体:%s", funcName, result))
 	return
 }

+ 11 - 1
services/report.go

@@ -194,7 +194,17 @@ func PublishReport(cont context.Context) (err error) {
 				}
 			}
 		}()
-
+		recordItem := &models.ReportStateRecord{
+			ReportId:   item.Id,
+			ReportType: 1,
+			State:      2,
+			AdminId:    item.AdminId,
+			AdminName:  item.AdminName,
+			CreateTime: time.Now(),
+		}
+		go func() {
+			_, _ = models.AddReportStateRecord(recordItem)
+		}()
 	}
 	return
 }

+ 145 - 0
services/smart_report.go

@@ -0,0 +1,145 @@
+package services
+
+import (
+	"context"
+	"eta/eta_task/models"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+	"html"
+	"strconv"
+	"time"
+)
+
+// PublishSmartReport 定时发布智能研报
+func PublishSmartReport(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			go alarm_msg.SendAlarmMsg("PublishSmartReport-定时发布智能研报失败, ErrMsg:\n"+err.Error(), 3)
+		}
+	}()
+
+	now := time.Now().Format(utils.FormatDateTimeMinute)
+	startTime := now + ":00"
+	endTime := now + ":59"
+
+	list, e := models.GetPrePublishSmartReports(startTime, endTime)
+	if e != nil {
+		err = fmt.Errorf("GetPrePublishSmartReports err: %s", e.Error())
+		return
+	}
+	listLen := len(list)
+	if listLen == 0 {
+		return
+	}
+
+	// 比对时间(分钟), 时间相等则发布并推送
+	for i := 0; i < listLen; i++ {
+		item := list[i]
+		var publishTime time.Time
+		// 如果报告曾经发布过,并且已经发送过模版消息,则发布时间为原发布时间
+		if item.MsgIsSend == 1 && !item.PublishTime.IsZero() {
+			publishTime = item.PublishTime
+		} else {
+			publishTime = time.Now()
+		}
+		// 发布报告, 同时更新消息推送状态, 消息推送目前仅为预留功能, 后面小程序需要售卖的时候再细化消息推送
+		if err = models.PublishSmartReportById(item.SmartReportId, publishTime); err != nil {
+			return
+		}
+
+		// 写入队列
+		var queue models.Report2ImgQueueReq
+		queue.ReportType = 2
+		queue.ReportCode = item.ReportCode
+		_ = utils.Rc.LPush(utils.CACHE_CREATE_REPORT_IMGPDF_QUEUE, queue)
+
+		// 生成音频, 更新ES
+		go func() {
+			if item.VideoUrl == "" {
+				SmartReportBuildVideoAndUpdate(item)
+			}
+			_ = SmartReportElasticUpsert(item.SmartReportId, 2)
+		}()
+
+		recordItem := &models.ReportStateRecord{
+			ReportId:   item.SmartReportId,
+			ReportType: 2,
+			State:      2,
+			AdminId:    item.AdminId,
+			AdminName:  item.AdminRealName,
+			CreateTime: time.Now(),
+		}
+		go func() {
+			_, _ = models.AddReportStateRecord(recordItem)
+		}()
+	}
+	return
+}
+
+// SmartReportBuildVideoAndUpdate 生成音频
+func SmartReportBuildVideoAndUpdate(item *models.SmartReport) {
+	if item == nil {
+		return
+	}
+	var err error
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("智能研报-音频生成, errMsg: %s", err.Error())
+			go alarm_msg.SendAlarmMsg(tips, 2)
+		}
+	}()
+
+	videoUrl, videoName, videoSize, videoPlaySeconds, e := CreateReportVideo(item.Title, item.Content, time.Now().Local().Format(utils.FormatDateTime))
+	if e != nil {
+		err = fmt.Errorf("create audio err: %s", e.Error())
+		return
+	}
+	item.VideoUrl = videoUrl
+	item.VideoName = videoName
+	item.VideoSize = videoSize
+	item.VideoPlaySeconds = fmt.Sprintf("%.2f", videoPlaySeconds)
+	item.ModifyTime = time.Now().Local()
+	cols := []string{"VideoUrl", "VideoName", "VideoSize", "VideoPlaySeconds", "ModifyTime"}
+	if e = item.Update(cols); e != nil {
+		err = fmt.Errorf("smart report update err: %s", e.Error())
+		return
+	}
+}
+
+// SmartReportElasticUpsert 新增/编辑报告es
+func SmartReportElasticUpsert(smartReportId int, state int) (err error) {
+	if smartReportId <= 0 {
+		return
+	}
+
+	reportOB := new(models.SmartReport)
+	item, e := reportOB.GetItemById(smartReportId)
+	if e != nil {
+		if e.Error() == utils.ErrNoRow() {
+			// 可能被删了就直接忽略掉
+			return
+		}
+		err = fmt.Errorf("获取报告失败, Err: %s", e.Error())
+		return
+	}
+
+	esReport := new(models.ElasticSmartReport)
+	esReport.SmartReportId = item.SmartReportId
+	esReport.Title = item.Title
+	esReport.Abstract = item.Abstract
+	esReport.BodyContent = utils.TrimHtml(html.UnescapeString(item.Content))
+	esReport.PublishTime = item.PublishTime.Format(utils.FormatDateTime)
+	esReport.PublishState = state
+	esReport.Author = item.Author
+	esReport.ClassifyIdFirst = item.ClassifyIdFirst
+	esReport.ClassifyNameFirst = item.ClassifyNameFirst
+	esReport.ClassifyIdSecond = item.ClassifyIdSecond
+	esReport.ClassifyNameSecond = item.ClassifyNameSecond
+	esReport.StageStr = strconv.Itoa(item.Stage)
+	esReport.Frequency = item.Frequency
+	if err = EsAddOrEditSmartReport(utils.SmartReportIndexName, strconv.Itoa(item.SmartReportId), esReport); err != nil {
+		return
+	}
+	return
+}

+ 21 - 2
services/task.go

@@ -17,6 +17,11 @@ func Task() {
 	if utils.RunMode == "release" {
 		releaseTask()
 	}
+
+	// 定时发布智能研报
+	publishSmartReport := task.NewTask("publishSmartReport", "0 */1 * * * *", PublishSmartReport)
+	task.AddTask("定时发布智能研报", publishSmartReport)
+
 	task.StartTask()
 	fmt.Println("task end")
 }
@@ -24,7 +29,7 @@ func Task() {
 // 生产环境需要走的任务
 func releaseTask() {
 	//同步指标
-	if utils.BusinessCode != "E2023080900" {
+	if utils.BusinessCode != utils.BusinessCodeRelease {
 		syncHzDataIndex := task.NewTask("syncHzDataIndex", "0 10,20,40,50 16,18 * * *", SyncHzDataIndex)
 		task.AddTask("syncHzDataIndex", syncHzDataIndex)
 	}
@@ -78,11 +83,21 @@ func releaseTask() {
 	// 每天清理两周前的报告保存日志
 	clearReportSaveLog := task.NewTask("clearReportSaveLog", "0 15 23 * * *", ClearReportSaveLog)
 	task.AddTask("定时清理报告保存日志", clearReportSaveLog)
+
+	// 每天清理三个月前的用户操作日志
+	clearAdminOperateLog := task.NewTask("clearAdminOperateLog", "0 20 23 * * *", ClearAdminOperateLog)
+	task.AddTask("定时清理用户操作日志", clearAdminOperateLog)
+
+	// 嘉悦物产-每分钟定时同步增量指标
+	if utils.BusinessCode == utils.BusinessCodeJiaYue {
+		syncJiaYueNewIndex := task.NewTask("syncJiaYueNewIndex", "0 */1 * * * *", data.SyncJiaYueNewIndex)
+		task.AddTask("定时同步嘉悦物产增量指标", syncJiaYueNewIndex)
+	}
 }
 
 func RefreshData(cont context.Context) (err error) {
 	wg := sync.WaitGroup{}
-	wg.Add(16)
+	wg.Add(15)
 	//hour := time.Now().Hour()
 	//if hour != 0 {
 	//}
@@ -118,6 +133,10 @@ func RefreshData(cont context.Context) (err error) {
 	//国家统计局指标
 	go data.RefreshDataFromNationalStatistics(&wg)
 
+	// 来自桥接服务的指标
+	wg.Add(1)
+	go data.RefreshDataFromBridge(&wg)
+
 	wg.Wait()
 	////计算指标
 	data.RefreshDataFromCalculateAll()

+ 18 - 0
utils/config.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	beego "github.com/beego/beego/v2/adapter"
 	"github.com/beego/beego/v2/server/web"
+	"github.com/rdlucklib/rdluck_tools/cache"
 	"strconv"
 )
 
@@ -15,6 +16,10 @@ var (
 	MYSQL_URL_DATA string
 	MYSQL_URL_GL   string
 	MYSQL_URL_ETA  string
+
+	REDIS_CACHE string       //缓存地址
+	Rc          *cache.Cache //redis缓存
+	Re          error        //redis错误
 )
 
 var (
@@ -83,9 +88,11 @@ var (
 	ES_PASSWORD string // ES密码
 )
 
+// ES索引配置
 var (
 	EsReportIndexName        string //研报ES索引
 	EsEnglishReportIndexName string //英文研报ES索引
+	SmartReportIndexName     string //智能研报ES索引
 )
 
 // 科大讯飞--语音合成
@@ -145,6 +152,16 @@ func init() {
 	MYSQL_URL_GL = config["mysql_url_gl"]
 	MYSQL_URL_ETA = config["mysql_url_eta"]
 
+	REDIS_CACHE = config["beego_cache"]
+	if len(REDIS_CACHE) <= 0 {
+		panic(any("redis链接参数没有配置"))
+	}
+	Rc, Re = cache.NewCache(REDIS_CACHE) //初始化缓存
+	if Re != nil {
+		fmt.Println(Re)
+		panic(any(Re))
+	}
+
 	// 项目中文名称
 	appNameCn, err := web.AppConfig.String("app_name_cn")
 	if err != nil {
@@ -197,6 +214,7 @@ func init() {
 	{
 		EsReportIndexName = config["es_report_index_name"]
 		EsEnglishReportIndexName = config["es_english_report_index_name"]
+		SmartReportIndexName = config["es_smart_report_index_name"]
 	}
 
 	// 科大讯飞

+ 11 - 0
utils/constants.go

@@ -116,3 +116,14 @@ var (
 const (
 	TEMPLATE_MSG_REPORT = iota + 1 //日度点评报告推送
 )
+
+// 缓存key
+const (
+	CACHE_CREATE_REPORT_IMGPDF_QUEUE = "eta_report:report_img_pdf_queue" // 生成报告长图PDF队列
+)
+
+// 商户号
+const (
+	BusinessCodeRelease = "E2023080900" // 生产环境
+	BusinessCodeJiaYue  = "E2023092201" // 嘉悦物产
+)