// Package watch monitors the configured Mysteel index directories for Excel
// workbooks, parses the index metadata and data series they contain, and
// forwards the result through postHandleMysteelIndex.
package watch

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"hongze/mysteel_watch/global"
	"hongze/mysteel_watch/utils"

	"github.com/fsnotify/fsnotify"
	"github.com/xuri/excelize/v2"
)

// Zero-based row layout of an index sheet. Row 0 holds a remark and row 8 is
// not used by the parser.
const (
	rowIndexName  = 1  // index names
	rowUnit       = 2  // units
	rowSource     = 3  // data sources
	rowIndexCode  = 4  // index codes
	rowFrequency  = 5  // frequencies
	rowDateRange  = 6  // "start~end" date ranges
	rowDescribe   = 7  // remarks
	rowUpdateDate = 9  // update dates
	firstDataRow  = 10 // first row of "date, value, value, ..." records
)

// ListenFolderNew watches global.CONFIG.Serve.IndexSaveDir and passes every
// newly created Excel file to WatchIndexFile. It blocks for as long as the
// watcher is alive and terminates the process if the watcher cannot be set up.
func ListenFolderNew() {
	fmt.Println("-----文件夹监听-------")
	watchFolder(global.CONFIG.Serve.IndexSaveDir, fsnotify.Create, true, WatchIndexFile)
}

// ListenFolderNewMerge watches global.CONFIG.Serve.IndexMergeSaveDir (the
// production merge folder) and passes every created or written Excel file to
// WatchIndexFileMergeRelease. It blocks for as long as the watcher is alive and
// terminates the process if the watcher cannot be set up.
func ListenFolderNewMerge() {
	fmt.Println("-----生产合并文件夹监听-------")
	watchFolder(global.CONFIG.Serve.IndexMergeSaveDir, fsnotify.Create|fsnotify.Write, false, WatchIndexFileMergeRelease)
}

// watchFolder runs an fsnotify watcher on dir and calls handle for every event
// that carries at least one of the operations in ops and whose file name passes
// isIndexExcelFile. verbose enables the extra console output that
// ListenFolderNew has always produced.
//
// NOTE(review): handle runs synchronously on the event loop, and both handlers
// in this file sleep for several seconds, so events are processed strictly one
// after another.
func watchFolder(dir string, ops fsnotify.Op, verbose bool, handle func(filePath string)) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		if verbose {
			fmt.Println("fsnotify.NewWatcher err:" + err.Error())
		}
		log.Fatal(err)
	}
	defer watcher.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// The watcher was closed; a closed channel would
					// otherwise make this loop spin forever.
					return
				}
				if verbose {
					fmt.Println("event.Name", event.Name)
					fmt.Println(event.Op)
				}
				// Op is a bit mask, so combined operations (for example
				// Create|Write) have to be tested with a mask, not ==.
				if event.Op&ops != 0 && isIndexExcelFile(event.Name) {
					handle(event.Name)
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				if verbose {
					fmt.Println("watcher.Errors:", err)
				}
				log.Println("error:", err)
			}
		}
	}()

	if verbose {
		fmt.Println("watch dir:" + dir)
	}
	if err := watcher.Add(dir); err != nil {
		if verbose {
			fmt.Println("watcher.Add:" + err.Error())
		}
		log.Fatal(err)
	}
	wg.Wait()
}

// isIndexExcelFile reports whether name looks like a finished Excel workbook:
// its base name contains "xls" (which also covers "xlsx") and none of the
// temporary-file markers "tmp", ".TMP" or "~". Only the base name is inspected
// so that the directory part of the path cannot influence the decision.
func isIndexExcelFile(name string) bool {
	base := filepath.Base(name)
	for _, marker := range []string{"tmp", ".TMP", "~"} {
		if strings.Contains(base, marker) {
			return false
		}
	}
	return strings.Contains(base, "xls")
}

// WatchIndexFile processes a newly created index workbook: it waits ten
// seconds, opens the file, posts the indexes found on every sheet and closes
// the file again.
//
// Example filePath: D:\mysteel_data\CM0000568866_release.xlsx
func WatchIndexFile(filePath string) {
	f, ok := openIndexWorkbook(filePath, 10*time.Second)
	if !ok {
		return
	}

	// No rename target is computed at the moment. The original code
	// nevertheless called os.Rename(filePath, ""), which failed on every run;
	// the rename is now skipped until a target path is actually set.
	var newFilePath string
	defer func() {
		if err := f.Close(); err != nil {
			fmt.Println("FileClose Err:" + err.Error())
			return
		}
		// Rename the file once it has been closed.
		if newFilePath != "" && filePath != newFilePath {
			if err := os.Rename(filePath, newFilePath); err != nil {
				fmt.Println("os.Rename Err:" + err.Error())
			}
		}
	}()

	postIndexSheets(f)
}

// WatchIndexFileMergeRelease processes a production merge workbook: it waits
// five seconds, opens the file, posts the indexes found on every sheet and
// closes the file again.
//
// Example filePath: D:\mysteel_data\CM0000568866_release.xlsx
func WatchIndexFileMergeRelease(filePath string) {
	f, ok := openIndexWorkbook(filePath, 5*time.Second)
	if !ok {
		return
	}
	defer func() {
		if err := f.Close(); err != nil {
			fmt.Println("FileClose Err:" + err.Error())
		}
	}()

	postIndexSheets(f)
}

// openIndexWorkbook sleeps for settle, then opens the workbook at filePath.
// The boolean result is false, and the reason has been printed, when the file
// no longer exists or cannot be opened.
func openIndexWorkbook(filePath string, settle time.Duration) (*excelize.File, bool) {
	fmt.Println("filePath:", filePath)
	time.Sleep(settle)
	if !utils.FileIsExist(filePath) {
		fmt.Println("filePath is not exist:" + filePath)
		return nil, false
	}

	// Read the file content.
	global.LOG.Info("WatchFile:" + filePath)
	f, err := excelize.OpenFile(filePath)
	global.LOG.Info("OpenFile:" + filePath)
	if err != nil {
		fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
		return nil, false
	}
	return f, true
}

// postIndexSheets parses every sheet of f and posts the indexes of each sheet
// through postHandleMysteelIndex. Each request carries only the indexes of its
// own sheet (the original code re-sent the indexes of all previous sheets with
// every later one), and sheets without any index column are skipped. Processing
// stops at the first sheet whose rows cannot be read.
func postIndexSheets(f *excelize.File) {
	for _, sheet := range f.GetSheetList() {
		rows, err := f.GetRows(sheet)
		if err != nil {
			fmt.Println("GetRows Err:", err)
			return
		}
		list := parseIndexSheet(rows)
		if len(list) == 0 {
			continue
		}
		postHandleMysteelIndex(&HandleMysteelIndexReq{List: list})
	}
}

// parseIndexSheet converts the rows of one sheet into one HandleMysteelIndex
// per index column. Column 0 holds row labels (and the date in data rows);
// every further column of the name row describes one index. Cells in columns
// beyond the name row are ignored, so rows of uneven width cannot cause an
// out-of-range access.
func parseIndexSheet(rows [][]string) []*HandleMysteelIndex {
	if len(rows) <= rowIndexName {
		return nil
	}
	count := len(rows[rowIndexName]) - 1 // number of indexes on the sheet
	if count <= 0 {
		return nil
	}

	items := make([]*HandleMysteelIndex, count)
	for i := range items {
		items[i] = &HandleMysteelIndex{ExcelDataMap: make(map[string]string)}
	}

	for row := rowIndexName; row < len(rows); row++ {
		cols := rows[row]
		if len(cols) == 0 {
			continue
		}
		last := len(cols) - 1
		if last > count {
			last = count
		}

		if row < firstDataRow {
			for k := 1; k <= last; k++ {
				applyHeaderCell(items[k-1], row, cols[k])
			}
			continue
		}

		// Data row: a date followed by one value per index. Rows without a
		// date and empty value cells are skipped.
		date := cols[0]
		if date == "" {
			continue
		}
		for k := 1; k <= last; k++ {
			if cols[k] != "" {
				items[k-1].ExcelDataMap[date] = cols[k]
			}
		}
	}
	return items
}

// applyHeaderCell stores the header cell found in the given row on item. Rows
// that carry no metadata are ignored.
func applyHeaderCell(item *HandleMysteelIndex, row int, cell string) {
	switch row {
	case rowIndexName:
		item.IndexName = cell
	case rowUnit:
		item.Unit = cell
	case rowSource:
		item.Source = cell
	case rowIndexCode:
		item.IndexCode = cell
	case rowFrequency:
		// Frequencies are normalised to end in "度".
		if !strings.Contains(cell, "度") {
			cell += "度"
		}
		item.Frequency = cell
	case rowDateRange:
		// The date range is written as "start~end".
		if parts := strings.Split(cell, "~"); len(parts) >= 2 {
			item.StartDate = parts[0]
			item.EndDate = parts[1]
		}
	case rowDescribe:
		item.Describe = cell
	case rowUpdateDate:
		item.UpdateDate = cell
	}
}

// HandleMysteelIndex is one index parsed from a workbook column: its metadata
// plus its data series.
type HandleMysteelIndex struct {
	IndexName    string `description:"指标名称"`
	IndexCode    string `description:"指标编码"`
	Unit         string `description:"单位"`
	Source       string `description:"数据来源"`
	Frequency    string `description:"频度"`
	StartDate    string `description:"开始日期"`
	EndDate      string `description:"结束日期"`
	Describe     string `description:"指标描述"`
	UpdateDate   string `description:"更新日期"`
	ExcelDataMap map[string]string // date cell -> value cell
}

// HandleMysteelIndexReq is the payload passed to postHandleMysteelIndex.
type HandleMysteelIndexReq struct {
	List []*HandleMysteelIndex
}

// Legacy implementation, kept for reference only. The active code above builds
// a HandleMysteelIndexReq and calls postHandleMysteelIndex instead.
//
// mysteelIndexHandle processes the Mysteel data of one index.
//func mysteelIndexHandle(runMode, indexName, indexCode, unit, source, frequency, startDate, endDate, describe, updateDate string, excelDataMap map[string]string) {
//	var err error
//
//	//return
//	indexObj := new(index.BaseFromMysteelChemicalIndex)
//	var indexId int64
//
//	addDataList := make([]index.BaseFromMysteelChemicalData, 0)
//
//	exitDataMap := make(map[string]*index.BaseFromMysteelChemicalData)
//
//	// Update the index information
//	if indexName == "" {
//		global.LOG.Info("未刷新到指标数据:indexName:" + indexName)
//		return
//	}
//	// Check whether the index already exists
//	var isAdd int
//	//req := GetIndexByIndexCodeReq{
//	//	IndexCode: indexCode,
//	//}
//	//item, err := indexObj.GetIndexItem(runMode, indexCode)
//	item, e := GetIndexByIndexCode(indexCode)
//	if e != nil {
//		//if err.Error() == "record not found" {
//		//	isAdd = 1
//		//} else {
//		isAdd = -1
//		fmt.Println("GetIndexItem Err:" + err.Error())
//		return
//		//}
//	}
//	if item.BaseFromMysteelChemicalIndexId > 0 {
//		fmt.Println("item:", item)
//		isAdd = 2
//	} else {
//		isAdd = 1
//	}
//
//	fmt.Println("isAdd:", isAdd)
//	if !strings.Contains(frequency, "度") {
//		frequency = frequency + "度"
//	}
//
//	if isAdd == 1 {
//		indexObj.IndexCode = indexCode
//		indexObj.IndexName = indexName
//		indexObj.Unit = unit
//		indexObj.Source = source
//		indexObj.Describe = describe
//		indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
//		indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
//		indexObj.Frequency = frequency
//		//err = indexObj.Add(runMode)
//		err = CreateIndex(indexObj)
//		if err != nil {
//			fmt.Println("add err:" + err.Error())
//			return
//		}
//		indexId = indexObj.BaseFromMysteelChemicalIndexId
//	} else if isAdd == 2 {
//		indexObj.IndexCode = indexCode
//		indexObj.IndexName = indexName
//		indexObj.Unit = unit
//		indexObj.Source = source
//		indexObj.Describe = describe
//		indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
//		indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
//		indexObj.Frequency = frequency
//		indexObj.ModifyTime = time.Now()
//		indexId = item.BaseFromMysteelChemicalIndexId
//		// Update data
//		updateColsArr := make([]string, 0)
//		updateColsArr = append(updateColsArr, "index_name")
//		updateColsArr = append(updateColsArr, "unit")
//		updateColsArr = append(updateColsArr, "source")
//		updateColsArr = append(updateColsArr, "frequency")
//		updateColsArr = append(updateColsArr, "start_date")
//		updateColsArr = append(updateColsArr, "end_date")
//		updateColsArr = append(updateColsArr, "describe")
//		updateColsArr = append(updateColsArr, "end_date")
//		updateColsArr = append(updateColsArr, "modify_time")
//
//		indexObj.Update(runMode, updateColsArr)
//
//		dataObj := new(index.BaseFromMysteelChemicalData)
//
//		// Fetch all data that already exists
//		exitDataList, err := dataObj.GetIndexDataList(runMode, indexCode)
//		if err != nil {
//			fmt.Println("GetIndexDataList Err:" + err.Error())
//			return
//		}
//		fmt.Println("exitDataListLen:", len(exitDataList))
//		for _, v := range exitDataList {
//			dateStr := v.DataTime.Format(utils.FormatDate)
//			exitDataMap[dateStr] = v
//		}
//	}
//
//	dataObj := new(index.BaseFromMysteelChemicalData)
//	// Walk the Excel data and compare it with the existing data; insert what is missing
//	for date, value := range excelDataMap {
//		if findData, ok := exitDataMap[date]; !ok {
//			dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
//			if err != nil {
//				fmt.Println("time.ParseInLocation Err:" + err.Error())
//				return
//			}
//			if !strings.Contains(value, "#N/A") {
//				dataItem := new(index.BaseFromMysteelChemicalData)
//				dataItem.BaseFromMysteelChemicalIndexId = indexId
//				dataItem.IndexCode = indexCode
//				dataItem.DataTime = dateTime
//				dataItem.Value = value
//				dataItem.UpdateDate = updateDate
//				dataItem.CreateTime = time.Now()
//				dataItem.ModifyTime = time.Now()
//				addDataList = append(addDataList, *dataItem)
//			}
//		} else {
//			if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { // update data
//				dataObj.Value = value
//				dataObj.ModifyTime = time.Now()
//				dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
//
//				updateDataColsArr := make([]string, 0)
//				updateDataColsArr = append(updateDataColsArr, "value")
//				updateDataColsArr = append(updateDataColsArr, "modify_time")
//				dataObj.Update(runMode, updateDataColsArr)
//				global.LOG.Info(findData.IndexCode + " " + findData.Value + "-" + value)
//			}
//		}
//	}
//
//	if len(addDataList) > 0 {
//		err = dataObj.Add(runMode, addDataList)
//		if err != nil {
//			fmt.Println("dataObj.Add() Err:" + err.Error())
//		}
//	}
//
//	go syncEdbDataMysteelChemical(runMode, indexCode)
//}

//func syncEdbDataMysteelChemical(runMode, indexCode string) {
//	indexObj := new(models.EdbInfo)
//	var isAdd int
//	item, err := indexObj.GetEdbInfoItem(runMode, indexCode)
//	if err != nil {
//		if err.Error() == "record not found" {
//			isAdd = 1
//		} else {
//			isAdd = -1
//			fmt.Println("GetEdbInfoItem Err:" + err.Error())
//			return
//		}
//	}
//	if item != nil && item.EdbInfoId > 0 {
//		fmt.Println("item:", item)
//		isAdd = 2
//	} else {
//		isAdd = 1
//
//	}
//
//	if isAdd == 1 { // newly added
//		return
//	}
//
//	param := make(map[string]interface{})
//	param["EdbCode"] = indexCode
//	param["EdbInfoId"] = item.EdbInfoId
//	param["StartDate"] = item.EndDate
//	postRefreshEdbData(param)
//}

/*
File-system operations referred to above:
CREATE - a (temporary) file is created
WRITE  - file content is written
CHMOD  - file attributes are modified
REMOVE - the temporary file is deleted
*/