package eta_bridge import ( "context" "encoding/json" "eta_gn/eta_task/models/data_manage" "eta_gn/eta_task/services/alarm_msg" "eta_gn/eta_task/services/data" "eta_gn/eta_task/utils" "fmt" "github.com/rdlucklib/rdluck_tools/paging" "io/ioutil" "net/url" "strconv" "strings" "sync" "time" ) // 同步指标信息锁 var lockSyncGnIndex sync.Mutex const GnEdbListUri = `/index_data/gn/edb/list` // 国能指标列表接口 // CurrLevelParentClassifyMap 当前层级分类map var CurrLevelParentClassifyMap map[int64]map[int64]map[string]CurrClassify // CurrEdbInfoMap 当前库里已有的指标map var CurrEdbInfoMap map[string]*data_manage.EdbInfo type CurrClassify struct { ClassifyId int64 ParentId int64 ClassifyName string } // SyncGnIndex // @Description: 定时同步指标信息 // @author: Roc // @datetime 2024-03-07 17:39:34 // @param cont context.Context // @return err error func SyncGnIndex(cont context.Context) (err error) { fmt.Println("准备同步指标") lockSyncGnIndex.Lock() errMsgList := make([]string, 0) fmt.Println("开始同步指标") defer func() { if err != nil { tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } if len(errMsgList) > 0 { tips := "SyncGnIndex-定时同步国能的指标信息到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n") utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } lockSyncGnIndex.Unlock() }() initCurrEdbInfoMap() initCurrLevelParentClassifyMap() var lastUpdateTimeStr string // 上一次更新的时间 err, errMsgList = syncGnIndex(1, utils.SyncCrmIndexNum, lastUpdateTimeStr) // 下面的用完整体注释掉就行 //indexIds, e := LoadGnTempIndexIds() //if e != nil { // err = fmt.Errorf("读取IDs配置文件失败, %v", e) // return //} //if len(indexIds) == 0 { // fmt.Println("IndexIds为空") // return //} //for _, v := range indexIds { // strId := strconv.Itoa(v) // fmt.Printf("正在同步IndexId: %s\n", strId) // err, errMsgList = syncGnIndexV2(1, 1, lastUpdateTimeStr, strId) // if err != nil { // errMsg := strings.Join(errMsgList, "\n") // fmt.Printf("IndexId: %s, 同步失败, ErrMsg: %s", strId, errMsg) // continue // } //} return } // initCurrLevelParentClassifyMap // @Description: 初始化当前层级分类map func initCurrLevelParentClassifyMap() { var condition string var pars []interface{} // 普通指标分类 condition = " AND classify_type = ? " pars = append(pars, 0) classifyList, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars) if err != nil { utils.FileLog.Error("获取分类列表数据失败:" + err.Error()) return } // 清空缓存 CurrLevelParentClassifyMap = make(map[int64]map[int64]map[string]CurrClassify) for _, v := range classifyList { currParentClassifyMap, ok := CurrLevelParentClassifyMap[v.Level] if !ok { currParentClassifyMap = make(map[int64]map[string]CurrClassify) } currClassifyMap, ok := currParentClassifyMap[v.ParentID] if !ok { currClassifyMap = make(map[string]CurrClassify) } classifyName := strings.TrimSpace(v.ClassifyName) currClassifyMap[classifyName] = CurrClassify{ ClassifyId: v.ClassifyID, ParentId: v.ParentID, ClassifyName: classifyName, } currParentClassifyMap[v.ParentID] = currClassifyMap CurrLevelParentClassifyMap[v.Level] = currParentClassifyMap } } // initCurrEdbInfoMap // @Description: 初始化当前指标map func initCurrEdbInfoMap() { // 获取指标列表 edbInfoList, err := data_manage.GetAllBaseEdbInfo() if err != nil { utils.FileLog.Error("获取指标列表数据失败:" + err.Error()) return } // 清空缓存 CurrEdbInfoMap = make(map[string]*data_manage.EdbInfo) for _, v := range edbInfoList { CurrEdbInfoMap[v.OriginalEdbCode] = v } } // EtaBridgeGnIndexListResp // @Description: 指标列表返回数据 type EtaBridgeGnIndexListResp struct { Code int `json:"code" description:"状态码"` Msg string `json:"msg" description:"提示信息"` Data IndexListResp `json:"data" description:"返回数据"` } // IndexListResp // @Description: 指标列表数据 type IndexListResp struct { Page paging.PagingItem `description:"分页数据"` List []IndexInfo } // IndexInfo // @Description: 指标信息 type IndexInfo struct { ClassifyNameOne string `description:"一级目录"` ClassifyNameTwo string `description:"二级目录"` ClassifyNameThree string `description:"三级目录"` DataIndexCode string `description:"数据节点指标编码"` SourceEdbCode string `description:"数据源指标原始编码"` EdbName string `description:"指标名称"` Frequency string `description:"频度"` Unit string `description:"单位"` SourceName string `description:"来源"` SourceCode string `description:"来源编码"` } // BridgeGnIndexParams // @Description: 桥接服务-获取国能指标数据入参 type BridgeGnIndexParams struct { LastModifyTime string `json:"last_modify_time" description:"最近一次更新时间"` PageIndex int `json:"page_index" description:"当前页码"` PageSize int `json:"page_size" description:"每页数量"` IndexId string `json:"index_id" description:"指标ID,不为空时表示只取该指标"` } // syncCrmIndex // @Description: 开始同步CRM指标信息 // @author: Roc // @datetime 2024-05-17 15:55:11 // @param assetPkgCd string // @param currIndex int // @param pageSize int // @param lastUpdateTimeStr string // @return err error // @return errMsgList []string func syncGnIndex(currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) { fmt.Println("开始第", currIndex, "页的更新") errMsgList = make([]string, 0) lastUpdateTimeStr := baseLastUpdateTimeStr if lastUpdateTimeStr != `` { lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr) } params := BridgeGnIndexParams{ LastModifyTime: lastUpdateTimeStr, PageIndex: currIndex, PageSize: pageSize, } bResult, err, _ := HttpEtaBridgePost(utils.SyncIndexPath, params) if err != nil { return } var result EtaBridgeGnIndexListResp err = json.Unmarshal(bResult, &result) if err != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult)) utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult)) return } //totalPage := result.Data.Paging.Pages // 处理指标信息 for _, v := range result.Data.List { tmpErr := handleIndex(v) if tmpErr != nil { errMsgList = append(errMsgList, tmpErr.Error()) } } fmt.Println(currIndex, "是否已结束:", result.Data.Page.IsEnd) // 如果还有下一页,那么就继续请求下一页 if !result.Data.Page.IsEnd { _, tmpErrMsgList := syncGnIndex(currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr) errMsgList = append(errMsgList, tmpErrMsgList...) } return } // handleIndex // @Description: 指标处理 // @param index // @return err func handleIndex(index IndexInfo) (err error) { // 处理分类(如果不存在就创建) oneClassifyId, twoClassifyId, thirdClassifyId, err := handleClassify(index) if err != nil { return } classifyId := thirdClassifyId if classifyId <= 0 { classifyId = twoClassifyId } if classifyId <= 0 { classifyId = oneClassifyId } // 处理指标(如果不存在就创建) err = handleEdbInfo(index, classifyId) return } // handleClassify // @Description: 分类处理 // @param index // @return firstClassifyId // @return secondClassifyId // @return thirdClassifyId // @return err func handleClassify(index IndexInfo) (firstClassifyId, secondClassifyId, thirdClassifyId int64, err error) { firstClassifyName := getClassifyName(index.ClassifyNameOne) secondClassifyName := getClassifyName(index.ClassifyNameTwo) thirdClassifyName := getClassifyName(index.ClassifyNameThree) var oneLevel, twoLevel, threeLevel int64 oneLevel = 1 twoLevel = 2 threeLevel = 3 // 一级分类 { var parentId int64 parentId = 0 classifyName := firstClassifyName level := oneLevel // 获取层级下的父级分类map currParentClassifyMap, ok := CurrLevelParentClassifyMap[level] if !ok { currParentClassifyMap = make(map[int64]map[string]CurrClassify) } // 获取父级id下的分类列表 currClassifyListMap, ok := currParentClassifyMap[parentId] if !ok { currClassifyListMap = make(map[string]CurrClassify) } // 根据分类名称获取分类 currClassify, ok := currClassifyListMap[classifyName] if !ok { timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) classifyInfo := &data_manage.EdbClassify{ //ClassifyId: 0, ClassifyType: 0, ClassifyName: classifyName, ClassifyNameEn: classifyName, ParentID: parentId, RootID: 0, HasData: 0, CreateTime: time.Now(), ModifyTime: time.Now(), SysUserID: 0, SysUserRealName: "", Level: level, UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)), Sort: 0, } err = data_manage.AddEdbClassify(classifyInfo) if err != nil { return } classifyInfo.RootID = classifyInfo.ClassifyID err = classifyInfo.Update([]string{"root_id"}) if err != nil { return } currClassify = CurrClassify{ ClassifyId: classifyInfo.ClassifyID, ParentId: classifyInfo.ParentID, ClassifyName: classifyInfo.ClassifyName, } currClassifyListMap[classifyName] = currClassify currParentClassifyMap[parentId] = currClassifyListMap CurrLevelParentClassifyMap[level] = currParentClassifyMap } firstClassifyId = currClassify.ClassifyId } // 二级分类 { parentId := firstClassifyId classifyName := secondClassifyName level := twoLevel if secondClassifyName == `` { return } // 获取层级下的父级分类map currParentClassifyMap, ok := CurrLevelParentClassifyMap[level] if !ok { currParentClassifyMap = make(map[int64]map[string]CurrClassify) } // 获取父级id下的分类列表 currClassifyListMap, ok := currParentClassifyMap[parentId] if !ok { currClassifyListMap = make(map[string]CurrClassify) } // 根据分类名称获取分类 currClassify, ok := currClassifyListMap[classifyName] if !ok { timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) classifyInfo := &data_manage.EdbClassify{ //ClassifyId: 0, ClassifyType: 0, ClassifyName: classifyName, ClassifyNameEn: classifyName, ParentID: parentId, RootID: firstClassifyId, HasData: 0, CreateTime: time.Now(), ModifyTime: time.Now(), SysUserID: 0, SysUserRealName: "", Level: level, UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)), Sort: 0, } err = data_manage.AddEdbClassify(classifyInfo) if err != nil { return } currClassify = CurrClassify{ ClassifyId: classifyInfo.ClassifyID, ParentId: classifyInfo.ParentID, ClassifyName: classifyInfo.ClassifyName, } currClassifyListMap[classifyName] = currClassify currParentClassifyMap[parentId] = currClassifyListMap CurrLevelParentClassifyMap[level] = currParentClassifyMap } secondClassifyId = currClassify.ClassifyId } // 三级分类 { parentId := secondClassifyId classifyName := thirdClassifyName level := threeLevel if thirdClassifyName == `` { return } // 获取层级下的父级分类map currParentClassifyMap, ok := CurrLevelParentClassifyMap[level] if !ok { currParentClassifyMap = make(map[int64]map[string]CurrClassify) } // 获取父级id下的分类列表 currClassifyListMap, ok := currParentClassifyMap[parentId] if !ok { currClassifyListMap = make(map[string]CurrClassify) } // 根据分类名称获取分类 currClassify, ok := currClassifyListMap[classifyName] if !ok { timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) classifyInfo := &data_manage.EdbClassify{ //ClassifyId: 0, ClassifyType: 0, ClassifyName: classifyName, ClassifyNameEn: classifyName, ParentID: parentId, RootID: firstClassifyId, HasData: 1, CreateTime: time.Now(), ModifyTime: time.Now(), SysUserID: 0, SysUserRealName: "", Level: level, UniqueCode: utils.MD5(fmt.Sprint(parentId, "_", utils.DATA_PREFIX+"_"+timestamp)), Sort: 0, } err = data_manage.AddEdbClassify(classifyInfo) if err != nil { return } currClassify = CurrClassify{ ClassifyId: classifyInfo.ClassifyID, ParentId: classifyInfo.ParentID, ClassifyName: classifyInfo.ClassifyName, } currClassifyListMap[classifyName] = currClassify currParentClassifyMap[parentId] = currClassifyListMap CurrLevelParentClassifyMap[level] = currParentClassifyMap } thirdClassifyId = currClassify.ClassifyId } return } func getClassifyName(classifyName string) string { classifyName = strings.TrimSpace(classifyName) if classifyName == `` { return classifyName } // 如果不是“其它指标”,那么就是处理掉带有序号的、 if classifyName != `其它指标` { classifyNameList := strings.Split(classifyName, `、`) if len(classifyNameList) > 0 { classifyName = classifyNameList[len(classifyNameList)-1] } } return classifyName } // handleEdbInfo // @Description: 处理指标 // @param index // @param thirdClassifyId // @return err func handleEdbInfo(index IndexInfo, thirdClassifyId int64) (err error) { // 过滤数据节点指标唯一编码为空的数据 if index.DataIndexCode == `` { return } // 过滤基础指标编码为空的数据 if index.SourceEdbCode == `` { return } edbInfo, ok := CurrEdbInfoMap[index.DataIndexCode] frequency := Frequency(strings.TrimSpace(index.Frequency)) unit := strings.TrimSpace(index.Unit) sourceName, sourceId, err := GetSource(strings.TrimSpace(index.SourceName), strings.TrimSpace(index.SourceCode)) if err != nil { return } if !ok { endDate := time.Date(1899, 1, 1, 0, 0, 0, 0, time.Local) timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) edbInfo = &data_manage.EdbInfo{ EdbInfoId: 0, EdbInfoType: utils.EDB_INFO_TYPE, SourceName: sourceName, Source: sourceId, EdbCode: index.SourceEdbCode, EdbName: index.EdbName, EdbNameEn: index.EdbName, EdbNameSource: index.EdbName, Frequency: frequency, Unit: unit, UnitEn: unit, StartDate: endDate, EndDate: endDate, ClassifyId: int(thirdClassifyId), SysUserId: 0, SysUserRealName: "", UniqueCode: utils.MD5(fmt.Sprint(index.SourceEdbCode, "_", utils.DATA_PREFIX+"_"+timestamp)), CreateTime: time.Now(), ModifyTime: time.Now(), BaseModifyTime: time.Now(), MinValue: 0, MaxValue: 0, CalculateFormula: "", EdbType: utils.EdbTypeBase, Sort: 0, LatestDate: "", LatestValue: 0, EndValue: 0, MoveType: 0, MoveFrequency: "", NoUpdate: 0, ServerUrl: "", ChartImage: "", // 缩略图 Calendar: "", DataDateType: "", ManualSave: 0, EmptyType: 0, MaxEmptyType: 0, TerminalCode: "", DataUpdateTime: "", ErDataUpdateDate: "", SourceIndexName: index.EdbName, SubSource: 0, SubSourceName: "", IndicatorCode: "", StockCode: "", Extra: "", IsJoinPermission: 0, OriginalEdbCode: index.DataIndexCode, } err = data_manage.AddEdbInfo(edbInfo) if err != nil { return } CurrEdbInfoMap[index.DataIndexCode] = edbInfo // TODO 刷新指标明细数据 fmt.Println(data.RefreshEdbData(edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EndDate.Format(utils.FormatDate))) return } updateCols := make([]string, 0) if edbInfo.EdbNameEn == edbInfo.EdbName && edbInfo.EdbName != index.EdbName { edbInfo.EdbNameEn = index.EdbName updateCols = append(updateCols, "edb_name_en") } if edbInfo.EdbName != index.EdbName { edbInfo.EdbName = index.EdbName updateCols = append(updateCols, "edb_name") } if edbInfo.Frequency != index.Frequency { edbInfo.Frequency = index.Frequency updateCols = append(updateCols, "frequency") } if edbInfo.UnitEn == edbInfo.Unit && edbInfo.Unit != unit { edbInfo.UnitEn = unit updateCols = append(updateCols, "unit_en") } if edbInfo.Unit != unit { edbInfo.Unit = unit updateCols = append(updateCols, "unit") } if edbInfo.ClassifyId != int(thirdClassifyId) { edbInfo.ClassifyId = int(thirdClassifyId) updateCols = append(updateCols, "classify_id") } if len(updateCols) > 0 { err = edbInfo.Update(updateCols) } return } // Frequency // @Description: 获取频度 // @param unit // @return string func Frequency(unit string) string { switch unit { case "半月度": unit = `周度` case "不定期": unit = `日度` case `日度`, `周度`, `旬度`, `月度`, `季度`, `半年度`, `年度`: default: unit = `` } return unit } // GetSource // @Description: 获取来源 // @param sourceName // @return gnSourceName // @return sourceCode // @return source // @return err func GetSource(sourceName, sourceCode string) (gnSourceName string, source int, err error) { gnSourceName = sourceName var tableNameSuffix, indexNamePrefix string tableNamePrefix := "edb_data_gn_" switch sourceName { case "CCTD": tableNameSuffix = "cctd" case "mysteel": tableNameSuffix = "mysteel" case "wind": tableNameSuffix = "wind" case "卓创": tableNameSuffix = "sci" case "CCI": tableNameSuffix = "cci" //return default: // 来源编码前缀把还是 tableNameSuffix = strings.ToLower(sourceCode) //if strings.Contains(sourceName, "国能购销辅助决策系统") { // gnSourceName = `国能购销辅助决策系统` // tableNameSuffix = "purchase_sales" //} else if strings.Contains(sourceName, "国能市场分析平台") { // gnSourceName = `国能市场分析平台` // tableNameSuffix = "market_analysis" //} else { // // TODO 自动生成表名(暂时以时间作为表名后缀,如果客户提前告知了,那么可以直接先建data表,以及edb_source表写入新的source) // tableNameSuffix = fmt.Sprint(time.Now().Format(utils.FormatDateTimeUnSpace)) //} } sourceItem := data_manage.GetEdbSourceBySourceName(gnSourceName) // 如果找不到,说明是 if sourceItem == nil { utils.FileLog.Debug("%s表不存在,需要创建相关的表:%s", gnSourceName, tableNamePrefix+tableNameSuffix) indexNamePrefix = strings.ToUpper(tableNameSuffix) sourceItem = &data_manage.EdbSource{ EdbSourceId: 0, SourceName: gnSourceName, TableName: tableNamePrefix + tableNameSuffix, EdbAddMethod: "gn_index/add", EdbRefreshMethod: "gn_index/refresh", IsBase: 1, FromBridge: 1, BridgeFlag: "bridge_gn", SourceExtend: gnSourceName, EdbCodeRequired: 1, IndexTableName: "", SourceNameEn: gnSourceName, } err = data_manage.AddEdbSource(sourceItem, indexNamePrefix) if err != nil { return } } source = sourceItem.EdbSourceId return } func syncGnIndexV2(currIndex, pageSize int, baseLastUpdateTimeStr string, indexId string) (err error, errMsgList []string) { fmt.Println("开始第", currIndex, "页的更新") errMsgList = make([]string, 0) lastUpdateTimeStr := baseLastUpdateTimeStr if lastUpdateTimeStr != `` { lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr) } params := BridgeGnIndexParams{ LastModifyTime: lastUpdateTimeStr, PageIndex: currIndex, PageSize: pageSize, IndexId: indexId, } bResult, err, _ := HttpEtaBridgePost(utils.SyncIndexPath, params) if err != nil { return } var result EtaBridgeGnIndexListResp err = json.Unmarshal(bResult, &result) if err != nil { err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult)) utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult)) return } //totalPage := result.Data.Paging.Pages // 处理指标信息 for _, v := range result.Data.List { tmpErr := handleIndex(v) if tmpErr != nil { errMsgList = append(errMsgList, tmpErr.Error()) } } fmt.Println(currIndex, "是否已结束:", result.Data.Page.IsEnd) // 如果还有下一页,那么就继续请求下一页 //if !result.Data.Paging.IsEnd { // _, tmpErrMsgList := syncGnIndex(currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr) // errMsgList = append(errMsgList, tmpErrMsgList...) //} return } func LoadGnTempIndexIds() (indexIds []int, err error) { filePath := "./static/gn_index_ids.json" b, e := ioutil.ReadFile(filePath) if e != nil { err = fmt.Errorf("读取配置失败, err: %v", e) return } if e = json.Unmarshal(b, &indexIds); e != nil { err = fmt.Errorf("解析配置失败, err: %v", e) return } return } // 用户同步的锁 var lockSyncUser sync.Mutex func SyncGnUser(cont context.Context) (err error) { lockSyncUser.Lock() defer func() { if err != nil { tips := "SyncUser-定时将第三方的用户数据同步到ETA失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } lockSyncUser.Unlock() }() uri := "/gn/user/sync" fmt.Println("开始同步OA用户") _, err, _ = HttpEtaBridgeGet(uri) if err != nil { return } fmt.Println("结束同步OA用户") return } // InitRefreshEdb // @Description: 初始化明细数据指标 // @param cont // @return err func InitRefreshEdb(cont context.Context) (err error) { fmt.Println("准备更新指标明细数据") lockSyncGnIndex.Lock() errMsgList := make([]string, 0) fmt.Println("开始更新指标明细数据") defer func() { fmt.Println("初始化指标明细数据结束") if err != nil { tips := "SyncGnIndex-初始化指标明细数据结束到ETA失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) } if len(errMsgList) > 0 { tips := "SyncGnIndex-初始化指标明细数据结束到ETA失败, ErrMsg:\n" + strings.Join(errMsgList, "\n") utils.FileLog.Info(tips) } lockSyncGnIndex.Unlock() }() initCurrEdbInfoMap() count := len(CurrEdbInfoMap) for _, v := range CurrEdbInfoMap { count-- fmt.Println("剩余:", count, "条数据待初始化") // 未初始化明细数据的指标 //if !v.EndDate.IsZero() { // continue //} fmt.Println(data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, utils.BaseEdbRefreshStartDate)) } return } // PostOAReq // @Description: 请求oa待办参数 type PostOAReq struct { AppPersonId string `json:"appPersonId" form:"appPersonId" description:"⽤户⼯号"` AppPersonName string `json:"appPersonName" form:"appPersonName" description:"⽤户名"` AppTaskUrl string `json:"appTaskUrl" form:"appTaskUrl" description:"待办地址,跳转地址"` TaskName string `json:"taskName" form:"taskName" description:"待办标题"` StatusName string `json:"statusName" form:"statusName" description:"待办节点名称"` Status int `json:"status" form:"status" description:"待办状态:1-插⼊,2-更新,3-删除"` AppTaskId string `json:"appTaskId" form:"appTaskId" description:"待办任务ID:插⼊时不需要传参,由接⼝⽣成唯⼀ID并返回,需要业务系统保存此ID,⽤于后续的更新和删除操作"` } // PostOAResp 请求OA响应 type PostOAResp struct { Code int `json:"code" description:"状态码"` Msg string `json:"msg" description:"提示信息"` Data string `json:"data" description:"待办任务ID"` }