package services import ( "context" "eta/eta_task/services/data" "eta/eta_task/services/data/future_good" "eta/eta_task/services/data_stat" "eta/eta_task/utils" "fmt" "sync" "time" "github.com/beego/beego/v2/task" ) func Task() { fmt.Println("task start") //如果是生产环境,才需要走这些任务 if utils.RunMode == "release" { releaseTask() } // 定时发布智能研报 publishSmartReport := task.NewTask("publishSmartReport", "0 */1 * * * *", PublishSmartReport) task.AddTask("定时发布智能研报", publishSmartReport) // 定时发布研报 publishReport := task.NewTask("publishReport", "0 */1 * * * *", PublishReport) task.AddTask("定时发布研报", publishReport) // 定时汇总数据源终端指标更新情况 setEdbSourceStatTask := task.NewTask("setEdbSourceStatTask", "0 20 19,23 * * *", data_stat.SetEdbSourceStatTask) task.AddTask("数据源统计表", setEdbSourceStatTask) XyTask() // 测试-刷新多空分析/相关性表格 if utils.RunMode == "debug" { refreshTradeAnalysisTables := task.NewTask("refreshTradeAnalysisTables", "0 0 20 * * *", data.RefreshTradeAnalysisTables) task.AddTask("refreshTradeAnalysisTables", refreshTradeAnalysisTables) } task.StartTask() fmt.Println("task end") } // 生产环境需要走的任务 func releaseTask() { //同步指标 if utils.BusinessCode != utils.BusinessCodeRelease { syncHzDataIndex := task.NewTask("syncHzDataIndex", "0 10,20,40,50 16,18 * * *", SyncHzDataIndex) task.AddTask("syncHzDataIndex", syncHzDataIndex) syncRankingFromDalian := task.NewTask("syncRankingFromDalian", "0 30,40,50 16,18 * * *", SyncRankingFromDalian) task.AddTask("syncRankingFromDalian", syncRankingFromDalian) } // 定时统计交易所的持仓分析数据 initPositionTask := task.NewTask("initPositionTask", "0 20,40 16-19 * * *", data.InitPositionTask) task.AddTask("initPositionTask", initPositionTask) //刷新指标数据 refreshData := task.NewTask("refreshData", "0 30 0,19 * * *", RefreshData) task.AddTask("refreshData", refreshData) // 根据配置刷新指标数据 configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshData) task.AddTask("configRefreshData", configRefreshData) // 定时刷新计算指标-默认每天的7:05,8:05分刷新计算指标 refreshCalculateEdbTaskTime := utils.RefreshCalculateEdbTaskTime if refreshCalculateEdbTaskTime == "" { refreshCalculateEdbTaskTime = "0 5 7,8 * * *" } refreshAllCalculateEdbData := task.NewTask("refreshAllCalculateEdbData", refreshCalculateEdbTaskTime, RefreshAllCalculateEdbData) task.AddTask("refreshAllCalculateEdbData", refreshAllCalculateEdbData) // 定时禁用钢联化工和wind指标的刷新状态 disableEdbRefresh := task.NewTask("disableEdbRefresh", "0 0 10 * * *", DisableEdbRefresh) task.AddTask("disableEdbRefresh", disableEdbRefresh) //同步弘则数据库中来自,钢联,隆众,有色,人工等基础数据--每隔五分钟,同步一次最新数据 syncBaseData := task.NewTask("syncBaseData", "0 */5 * * * * ", SyncBaseData) task.AddTask("syncBaseData", syncBaseData) syncBaseDataExt := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", SyncBaseDataExt) task.AddTask("syncBaseDataExt", syncBaseDataExt) //初始化指标更新状态 resetEdbInfoIsUpdate := task.NewTask("resetEdbInfoIsUpdate", "0 0 0 * * *", data.ResetEdbInfoIsUpdate) task.AddTask("resetEdbInfoIsUpdate", resetEdbInfoIsUpdate) // 刷新商品数据 refreshFutureGoodData := task.NewTask("refreshFutureGoodData", "0 30 0,19 * * *", RefreshFutureGoodData) task.AddTask("refreshFutureGoodData", refreshFutureGoodData) //刷新交易所指标数据 refreshTradeData := task.NewTask("refreshData", "0 1 4 * * *", RefreshTradeData) task.AddTask("refreshTradeData", refreshTradeData) //刷新欧洲天然气指标数据 refreshEicData := task.NewTask("refreshData", "0 1 3,7 * * *", RefreshEicData) task.AddTask("refreshEicData", refreshEicData) //检测数据服务器 checkDataServer := task.NewTask("checkDataServer", "0 */2 * * * * ", checkDataServer) task.AddTask("checkDataServer", checkDataServer) //删除日志 report_save_log,ppt_v2_save_log,保留一个月的 deleteLog := task.NewTask("syncSubStatus", "0 0 2 2 * *", DeleteLog) task.AddTask("deleteLog", deleteLog) // 定时发布英文研报 publishEnglishReport := task.NewTask("publishEnglishReport", "0 */1 * * * *", PublishEnglishReport) task.AddTask("定时发布英文研报", publishEnglishReport) // 每天清理两周前的报告保存日志 clearReportSaveLog := task.NewTask("clearReportSaveLog", "0 15 23 * * *", ClearReportSaveLog) task.AddTask("定时清理报告保存日志", clearReportSaveLog) // 每天清理三个月前的用户操作日志 clearAdminOperateLog := task.NewTask("clearAdminOperateLog", "0 20 23 * * *", ClearAdminOperateLog) task.AddTask("定时清理用户操作日志", clearAdminOperateLog) // 嘉悦物产 if utils.BusinessCode == utils.BusinessCodeJiaYue { // 每10分钟定时同步增量指标 syncJiaYueNewIndex := task.NewTask("syncJiaYueNewIndex", "0 */10 * * * *", data.SyncJiaYueNewIndex) task.AddTask("定时同步嘉悦物产增量指标", syncJiaYueNewIndex) // 每30分钟同步一次数据宝指标数据 syncJiaYueDataBaby := task.NewTask("syncJiaYueDataBaby", "0 */30 * * * * ", data.RefreshJiaYueDataFromBridge) task.AddTask("syncJiaYueDataBaby", syncJiaYueDataBaby) } if utils.BusinessCode == utils.BusinessCodeZhongJi { // 每天同步一次指标列表 syncZhongJiIndexList := task.NewTask("syncZhongJiIndexList", "0 0 17 * * *", data.SyncZhongJiIndexList) task.AddTask("定时同步中基宁波SMM指标列表", syncZhongJiIndexList) } // 中石油新加坡 if utils.IsPCSG == "1" { refreshPCSGBloomberg := task.NewTask("refreshPCSGBloombergDaily", "0 */30 * * * *", data.RefreshPCSGBloomberg) task.AddTask("中石油新加坡-每日Bloomberg指标刷新", refreshPCSGBloomberg) } // 刷新同花顺高频 refreshThsHfBase := task.NewTask("refreshThsHfBase", "0 0 0,6,9,12,15,18,21 * * *", data.RefreshBaseFromThsHfIndex) task.AddTask("refreshThsHfBase", refreshThsHfBase) // (基于交易所数据)多空分析/相关性表格刷新 refreshTradeAnalysisTables := task.NewTask("refreshTradeAnalysisTables", "0 0 20 * * *", data.RefreshTradeAnalysisTables) task.AddTask("refreshTradeAnalysisTables", refreshTradeAnalysisTables) } func RefreshData(cont context.Context) (err error) { wg := sync.WaitGroup{} wg.Add(14) //hour := time.Now().Hour() //if hour != 0 { //} //彭博 go data.RefreshDataFromPb(&wg) //彭博财务 go data.RefreshDataFromPbFinance(&wg) //手工数据 go data.RefreshDataFromManual(&wg) //隆众数据 //go data.RefreshDataFromLz(&wg) //有色 go data.RefreshDataFromYs(&wg) //钢联 go data.RefreshDataFromGl(&wg) //路透 go data.RefreshDataFromLt(&wg) //煤炭 go data.RefreshDataFromCoal(&wg) //谷歌出行数据 go data.RefreshDataFromGoogleTravel(&wg) //钢联化工 go data.RefreshDataFromMysteelChemical(&wg) //eia steo报告指标 go data.RefreshDataFromEiaSteo(&wg) //UN报告指标 go data.RefreshDataFromComTrade(&wg) //卓创报告指标 go data.RefreshDataFromSci(&wg) //国家统计局指标 go data.RefreshDataFromNationalStatistics(&wg) //富宝指标刷新 go data.RefreshDataFromFubao(&wg) // Bloomberg go func() { wg.Add(1) _ = data.RefreshDataFromBloomberg(&wg) }() // CCF化纤信息 go func() { wg.Add(1) _ = data.RefreshDataFromCCF(&wg) }() // 持仓分析 go func() { wg.Add(1) _ = data.RefreshDataFromTradeAnalysis(&wg) }() wg.Wait() ////计算指标 data.RefreshDataFromCalculateAll() // 刷新所有的基础预测指标 data.RefreshBasePredictDataAll() // 预测计算指标 data.RefreshPredictDataFromCalculateAll() // 指标系列计算数据 _ = data.RefreshFactorEdbCalculateData() // 指标系列图表计算数据 _ = data.RefreshFactorEdbChartCalculateData() // 刷新商品利润曲线图表数据 _ = data.RefreshFutureGoodProfitChart() // 刷新计算指标-依赖指标id大于当前计算指标id的计算指标 data.RefreshDataFromCalculateAfter() time.Sleep(5 * time.Second) //data.RefreshNotice() fmt.Println("Refresh End") return } // 刷新所有计算指标数据 func RefreshAllCalculateEdbData(cont context.Context) (err error) { ////计算指标 err = data.RefreshDataFromCalculateAll() return } // SyncBaseData 刷新基础数据 func SyncBaseData(cont context.Context) (err error) { now := time.Now() if now.Hour() == 0 || now.Hour() == 19 { return nil } //同步钢联基础数据 if utils.BusinessCode == utils.BusinessCodeRelease { go data.SyncGlDataBase() } return } // SyncBaseDataExt 刷新基础数据 func SyncBaseDataExt(cont context.Context) (err error) { now := time.Now() if now.Hour() == 0 || now.Hour() == 19 { return nil } //同步手工数据 go data.SyncManualDataBase() //刷新图表中,指标的最新日期 go data.SetChartEdbEndDate() return } // RefreshFutureGoodData 刷新商品数据 func RefreshFutureGoodData(cont context.Context) (err error) { future_good.RefreshFutureGoodDataFromThs() fmt.Println("Refresh End") return } // RefreshTradeData 刷新交易所数据 func RefreshTradeData(cont context.Context) (err error) { wg := sync.WaitGroup{} wg.Add(6) //郑商所 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_ZZ) //上期所 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_SH) //上期能源 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_SHFE) //中金所 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_CFFEX) //大商所 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_DL) //广期所数据 go data.RefreshBaseDataFromSource(&wg, utils.DATA_SOURCE_GFEX) wg.Wait() //计算指标 data.RefreshDataFromCalculateAll() // 刷新所有的基础预测指标 data.RefreshBasePredictDataAll() // 计算预测指标 data.RefreshPredictDataFromCalculateAll() return } // RefreshEicData 刷新欧洲天然气数据 func RefreshEicData(cont context.Context) (err error) { wg := sync.WaitGroup{} wg.Add(1) //欧洲天然气 go data.RefreshDataFromEic(&wg) wg.Wait() ////计算指标 //data.RefreshDataFromCalculateAll() // //// 刷新所有的基础预测指标 //data.RefreshBasePredictDataAll() // //// 计算预测指标 //data.RefreshPredictDataFromCalculateAll() return } // 检测数据服务 func checkDataServer(cont context.Context) (err error) { //检测wind新服务器 go data.CheckWindDataInterface(cont) //检测同花顺数据服务器 //go data.CheckThsDataInterface(cont) //检测路透数据服务器 go data.CheckLtDataInterface(cont) //检测彭博 go data.CheckPbDataInterface(cont) return } func TaskInit() { fmt.Println("TaskInit start") //data.InitGuangzhouPositionTask() fmt.Println("TaskInit end") return }