package data

import (
	"context"
	"fmt"
	"strings"

	"hongze/hongze_task/models/data_manage"
	"hongze/hongze_task/utils"
)

// AppendDataToEdbTable appends newly available indicator (EDB) data to the ETA tables.
//
// It loads every ETA table and processes each one independently. Problems are
// collected as messages instead of aborting the run; if any were collected, a
// failure e-mail is sent (in a separate goroutine) to utils.EmailSendToUsers
// when the function returns.
//
// The cont parameter is currently unused. The returned err is non-nil only
// when the list of tables itself cannot be loaded.
func AppendDataToEdbTable(cont context.Context) (err error) {
	errMsgList := make([]string, 0)
	defer func() {
		if len(errMsgList) > 0 {
			go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", "追加数据到ETA表格; ErrMsg:"+strings.Join(errMsgList, "\n"), utils.EmailSendToUsers)
		}
	}()

	tableInfoList, err := data_manage.GetAllTableInfoList()
	if err != nil {
		errMsgList = append(errMsgList, fmt.Sprint("获取ETA表格列表失败,Err:", err.Error()))
		return
	}

	for _, tableInfo := range tableInfoList {
		errMsgList = append(errMsgList, appendTableData(tableInfo.TableInfoId)...)
	}
	return
}

// appendTableData brings a single ETA table up to date and returns one message
// per failure encountered (nil when everything succeeded).
//
// For each indicator mapping of the table whose indicator data end date is
// after the mapping's own end date, it fetches the data rows newer than the
// mapping's end date and merges them per date into "data_col_<index>" columns.
// It then inserts/updates one table_data row per date and finally advances the
// end date of each mapping.
func appendTableData(tableInfoId int) (errMsgList []string) {
	edbMappingList, err := data_manage.GetAllTableEdbMappingItemListByTableInfoId(tableInfoId)
	if err != nil {
		errMsgList = append(errMsgList, fmt.Sprint(tableInfoId, "获取关联关系失败,Err:", err.Error()))
		return
	}

	dataMap := make(map[string]map[string]float64) // date -> column name -> value
	dateList := make([]string, 0)                  // dates in first-seen order
	mappingDates := make(map[int][]string)         // mapping id -> fetched dates, in fetch order

	for _, v := range edbMappingList {
		// Only mappings whose table data lags behind the indicator's latest
		// data need new rows to be queried and written to the ETA table.
		if !v.EdbDataEndDate.After(v.EndDate) {
			continue
		}

		dataList, tmpErr := data_manage.GetEdbDataListAll(" and edb_info_id=? and data_time > ?", []interface{}{v.EdbInfoId, v.EndDate.Format(utils.FormatDate)}, v.Source, 1)
		if tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprint(v.TableInfoId, "获取指标数据失败,Err:", tmpErr.Error()))
			continue
		}

		// The column name depends only on the mapping, not on the data row.
		key := fmt.Sprint("data_col_", v.EdbDataColIndex)
		for _, data := range dataList {
			mappingDates[v.TableEdbMappingId] = append(mappingDates[v.TableEdbMappingId], data.DataTime)

			tmpDataMap, ok := dataMap[data.DataTime]
			if !ok {
				tmpDataMap = make(map[string]float64)
				dataMap[data.DataTime] = tmpDataMap
				dateList = append(dateList, data.DataTime)
			}
			tmpDataMap[key] = data.Value
		}
	}

	fmt.Println(dataMap)
	fmt.Println(dateList)

	// Remember which dates could not be written so that the mapping end dates
	// below are not moved past them.
	failedDates := make(map[string]struct{})
	for _, date := range dateList {
		if tmpErr := insertOrUpdateTableData(tableInfoId, date, dataMap[date]); tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprint(tableInfoId, "插入更新", date, "数据失败,Err:", tmpErr.Error()))
			failedDates[date] = struct{}{}
		}
	}

	for tableEdbMappingId, dates := range mappingDates {
		endDate := lastWrittenDate(dates, failedDates)
		if endDate == "" {
			// The very first new date failed; leave end_date untouched so the
			// next run fetches the same rows again.
			continue
		}
		if tmpErr := updateTableEdbMappingEndDate(tableEdbMappingId, endDate); tmpErr != nil {
			errMsgList = append(errMsgList, fmt.Sprint(tableEdbMappingId, "更新最后一次数据更新日期数据失败,Err:", tmpErr.Error()))
		}
	}
	return
}

// lastWrittenDate returns the last entry of dates that precedes the first
// entry present in failedDates, or "" if the first entry already failed (or
// dates is empty). When nothing failed it returns the final entry of dates.
//
// NOTE(review): this assumes dates is in ascending order, i.e. that
// GetEdbDataListAll returns rows oldest-first — confirm against that helper.
func lastWrittenDate(dates []string, failedDates map[string]struct{}) string {
	last := ""
	for _, date := range dates {
		if _, failed := failedDates[date]; failed {
			break
		}
		last = date
	}
	return last
}

// insertOrUpdateTableData inserts or updates the ETA table row for one date.
//
// It looks up the existing row via GetBetweenTableDataByTableInfoIdAndDate.
// A lookup error other than the "no row" error is returned as is. If no row
// was found, a new row is inserted with sort set to the table's current
// maximum sort plus one; otherwise the existing row's columns are updated.
//
// dataMap maps column names to the values to write. The statement is built as
// plain SQL text and executed through data_manage.ExecTableDataSql.
func insertOrUpdateTableData(tableInfoId int, date string, dataMap map[string]float64) (err error) {
	item, err := data_manage.GetBetweenTableDataByTableInfoIdAndDate(tableInfoId, date)
	if err != nil && err.Error() != utils.ErrNoRow() {
		return
	}
	// A "no row" error only means the row has to be inserted.
	err = nil

	var query string
	if item == nil {
		// No existing row: insert a new one at the end of the table.
		maxSort, tmpErr := data_manage.GetMaxSortByTableInfoId(tableInfoId)
		if tmpErr != nil {
			err = tmpErr
			return
		}

		// Columns and values are appended in the same iteration so that they
		// stay aligned regardless of map iteration order.
		var columns, values strings.Builder
		for columnValue, value := range dataMap {
			columns.WriteString(columnValue + ",")
			values.WriteString(fmt.Sprint(value, ","))
		}
		query = fmt.Sprintf("INSERT INTO `table_data`( `table_info_id`, `date`, %s `data_type`, `sort`, `modify_time`, `create_time`) VALUES ( %d, '%s', %s 1, %d, now(), now());", columns.String(), tableInfoId, date, values.String(), maxSort+1)
	} else {
		var assignments strings.Builder
		for columnValue, value := range dataMap {
			assignments.WriteString(fmt.Sprint(columnValue, "=", value, ","))
		}
		query = fmt.Sprintf("UPDATE `table_data` SET %s `modify_time` = now() WHERE `table_data_id` = %d;", assignments.String(), item.TableDataId)
	}

	fmt.Println(query)
	err = data_manage.ExecTableDataSql(query)
	return
}

// updateTableEdbMappingEndDate sets end_date (and modify_time) of the
// table_edb_mapping row identified by tableEdbMappingId.
func updateTableEdbMappingEndDate(tableEdbMappingId int, endDate string) (err error) {
	query := fmt.Sprintf("UPDATE `table_edb_mapping` SET `end_date` = '%s', `modify_time` = now() WHERE `table_edb_mapping_id` = %d;", endDate, tableEdbMappingId)
	fmt.Println(query)
	err = data_manage.ExecTableDataSql(query)
	return
}