|
@@ -0,0 +1,126 @@
|
|
|
+package data
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "hongze/hongze_task/models/data_manage"
|
|
|
+ "hongze/hongze_task/utils"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+// AppendDataToEdbTable 追加数据到ETA表格
|
|
|
+func AppendDataToEdbTable(cont context.Context) (err error) {
|
|
|
+ errMsgList := make([]string, 0)
|
|
|
+ defer func() {
|
|
|
+ if len(errMsgList) > 0 {
|
|
|
+ go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", "追加数据到ETA表格; ErrMsg:"+strings.Join(errMsgList, "\n"), utils.EmailSendToUsers)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ tableInfoList, err := data_manage.GetAllTableInfoList()
|
|
|
+ if err != nil {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint("获取ETA表格列表失败,Err:", err.Error()))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, tableInfo := range tableInfoList {
|
|
|
+ edbMappingList, tmpErr := data_manage.GetAllTableEdbMappingItemListByTableInfoId(tableInfo.TableInfoId)
|
|
|
+ if tmpErr != nil {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "获取关联关系失败,Err:", tmpErr.Error()))
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ dataMap := make(map[string]map[string]float64)
|
|
|
+ dateList := make([]string, 0)
|
|
|
+ mappingMap := make(map[int]string) //指标map的最后一次更新时间
|
|
|
+ for _, v := range edbMappingList {
|
|
|
+ //如果最近更新eta表格数据的时间早于最新数据时间,那么需要查询出该数据并插入到eta表格中
|
|
|
+ if v.EdbDataEndDate.After(v.EndDate) {
|
|
|
+ dataList, tmpErr := data_manage.GetEdbDataListAll(" and edb_info_id=? and data_time > ?", []interface{}{v.EdbInfoId, v.EndDate.Format(utils.FormatDate)}, v.Source, 1)
|
|
|
+ if tmpErr != nil {
|
|
|
+ errMsg := fmt.Sprint(v.TableInfoId, "获取指标数据失败,Err:", tmpErr.Error())
|
|
|
+ errMsgList = append(errMsgList, errMsg)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if len(dataList) <= 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, data := range dataList {
|
|
|
+ //指标map的最后一次更新时间
|
|
|
+ mappingMap[v.TableEdbMappingId] = data.DataTime
|
|
|
+
|
|
|
+ tmpDataMap, ok := dataMap[data.DataTime]
|
|
|
+ if !ok {
|
|
|
+ tmpDataMap = make(map[string]float64)
|
|
|
+ dateList = append(dateList, data.DataTime)
|
|
|
+ }
|
|
|
+ key := fmt.Sprint("data_col_", v.EdbDataColIndex)
|
|
|
+ tmpDataMap[key] = data.Value
|
|
|
+ dataMap[data.DataTime] = tmpDataMap
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Println(dataMap)
|
|
|
+ fmt.Println(dateList)
|
|
|
+ for _, date := range dateList {
|
|
|
+ tmpErr = insertOrUpdateTableData(tableInfo.TableInfoId, date, dataMap[date])
|
|
|
+ if tmpErr != nil {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint(tableInfo.TableInfoId, "插入更新", date, "数据失败,Err:", tmpErr.Error()))
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for tableEdbMappingId, date := range mappingMap {
|
|
|
+ tmpErr = updateTableEdbMappingEndDate(tableEdbMappingId, date)
|
|
|
+ if tmpErr != nil {
|
|
|
+ errMsgList = append(errMsgList, fmt.Sprint(tableEdbMappingId, "更新最后一次数据更新日期数据失败,Err:", tmpErr.Error()))
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// insertOrUpdateTableData 插入/更新ETA表格数据
|
|
|
+func insertOrUpdateTableData(tableInfoId int, date string, dataMap map[string]float64) (err error) {
|
|
|
+ item, err := data_manage.GetBetweenTableDataByTableInfoIdAndDate(tableInfoId, date)
|
|
|
+ if err != nil && err.Error() != utils.ErrNoRow() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err = nil
|
|
|
+ sql := ``
|
|
|
+ //如果找不到数据,那么是插入数据
|
|
|
+ if item == nil {
|
|
|
+ sort, tmpErr := data_manage.GetMaxSortByTableInfoId(tableInfoId)
|
|
|
+ if tmpErr != nil {
|
|
|
+ err = tmpErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ columnStr := ``
|
|
|
+ valueStr := ``
|
|
|
+ for columnValue, value := range dataMap {
|
|
|
+ columnStr += columnValue + ","
|
|
|
+ valueStr += fmt.Sprint(value, ",")
|
|
|
+ }
|
|
|
+ sort = sort + 1
|
|
|
+ sql = fmt.Sprintf("INSERT INTO `table_data`( `table_info_id`, `date`, %s `data_type`, `sort`, `modify_time`, `create_time`) VALUES ( %d, '%s', %s 1, %d, now(), now());", columnStr, tableInfoId, date, valueStr, sort)
|
|
|
+
|
|
|
+ } else {
|
|
|
+ updateStr := ``
|
|
|
+ for columnValue, value := range dataMap {
|
|
|
+ updateStr += fmt.Sprint(columnValue, "=", value, ",")
|
|
|
+ }
|
|
|
+ sql = fmt.Sprintf("UPDATE `table_data` SET %s `modify_time` = now() WHERE `table_data_id` = %d;", updateStr, item.TableDataId)
|
|
|
+ }
|
|
|
+ fmt.Println(sql)
|
|
|
+ err = data_manage.ExecTableDataSql(sql)
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func updateTableEdbMappingEndDate(tableEdbMappingId int, endDate string) (err error) {
|
|
|
+ sql := fmt.Sprintf("UPDATE `table_edb_mapping` SET `end_date` = '%s', `modify_time` = now() WHERE `table_edb_mapping_id` = %d;", endDate, tableEdbMappingId)
|
|
|
+ fmt.Println(sql)
|
|
|
+ err = data_manage.ExecTableDataSql(sql)
|
|
|
+ return
|
|
|
+}
|