package watch import ( "encoding/json" "fmt" "hongze/mysteel_watch/global" "hongze/mysteel_watch/models" "hongze/mysteel_watch/models/index" "hongze/mysteel_watch/utils" "io/ioutil" "log" "net/http" "os" "path/filepath" "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/xuri/excelize/v2" ) func ListenFolderNew() { fmt.Println("-----文件夹监听-------") watcher, err := fsnotify.NewWatcher() if err != nil { fmt.Println("fsnotify.NewWatcher err:" + err.Error()) log.Fatal(err) } defer watcher.Close() done2 := make(chan bool) go func() { for { select { case event, ok := <-watcher.Events: fmt.Println("event.Name", event.Name) fmt.Println(event.Op) if ok && event.Op == fsnotify.Create && !strings.Contains(event.Name, "tmp") && !strings.Contains(event.Name, ".TMP") && !strings.Contains(event.Name, "~") && (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) { WatchIndexFile(event.Name) } case err := <-watcher.Errors: fmt.Println("watcher.Errors:", err) log.Println("error:", err) case <-time.After(60 * time.Second): continue } } }() fmt.Println("watch dir:" + utils.IndexSaveDir) err = watcher.Add(utils.IndexSaveDir) if err != nil { fmt.Println("watcher.Add:" + err.Error()) log.Fatal(err) } <-done2 } // ListenFolderNewMerge 生产合并文件夹监听 func ListenFolderNewMerge() { fmt.Println("-----生产合并文件夹监听-------") watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() done2 := make(chan bool) go func() { for { select { case event, ok := <-watcher.Events: if ok && (event.Op == fsnotify.Create || event.Op == fsnotify.Write) && !strings.Contains(event.Name, "tmp") && !strings.Contains(event.Name, ".TMP") && !strings.Contains(event.Name, "~") && (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) { WatchIndexFileMergeRelease(event.Name) } case err := <-watcher.Errors: log.Println("error:", err) case <-time.After(60 * time.Second): continue } } }() err = watcher.Add(utils.IndexMsergeSaveDir) if err != nil { log.Fatal(err) } <-done2 } // WatchIndexFile 检测指标文件 func WatchIndexFile(filePath string) { fmt.Println("filePath:", filePath) //filePath:D:\mysteel_data\CM0000568866_release.xlsx time.Sleep(10 * time.Second) if !utils.FileIsExist(filePath) { fmt.Println("filePath is not exist:" + filePath) return } //读取文件内容 global.LOG.Info("WatchFile:" + filePath) f, err := excelize.OpenFile(filePath) global.LOG.Info("OpenFile:" + filePath) if err != nil { fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error()) return } var newFilePath string defer func() { if err := f.Close(); err != nil { fmt.Println("FileClose Err:" + err.Error()) return } //重命名文件 if filePath != newFilePath { err := os.Rename(filePath, newFilePath) if err != nil { fmt.Println("os.Rename Err:" + err.Error()) } } }() var runMode string if strings.Contains(filePath, "debug") { runMode = "debug" } else { runMode = "release" } dir, fp := filepath.Split(filePath) var wg = sync.WaitGroup{} wg.Add(1) go func() { sheetList := f.GetSheetList() for _, sv := range sheetList { var indexName, indexCode, unit, source, frequency, startDate, endDate, describe, updateDate string var indexId int64 rows, err := f.GetRows(sv) if err != nil { fmt.Println("f.GetRows:err:" + err.Error()) return } indexObj := new(index.BaseFromMysteelChemicalIndex) dataList := make([]index.BaseFromMysteelChemicalData, 0) dataMap := make(map[string]string) for rk, row := range rows { if rk > 0 { if rk < 10 { for ck, colCell := range row { if ck == 1 { if rk == 1 { indexName = colCell } if rk == 2 { unit = colCell } if rk == 3 { source = colCell } if rk == 4 { indexCode = colCell } if rk == 5 { frequency = colCell if !strings.Contains(frequency, "度") { frequency = frequency + "度" } } if rk == 6 { dateArr := strings.Split(colCell, "~") if len(dateArr) >= 2 { startDate = dateArr[0] endDate = dateArr[1] } } if rk == 7 { describe = colCell } if rk == 9 { updateDate = colCell } } } if rk == 9 { if indexName == "" { global.LOG.Info("未刷新到指标数据:filePath:" + filePath) break } //判断指标是否存在 var isAdd int item, err := indexObj.GetIndexItem(runMode, indexCode) if err != nil { if err.Error() == "record not found" { isAdd = 1 } else { isAdd = -1 fmt.Println("GetIndexItem Err:" + err.Error()) return } } if item != nil && item.BaseFromMysteelChemicalIndexId > 0 { fmt.Println("item:", item) isAdd = 2 } else { isAdd = 1 } fmt.Println("isAdd:", isAdd) if !strings.Contains(frequency, "度") { frequency = frequency + "度" } var frequencyStr string if strings.Contains(frequency, "日") { frequencyStr = "day" } else if strings.Contains(frequency, "周") { frequencyStr = "week" } else if strings.Contains(frequency, "月") || strings.Contains(frequency, "旬") { frequencyStr = "month" } else if strings.Contains(frequency, "年") { frequencyStr = "year" } frequencyStr = "_" + frequencyStr if !strings.Contains(filePath, frequencyStr) { fpArr := strings.Split(fp, "_") for k, v := range fpArr { if k == 0 { newFilePath = v + frequencyStr } else { newFilePath = newFilePath + "_" + v } } newFilePath = dir + newFilePath } else { newFilePath = filePath } if isAdd == 1 { indexObj.IndexCode = indexCode indexObj.IndexName = indexName indexObj.Unit = unit indexObj.Source = source indexObj.Describe = describe indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local) indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local) indexObj.Frequency = frequency indexObj.FilePath = newFilePath err = indexObj.Add(runMode) if err != nil { fmt.Println("add err:" + err.Error()) return } indexId = indexObj.BaseFromMysteelChemicalIndexId } else if isAdd == 2 { indexObj.IndexCode = indexCode indexObj.IndexName = indexName indexObj.Unit = unit indexObj.Source = source indexObj.Describe = describe indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local) indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local) indexObj.Frequency = frequency indexObj.FilePath = newFilePath indexObj.ModifyTime = time.Now() indexId = item.BaseFromMysteelChemicalIndexId //修改数据 updateColsArr := make([]string, 0) updateColsArr = append(updateColsArr, "index_name") updateColsArr = append(updateColsArr, "unit") updateColsArr = append(updateColsArr, "source") updateColsArr = append(updateColsArr, "frequency") updateColsArr = append(updateColsArr, "start_date") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "describe") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "modify_time") updateColsArr = append(updateColsArr, "file_path") indexObj.Update(runMode, updateColsArr) dataObj := new(index.BaseFromMysteelChemicalData) //获取已存在的所有数据 dataList, err := dataObj.GetIndexDataList(runMode, indexCode) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return } fmt.Println("dataListLen:", len(dataList)) for _, v := range dataList { dateStr := v.DataTime.Format(utils.FormatDate) dataMap[dateStr] = v.Value } } } } else { var date, value string for ck, colCell := range row { if ck == 0 { date = colCell } else { value = colCell } } if _, ok := dataMap[date]; !ok { dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return } if !strings.Contains(value, "#N/A") { dataItem := new(index.BaseFromMysteelChemicalData) dataItem.BaseFromMysteelChemicalIndexId = indexId dataItem.IndexCode = indexCode dataItem.DataTime = dateTime dataItem.Value = value dataItem.UpdateDate = updateDate dataItem.CreateTime = time.Now() dataItem.ModifyTime = time.Now() dataList = append(dataList, *dataItem) } } } } } if len(dataList) > 0 { dataObj := new(index.BaseFromMysteelChemicalData) err = dataObj.Add(runMode, dataList) if err != nil { fmt.Println("dataObj.Add() Err:" + err.Error()) } } } wg.Done() }() wg.Wait() } // WatchIndexFileMergeRelease 监听生产的合并excel文件的处理 func WatchIndexFileMergeRelease(filePath string) { fmt.Println("filePath:", filePath) //return //filePath:D:\mysteel_data\CM0000568866_release.xlsx time.Sleep(5 * time.Second) if !utils.FileIsExist(filePath) { fmt.Println("filePath is not exist:" + filePath) return } //读取文件内容 global.LOG.Info("WatchFile:" + filePath) f, err := excelize.OpenFile(filePath) global.LOG.Info("OpenFile:" + filePath) if err != nil { fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error()) return } defer func() { if err := f.Close(); err != nil { fmt.Println("FileClose Err:" + err.Error()) return } }() runMode := "release" var wg = sync.WaitGroup{} wg.Add(1) go func() { sheetList := f.GetSheetList() for _, sv := range sheetList { lenRow := 0 //指标数 // excel表的指标数据 indexExcelDataList := make([]map[string]string, 0) indexNameMap := make(map[int]string) indexCodeMap := make(map[int]string) unitMap := make(map[int]string) sourceMap := make(map[int]string) frequencyMap := make(map[int]string) startDateMap := make(map[int]string) endDateMap := make(map[int]string) describeMap := make(map[int]string) updateDateMap := make(map[int]string) rows, err := f.GetRows(sv) if err != nil { fmt.Println("GetRows Err:", err) return } for row, cols := range rows { if row == 0 { // 第一行是 钢联数据的备注 continue } // 指标名称 if row == 1 { lenRow = len(cols) - 1 for i := 1; i <= lenRow; i++ { tmpIndexExcelDataList := make(map[string]string, 0) indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList) } } if row < 10 { for k, colCell := range cols { switch row { case 1: //指标名称 indexNameMap[k-1] = colCell case 2: //单位 unitMap[k-1] = colCell case 3: //数据来源 sourceMap[k-1] = colCell case 4: //指标编码 indexCodeMap[k-1] = colCell case 5: //频度 tmpFrequency := colCell if !strings.Contains(tmpFrequency, "度") { tmpFrequency = tmpFrequency + "度" } frequencyMap[k-1] = tmpFrequency case 6: //时间区间 dateArr := strings.Split(colCell, "~") if len(dateArr) >= 2 { startDateMap[k-1] = dateArr[0] endDateMap[k-1] = dateArr[1] } case 7: //备注 describeMap[k-1] = colCell case 9: updateDateMap[k-1] = colCell } } } else { date := `` for k, col := range cols { if k == 0 { date = col continue } if date == `` { continue } if col != `` { indexExcelDataList[k-1][date] = col } } } } for k, excelDataMap := range indexExcelDataList { mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], updateDateMap[k], excelDataMap) } } wg.Done() }() wg.Wait() } // mysteelIndexHandle 钢联数据处理 func mysteelIndexHandle(runMode, indexName, indexCode, unit, source, frequency, startDate, endDate, describe, updateDate string, excelDataMap map[string]string) { var err error //return indexObj := new(index.BaseFromMysteelChemicalIndex) var indexId int64 addDataList := make([]index.BaseFromMysteelChemicalData, 0) exitDataMap := make(map[string]*index.BaseFromMysteelChemicalData) // 修改指标信息 if indexName == "" { global.LOG.Info("未刷新到指标数据:indexName:" + indexName) return } //判断指标是否存在 var isAdd int item, err := indexObj.GetIndexItem(runMode, indexCode) if err != nil { if err.Error() == "record not found" { isAdd = 1 } else { isAdd = -1 fmt.Println("GetIndexItem Err:" + err.Error()) return } } if item != nil && item.BaseFromMysteelChemicalIndexId > 0 { fmt.Println("item:", item) isAdd = 2 } else { isAdd = 1 } fmt.Println("isAdd:", isAdd) if !strings.Contains(frequency, "度") { frequency = frequency + "度" } if isAdd == 1 { indexObj.IndexCode = indexCode indexObj.IndexName = indexName indexObj.Unit = unit indexObj.Source = source indexObj.Describe = describe indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local) indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local) indexObj.Frequency = frequency err = indexObj.Add(runMode) if err != nil { fmt.Println("add err:" + err.Error()) return } indexId = indexObj.BaseFromMysteelChemicalIndexId } else if isAdd == 2 { indexObj.IndexCode = indexCode indexObj.IndexName = indexName indexObj.Unit = unit indexObj.Source = source indexObj.Describe = describe indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local) indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local) indexObj.Frequency = frequency indexObj.ModifyTime = time.Now() indexId = item.BaseFromMysteelChemicalIndexId //修改数据 updateColsArr := make([]string, 0) updateColsArr = append(updateColsArr, "index_name") updateColsArr = append(updateColsArr, "unit") updateColsArr = append(updateColsArr, "source") updateColsArr = append(updateColsArr, "frequency") updateColsArr = append(updateColsArr, "start_date") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "describe") updateColsArr = append(updateColsArr, "end_date") updateColsArr = append(updateColsArr, "modify_time") indexObj.Update(runMode, updateColsArr) dataObj := new(index.BaseFromMysteelChemicalData) //获取已存在的所有数据 exitDataList, err := dataObj.GetIndexDataList(runMode, indexCode) if err != nil { fmt.Println("GetIndexDataList Err:" + err.Error()) return } fmt.Println("exitDataListLen:", len(exitDataList)) for _, v := range exitDataList { dateStr := v.DataTime.Format(utils.FormatDate) exitDataMap[dateStr] = v } } dataObj := new(index.BaseFromMysteelChemicalData) // 遍历excel数据,然后跟现有的数据做校验,不存在则入库 for date, value := range excelDataMap { if findData, ok := exitDataMap[date]; !ok { dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local) if err != nil { fmt.Println("time.ParseInLocation Err:" + err.Error()) return } if !strings.Contains(value, "#N/A") { dataItem := new(index.BaseFromMysteelChemicalData) dataItem.BaseFromMysteelChemicalIndexId = indexId dataItem.IndexCode = indexCode dataItem.DataTime = dateTime dataItem.Value = value dataItem.UpdateDate = updateDate dataItem.CreateTime = time.Now() dataItem.ModifyTime = time.Now() addDataList = append(addDataList, *dataItem) } } else { if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据 dataObj.Value = value dataObj.ModifyTime = time.Now() dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId updateDataColsArr := make([]string, 0) updateDataColsArr = append(updateDataColsArr, "value") updateDataColsArr = append(updateDataColsArr, "modify_time") dataObj.Update(runMode, updateDataColsArr) global.LOG.Info(findData.IndexCode + " " + findData.Value + "-" + value) } } } if len(addDataList) > 0 { err = dataObj.Add(runMode, addDataList) if err != nil { fmt.Println("dataObj.Add() Err:" + err.Error()) } } go syncEdbDataMysteelChemical(runMode, indexCode) } func syncEdbDataMysteelChemical(runMode, indexCode string) { indexObj := new(models.EdbInfo) var isAdd int item, err := indexObj.GetEdbInfoItem(runMode, indexCode) if err != nil { if err.Error() == "record not found" { isAdd = 1 } else { isAdd = -1 fmt.Println("GetEdbInfoItem Err:" + err.Error()) return } } if item != nil && item.EdbInfoId > 0 { fmt.Println("item:", item) isAdd = 2 } else { isAdd = 1 // } if isAdd == 1 { //新增 return } param := make(map[string]interface{}) param["EdbCode"] = indexCode param["EdbInfoId"] = item.EdbInfoId param["StartDate"] = item.EndDate postRefreshEdbData(param) } type BaseResponse struct { Ret int Msg string ErrMsg string ErrCode string Data interface{} Success bool `description:"true 执行成功,false 执行失败"` IsSendEmail bool `json:"-" description:"true 发送邮件,false 不发送邮件"` IsAddLog bool `json:"-" description:"true 新增操作日志,false 不新增操作日志" ` } // postRefreshEdbData 刷新指标数据 func postRefreshEdbData(param map[string]interface{}) (resp *BaseResponse, err error) { urlStr := "mysteel_chemical/refresh" EDB_LIB_URL := "http://47.102.213.75:8300/edbapi/" postUrl := EDB_LIB_URL + urlStr postData, err := json.Marshal(param) if err != nil { return } result, err := HttpPost(postUrl, string(postData), "application/json") if err != nil { return } global.LOG.Info(" Refresh Result: " + string(result)) err = json.Unmarshal(result, &resp) if err != nil { return } return resp, nil } func HttpPost(url, postData string, params ...string) ([]byte, error) { body := ioutil.NopCloser(strings.NewReader(postData)) client := &http.Client{} req, err := http.NewRequest("POST", url, body) if err != nil { return nil, err } contentType := "application/x-www-form-urlencoded;charset=utf-8" if len(params) > 0 && params[0] != "" { contentType = params[0] } req.Header.Set("Content-Type", contentType) req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY)) resp, err := client.Do(req) defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) fmt.Println("HttpPost:" + string(b)) return b, err } /* CREATE动作即临时文件的创建 WRITE写文件动作 CHMOD修改文件属性 REMOVE删除临时文件。 */