watch.go 16 KB


  1. package watch
  2. import (
  3. "fmt"
  4. "hongze/mysteel_watch/global"
  5. "hongze/mysteel_watch/utils"
  6. "log"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/fsnotify/fsnotify"
  12. "github.com/xuri/excelize/v2"
  13. )
  14. func ListenFolderNew() {
  15. fmt.Println("-----文件夹监听-------")
  16. watcher, err := fsnotify.NewWatcher()
  17. if err != nil {
  18. fmt.Println("fsnotify.NewWatcher err:" + err.Error())
  19. log.Fatal(err)
  20. }
  21. defer watcher.Close()
  22. done2 := make(chan bool)
  23. go func() {
  24. for {
  25. select {
  26. case event, ok := <-watcher.Events:
  27. fmt.Println("event.Name", event.Name)
  28. fmt.Println(event.Op)
  29. if ok && event.Op == fsnotify.Create &&
  30. !strings.Contains(event.Name, "tmp") &&
  31. !strings.Contains(event.Name, ".TMP") &&
  32. !strings.Contains(event.Name, "~") &&
  33. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  34. // 监听文件变更事件
  35. WatchIndexFile(event.Name)
  36. }
  37. case err := <-watcher.Errors:
  38. fmt.Println("watcher.Errors:", err)
  39. log.Println("error:", err)
  40. case <-time.After(60 * time.Second):
  41. continue
  42. }
  43. }
  44. }()
  45. fmt.Println("watch dir:" + global.CONFIG.Serve.IndexSaveDir)
  46. err = watcher.Add(global.CONFIG.Serve.IndexSaveDir)
  47. if err != nil {
  48. fmt.Println("watcher.Add:" + err.Error())
  49. log.Fatal(err)
  50. }
  51. <-done2
  52. }
  53. // ListenFolderNewMerge 生产合并文件夹监听
  54. func ListenFolderNewMerge() {
  55. fmt.Println("-----生产合并文件夹监听-------")
  56. watcher, err := fsnotify.NewWatcher()
  57. if err != nil {
  58. log.Fatal(err)
  59. }
  60. defer watcher.Close()
  61. done2 := make(chan bool)
  62. go func() {
  63. for {
  64. select {
  65. case event, ok := <-watcher.Events:
  66. if ok && (event.Op == fsnotify.Create || event.Op == fsnotify.Write) &&
  67. !strings.Contains(event.Name, "tmp") &&
  68. !strings.Contains(event.Name, ".TMP") &&
  69. !strings.Contains(event.Name, "~") &&
  70. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  71. WatchIndexFileMergeRelease(event.Name)
  72. }
  73. case err := <-watcher.Errors:
  74. log.Println("error:", err)
  75. case <-time.After(60 * time.Second):
  76. continue
  77. }
  78. }
  79. }()
  80. err = watcher.Add(global.CONFIG.Serve.IndexMergeSaveDir)
  81. if err != nil {
  82. log.Fatal(err)
  83. }
  84. <-done2
  85. }
  86. // WatchIndexFile 检测指标文件
  87. func WatchIndexFile(filePath string) {
  88. fmt.Println("filePath:", filePath)
  89. //filePath:D:\mysteel_data\CM0000568866_release.xlsx
  90. time.Sleep(10 * time.Second)
  91. if !utils.FileIsExist(filePath) {
  92. fmt.Println("filePath is not exist:" + filePath)
  93. return
  94. }
  95. //读取文件内容
  96. global.LOG.Info("WatchFile:" + filePath)
  97. f, err := excelize.OpenFile(filePath)
  98. global.LOG.Info("OpenFile:" + filePath)
  99. if err != nil {
  100. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  101. return
  102. }
  103. var newFilePath string
  104. defer func() {
  105. if err := f.Close(); err != nil {
  106. fmt.Println("FileClose Err:" + err.Error())
  107. return
  108. }
  109. //重命名文件
  110. if filePath != newFilePath {
  111. err := os.Rename(filePath, newFilePath)
  112. if err != nil {
  113. fmt.Println("os.Rename Err:" + err.Error())
  114. }
  115. }
  116. }()
  117. //var runMode string
  118. //if strings.Contains(filePath, "debug") {
  119. // runMode = "debug"
  120. //} else {
  121. // runMode = "release"
  122. //}
  123. reqList := make([]*HandleMysteelIndex, 0)
  124. //dir, fp := filepath.Split(filePath)
  125. var wg = sync.WaitGroup{}
  126. wg.Add(1)
  127. go func() {
  128. sheetList := f.GetSheetList()
  129. for _, sv := range sheetList {
  130. lenRow := 0 //指标数
  131. // excel表的指标数据
  132. indexExcelDataList := make([]map[string]string, 0)
  133. indexNameMap := make(map[int]string)
  134. indexCodeMap := make(map[int]string)
  135. unitMap := make(map[int]string)
  136. sourceMap := make(map[int]string)
  137. frequencyMap := make(map[int]string)
  138. startDateMap := make(map[int]string)
  139. endDateMap := make(map[int]string)
  140. describeMap := make(map[int]string)
  141. updateDateMap := make(map[int]string)
  142. rows, err := f.GetRows(sv)
  143. if err != nil {
  144. fmt.Println("GetRows Err:", err)
  145. return
  146. }
  147. for row, cols := range rows {
  148. if row == 0 {
  149. // 第一行是 钢联数据的备注
  150. continue
  151. }
  152. // 指标名称
  153. if row == 1 {
  154. lenRow = len(cols) - 1
  155. for i := 1; i <= lenRow; i++ {
  156. tmpIndexExcelDataList := make(map[string]string, 0)
  157. indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList)
  158. }
  159. }
  160. if row < 10 {
  161. for k, colCell := range cols {
  162. switch row {
  163. case 1: //指标名称
  164. indexNameMap[k-1] = colCell
  165. case 2: //单位
  166. unitMap[k-1] = colCell
  167. case 3: //数据来源
  168. sourceMap[k-1] = colCell
  169. case 4: //指标编码
  170. indexCodeMap[k-1] = colCell
  171. case 5: //频度
  172. tmpFrequency := colCell
  173. if !strings.Contains(tmpFrequency, "度") {
  174. tmpFrequency = tmpFrequency + "度"
  175. }
  176. frequencyMap[k-1] = tmpFrequency
  177. case 6: //时间区间
  178. dateArr := strings.Split(colCell, "~")
  179. if len(dateArr) >= 2 {
  180. startDateMap[k-1] = dateArr[0]
  181. endDateMap[k-1] = dateArr[1]
  182. }
  183. case 7: //备注
  184. describeMap[k-1] = colCell
  185. case 9:
  186. updateDateMap[k-1] = colCell
  187. }
  188. }
  189. } else {
  190. date := ``
  191. for k, col := range cols {
  192. if k == 0 {
  193. date = col
  194. continue
  195. }
  196. if date == `` {
  197. continue
  198. }
  199. if col != `` {
  200. indexExcelDataList[k-1][date] = col
  201. }
  202. }
  203. }
  204. }
  205. for k, excelDataMap := range indexExcelDataList {
  206. indexItem := new(HandleMysteelIndex)
  207. indexItem.IndexName = indexNameMap[k]
  208. indexItem.IndexCode = indexCodeMap[k]
  209. indexItem.Unit = unitMap[k]
  210. indexItem.Source = sourceMap[k]
  211. indexItem.Frequency = frequencyMap[k]
  212. indexItem.StartDate = startDateMap[k]
  213. indexItem.EndDate = endDateMap[k]
  214. indexItem.Describe = describeMap[k]
  215. indexItem.UpdateDate = updateDateMap[k]
  216. indexItem.ExcelDataMap = excelDataMap
  217. reqList = append(reqList, indexItem)
  218. //mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], updateDateMap[k], excelDataMap)
  219. }
  220. resp := new(HandleMysteelIndexReq)
  221. resp.List = reqList
  222. postHandleMysteelIndex(resp)
  223. }
  224. wg.Done()
  225. }()
  226. wg.Wait()
  227. }
  228. // WatchIndexFileMergeRelease 监听生产的合并excel文件的处理
  229. func WatchIndexFileMergeRelease(filePath string) {
  230. fmt.Println("filePath:", filePath)
  231. //return
  232. //filePath:D:\mysteel_data\CM0000568866_release.xlsx
  233. time.Sleep(5 * time.Second)
  234. if !utils.FileIsExist(filePath) {
  235. fmt.Println("filePath is not exist:" + filePath)
  236. return
  237. }
  238. //读取文件内容
  239. global.LOG.Info("WatchFile:" + filePath)
  240. f, err := excelize.OpenFile(filePath)
  241. global.LOG.Info("OpenFile:" + filePath)
  242. if err != nil {
  243. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  244. return
  245. }
  246. defer func() {
  247. if err := f.Close(); err != nil {
  248. fmt.Println("FileClose Err:" + err.Error())
  249. return
  250. }
  251. }()
  252. //runMode := "release"
  253. reqList := make([]*HandleMysteelIndex, 0)
  254. var wg = sync.WaitGroup{}
  255. wg.Add(1)
  256. go func() {
  257. sheetList := f.GetSheetList()
  258. for _, sv := range sheetList {
  259. lenRow := 0 //指标数
  260. // excel表的指标数据
  261. indexExcelDataList := make([]map[string]string, 0)
  262. indexNameMap := make(map[int]string)
  263. indexCodeMap := make(map[int]string)
  264. unitMap := make(map[int]string)
  265. sourceMap := make(map[int]string)
  266. frequencyMap := make(map[int]string)
  267. startDateMap := make(map[int]string)
  268. endDateMap := make(map[int]string)
  269. describeMap := make(map[int]string)
  270. updateDateMap := make(map[int]string)
  271. rows, err := f.GetRows(sv)
  272. if err != nil {
  273. fmt.Println("GetRows Err:", err)
  274. return
  275. }
  276. for row, cols := range rows {
  277. if row == 0 {
  278. // 第一行是 钢联数据的备注
  279. continue
  280. }
  281. // 指标名称
  282. if row == 1 {
  283. lenRow = len(cols) - 1
  284. for i := 1; i <= lenRow; i++ {
  285. tmpIndexExcelDataList := make(map[string]string, 0)
  286. indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList)
  287. }
  288. }
  289. if row < 10 {
  290. for k, colCell := range cols {
  291. switch row {
  292. case 1: //指标名称
  293. indexNameMap[k-1] = colCell
  294. case 2: //单位
  295. unitMap[k-1] = colCell
  296. case 3: //数据来源
  297. sourceMap[k-1] = colCell
  298. case 4: //指标编码
  299. indexCodeMap[k-1] = colCell
  300. case 5: //频度
  301. tmpFrequency := colCell
  302. if !strings.Contains(tmpFrequency, "度") {
  303. tmpFrequency = tmpFrequency + "度"
  304. }
  305. frequencyMap[k-1] = tmpFrequency
  306. case 6: //时间区间
  307. dateArr := strings.Split(colCell, "~")
  308. if len(dateArr) >= 2 {
  309. startDateMap[k-1] = dateArr[0]
  310. endDateMap[k-1] = dateArr[1]
  311. }
  312. case 7: //备注
  313. describeMap[k-1] = colCell
  314. case 9:
  315. updateDateMap[k-1] = colCell
  316. }
  317. }
  318. } else {
  319. date := ``
  320. for k, col := range cols {
  321. if k == 0 {
  322. date = col
  323. continue
  324. }
  325. if date == `` {
  326. continue
  327. }
  328. if col != `` {
  329. indexExcelDataList[k-1][date] = col
  330. }
  331. }
  332. }
  333. }
  334. for k, excelDataMap := range indexExcelDataList {
  335. indexItem := new(HandleMysteelIndex)
  336. indexItem.IndexName = indexNameMap[k]
  337. indexItem.IndexCode = indexCodeMap[k]
  338. indexItem.Unit = unitMap[k]
  339. indexItem.Source = sourceMap[k]
  340. indexItem.Frequency = frequencyMap[k]
  341. indexItem.StartDate = startDateMap[k]
  342. indexItem.EndDate = endDateMap[k]
  343. indexItem.Describe = describeMap[k]
  344. indexItem.UpdateDate = updateDateMap[k]
  345. indexItem.ExcelDataMap = excelDataMap
  346. reqList = append(reqList, indexItem)
  347. //mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], updateDateMap[k], excelDataMap)
  348. }
  349. resp := new(HandleMysteelIndexReq)
  350. resp.List = reqList
  351. postHandleMysteelIndex(resp)
  352. }
  353. wg.Done()
  354. }()
  355. wg.Wait()
  356. }
  type (
  	// HandleMysteelIndex carries the metadata and data points of a single
  	// Mysteel index read from one workbook column.
  	HandleMysteelIndex struct {
  		IndexName  string `description:"指标名称"` // index name
  		IndexCode  string `description:"指标编码"` // index code
  		Unit       string `description:"单位"`   // unit
  		Source     string `description:"数据来源"` // data source
  		Frequency  string `description:"频度"`   // frequency
  		StartDate  string `description:"开始日期"` // start date
  		EndDate    string `description:"结束日期"` // end date
  		Describe   string `description:"指标描述"` // index description
  		UpdateDate string `description:"更新日期"` // update date
  		// ExcelDataMap maps a date cell to the value cell of this index.
  		ExcelDataMap map[string]string
  	}

  	// HandleMysteelIndexReq is the request wrapper holding a list of indexes.
  	HandleMysteelIndexReq struct {
  		List []*HandleMysteelIndex
  	}
  )
  372. // mysteelIndexHandle 钢联数据处理
  373. //func mysteelIndexHandle(runMode, indexName, indexCode, unit, source, frequency, startDate, endDate, describe, updateDate string, excelDataMap map[string]string) {
  374. // var err error
  375. //
  376. // //return
  377. // indexObj := new(index.BaseFromMysteelChemicalIndex)
  378. // var indexId int64
  379. //
  380. // addDataList := make([]index.BaseFromMysteelChemicalData, 0)
  381. //
  382. // exitDataMap := make(map[string]*index.BaseFromMysteelChemicalData)
  383. //
  384. // // 修改指标信息
  385. // if indexName == "" {
  386. // global.LOG.Info("未刷新到指标数据:indexName:" + indexName)
  387. // return
  388. // }
  389. // //判断指标是否存在
  390. // var isAdd int
  391. // //req := GetIndexByIndexCodeReq{
  392. // // IndexCode: indexCode,
  393. // //}
  394. // //item, err := indexObj.GetIndexItem(runMode, indexCode)
  395. // item, e := GetIndexByIndexCode(indexCode)
  396. // if e != nil {
  397. // //if err.Error() == "record not found" {
  398. // // isAdd = 1
  399. // //} else {
  400. // isAdd = -1
  401. // fmt.Println("GetIndexItem Err:" + err.Error())
  402. // return
  403. // //}
  404. // }
  405. // if item.BaseFromMysteelChemicalIndexId > 0 {
  406. // fmt.Println("item:", item)
  407. // isAdd = 2
  408. // } else {
  409. // isAdd = 1
  410. // }
  411. //
  412. // fmt.Println("isAdd:", isAdd)
  413. // if !strings.Contains(frequency, "度") {
  414. // frequency = frequency + "度"
  415. // }
  416. //
  417. // if isAdd == 1 {
  418. // indexObj.IndexCode = indexCode
  419. // indexObj.IndexName = indexName
  420. // indexObj.Unit = unit
  421. // indexObj.Source = source
  422. // indexObj.Describe = describe
  423. // indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  424. // indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  425. // indexObj.Frequency = frequency
  426. // //err = indexObj.Add(runMode)
  427. // err = CreateIndex(indexObj)
  428. // if err != nil {
  429. // fmt.Println("add err:" + err.Error())
  430. // return
  431. // }
  432. // indexId = indexObj.BaseFromMysteelChemicalIndexId
  433. // } else if isAdd == 2 {
  434. // indexObj.IndexCode = indexCode
  435. // indexObj.IndexName = indexName
  436. // indexObj.Unit = unit
  437. // indexObj.Source = source
  438. // indexObj.Describe = describe
  439. // indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  440. // indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  441. // indexObj.Frequency = frequency
  442. // indexObj.ModifyTime = time.Now()
  443. // indexId = item.BaseFromMysteelChemicalIndexId
  444. // //修改数据
  445. // updateColsArr := make([]string, 0)
  446. // updateColsArr = append(updateColsArr, "index_name")
  447. // updateColsArr = append(updateColsArr, "unit")
  448. // updateColsArr = append(updateColsArr, "source")
  449. // updateColsArr = append(updateColsArr, "frequency")
  450. // updateColsArr = append(updateColsArr, "start_date")
  451. // updateColsArr = append(updateColsArr, "end_date")
  452. // updateColsArr = append(updateColsArr, "describe")
  453. // updateColsArr = append(updateColsArr, "end_date")
  454. // updateColsArr = append(updateColsArr, "modify_time")
  455. //
  456. // indexObj.Update(runMode, updateColsArr)
  457. //
  458. // dataObj := new(index.BaseFromMysteelChemicalData)
  459. //
  460. // //获取已存在的所有数据
  461. // exitDataList, err := dataObj.GetIndexDataList(runMode, indexCode)
  462. // if err != nil {
  463. // fmt.Println("GetIndexDataList Err:" + err.Error())
  464. // return
  465. // }
  466. // fmt.Println("exitDataListLen:", len(exitDataList))
  467. // for _, v := range exitDataList {
  468. // dateStr := v.DataTime.Format(utils.FormatDate)
  469. // exitDataMap[dateStr] = v
  470. // }
  471. // }
  472. //
  473. // dataObj := new(index.BaseFromMysteelChemicalData)
  474. // // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  475. // for date, value := range excelDataMap {
  476. // if findData, ok := exitDataMap[date]; !ok {
  477. // dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
  478. // if err != nil {
  479. // fmt.Println("time.ParseInLocation Err:" + err.Error())
  480. // return
  481. // }
  482. // if !strings.Contains(value, "#N/A") {
  483. // dataItem := new(index.BaseFromMysteelChemicalData)
  484. // dataItem.BaseFromMysteelChemicalIndexId = indexId
  485. // dataItem.IndexCode = indexCode
  486. // dataItem.DataTime = dateTime
  487. // dataItem.Value = value
  488. // dataItem.UpdateDate = updateDate
  489. // dataItem.CreateTime = time.Now()
  490. // dataItem.ModifyTime = time.Now()
  491. // addDataList = append(addDataList, *dataItem)
  492. // }
  493. // } else {
  494. // if findData != nil && findData.Value != value && !strings.Contains(value, "#N/A") { //修改数据
  495. // dataObj.Value = value
  496. // dataObj.ModifyTime = time.Now()
  497. // dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
  498. //
  499. // updateDataColsArr := make([]string, 0)
  500. // updateDataColsArr = append(updateDataColsArr, "value")
  501. // updateDataColsArr = append(updateDataColsArr, "modify_time")
  502. // dataObj.Update(runMode, updateDataColsArr)
  503. // global.LOG.Info(findData.IndexCode + " " + findData.Value + "-" + value)
  504. // }
  505. // }
  506. // }
  507. //
  508. // if len(addDataList) > 0 {
  509. // err = dataObj.Add(runMode, addDataList)
  510. // if err != nil {
  511. // fmt.Println("dataObj.Add() Err:" + err.Error())
  512. // }
  513. // }
  514. //
  515. // go syncEdbDataMysteelChemical(runMode, indexCode)
  516. //}
  517. //func syncEdbDataMysteelChemical(runMode, indexCode string) {
  518. // indexObj := new(models.EdbInfo)
  519. // var isAdd int
  520. // item, err := indexObj.GetEdbInfoItem(runMode, indexCode)
  521. // if err != nil {
  522. // if err.Error() == "record not found" {
  523. // isAdd = 1
  524. // } else {
  525. // isAdd = -1
  526. // fmt.Println("GetEdbInfoItem Err:" + err.Error())
  527. // return
  528. // }
  529. // }
  530. // if item != nil && item.EdbInfoId > 0 {
  531. // fmt.Println("item:", item)
  532. // isAdd = 2
  533. // } else {
  534. // isAdd = 1 //
  535. // }
  536. //
  537. // if isAdd == 1 { //新增
  538. // return
  539. // }
  540. //
  541. // param := make(map[string]interface{})
  542. // param["EdbCode"] = indexCode
  543. // param["EdbInfoId"] = item.EdbInfoId
  544. // param["StartDate"] = item.EndDate
  545. // postRefreshEdbData(param)
  546. //}
  547. /*
  548. CREATE动作即临时文件的创建
  549. WRITE写文件动作
  550. CHMOD修改文件属性
  551. REMOVE删除临时文件。
  552. */