123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package data
- import (
- "context"
- "fmt"
- "hongze/hongze_task/models/data_manage"
- "hongze/hongze_task/utils"
- "strings"
- )
- // AppendDataToEdbTable 追加数据到ETA表格
- func AppendDataToEdbTable(cont context.Context) (err error) {
- errMsgList := make([]string, 0)
- defer func() {
- if len(errMsgList) > 0 {
- go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", "追加数据到ETA表格; ErrMsg:"+strings.Join(errMsgList, "\n"), utils.EmailSendToUsers)
- }
- }()
- tableInfoList, err := data_manage.GetAllTableInfoList()
- if err != nil {
- errMsgList = append(errMsgList, fmt.Sprint("获取ETA表格列表失败,Err:", err.Error()))
- return
- }
- for _, tableInfo := range tableInfoList {
- edbMappingList, tmpErr := data_manage.GetAllTableEdbMappingItemListByTableInfoId(tableInfo.TableInfoId)
- if tmpErr != nil {
- errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "获取关联关系失败,Err:", tmpErr.Error()))
- continue
- }
- dataMap := make(map[string]map[string]float64)
- dateList := make([]string, 0)
- mappingMap := make(map[int]string) //指标map的最后一次更新时间
- for _, v := range edbMappingList {
- //如果最近更新eta表格数据的时间早于最新数据时间,那么需要查询出该数据并插入到eta表格中
- if v.EdbDataEndDate.After(v.EndDate) {
- dataList, tmpErr := data_manage.GetEdbDataListAll(" and edb_info_id=? and data_time > ?", []interface{}{v.EdbInfoId, v.EndDate.Format(utils.FormatDate)}, v.Source, 1)
- if tmpErr != nil {
- errMsg := fmt.Sprint(v.TableInfoId, "获取指标数据失败,Err:", tmpErr.Error())
- errMsgList = append(errMsgList, errMsg)
- continue
- }
- if len(dataList) <= 0 {
- continue
- }
- for _, data := range dataList {
- //指标map的最后一次更新时间
- mappingMap[v.TableEdbMappingId] = data.DataTime
- tmpDataMap, ok := dataMap[data.DataTime]
- if !ok {
- tmpDataMap = make(map[string]float64)
- dateList = append(dateList, data.DataTime)
- }
- key := fmt.Sprint("data_col_", v.EdbDataColIndex)
- tmpDataMap[key] = data.Value
- dataMap[data.DataTime] = tmpDataMap
- }
- }
- }
- fmt.Println(dataMap)
- fmt.Println(dateList)
- for _, date := range dateList {
- tmpErr = insertOrUpdateTableData(tableInfo.TableInfoId, date, dataMap[date])
- if tmpErr != nil {
- errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "插入更新", date, "数据失败,Err:", tmpErr.Error()))
- continue
- }
- }
- for tableEdbMappingId, date := range mappingMap {
- tmpErr = updateTableEdbMappingEndDate(tableEdbMappingId, date)
- if tmpErr != nil {
- errMsgList = append(errMsgList, fmt.Sprint(tableEdbMappingId, "更新最后一次数据更新日期数据失败,Err:", tmpErr.Error()))
- continue
- }
- }
- }
- return
- }
- // insertOrUpdateTableData 插入/更新ETA表格数据
- func insertOrUpdateTableData(tableInfoId int, date string, dataMap map[string]float64) (err error) {
- item, err := data_manage.GetBetweenTableDataByTableInfoIdAndDate(tableInfoId, date)
- if err != nil && err.Error() != utils.ErrNoRow() {
- return
- }
- err = nil
- sql := ``
- //如果找不到数据,那么是插入数据
- if item == nil {
- sort, tmpErr := data_manage.GetMaxSortByTableInfoId(tableInfoId)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- columnStr := ``
- valueStr := ``
- for columnValue, value := range dataMap {
- columnStr += columnValue + ","
- valueStr += fmt.Sprint(value, ",")
- }
- sort = sort + 1
- sql = fmt.Sprintf("INSERT INTO `table_data`( `table_info_id`, `date`, %s `data_type`, `sort`, `modify_time`, `create_time`) VALUES ( %d, '%s', %s 1, %d, now(), now());", columnStr, tableInfoId, date, valueStr, sort)
- } else {
- updateStr := ``
- for columnValue, value := range dataMap {
- updateStr += fmt.Sprint(columnValue, "=", value, ",")
- }
- sql = fmt.Sprintf("UPDATE `table_data` SET %s `modify_time` = now() WHERE `table_data_id` = %d;", updateStr, item.TableDataId)
- }
- fmt.Println(sql)
- err = data_manage.ExecTableDataSql(sql)
- return
- }
- func updateTableEdbMappingEndDate(tableEdbMappingId int, endDate string) (err error) {
- sql := fmt.Sprintf("UPDATE `table_edb_mapping` SET `end_date` = '%s', `modify_time` = now() WHERE `table_edb_mapping_id` = %d;", endDate, tableEdbMappingId)
- fmt.Println(sql)
- err = data_manage.ExecTableDataSql(sql)
- return
- }
|