watch_v2.go 9.6 KB


  1. package watch
  2. import (
  3. "fmt"
  4. "hongze/mysteel_watch/global"
  5. "hongze/mysteel_watch/utils"
  6. "log"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/fsnotify/fsnotify"
  12. "github.com/xuri/excelize/v2"
  13. )
  14. func ListenFolderNewV2() {
  15. fmt.Println("-----文件夹监听-------")
  16. watcher, err := fsnotify.NewWatcher()
  17. if err != nil {
  18. fmt.Println("fsnotify.NewWatcher err:" + err.Error())
  19. log.Fatal(err)
  20. }
  21. defer watcher.Close()
  22. done2 := make(chan bool)
  23. go func() {
  24. for {
  25. select {
  26. case event, ok := <-watcher.Events:
  27. fmt.Println("event.Name", event.Name)
  28. fmt.Println(event.Op)
  29. if ok && event.Op == fsnotify.Create &&
  30. !strings.Contains(event.Name, "tmp") &&
  31. !strings.Contains(event.Name, ".TMP") &&
  32. !strings.Contains(event.Name, "~") &&
  33. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  34. // 监听文件变更事件
  35. WatchIndexFile(event.Name)
  36. }
  37. case err := <-watcher.Errors:
  38. fmt.Println("watcher.Errors:", err)
  39. log.Println("error:", err)
  40. case <-time.After(60 * time.Second):
  41. continue
  42. }
  43. }
  44. }()
  45. fmt.Println("watch dir:" + global.CONFIG.Serve.IndexSaveDir)
  46. err = watcher.Add(global.CONFIG.Serve.IndexSaveDir)
  47. if err != nil {
  48. fmt.Println("watcher.Add:" + err.Error())
  49. log.Fatal(err)
  50. }
  51. <-done2
  52. }
  53. // ListenFolderNewMerge 生产合并文件夹监听
  54. func ListenFolderNewMergeV2() {
  55. fmt.Println("-----生产合并文件夹监听-------")
  56. watcher, err := fsnotify.NewWatcher()
  57. if err != nil {
  58. log.Fatal(err)
  59. }
  60. defer watcher.Close()
  61. done2 := make(chan bool)
  62. go func() {
  63. for {
  64. select {
  65. case event, ok := <-watcher.Events:
  66. if ok && (event.Op == fsnotify.Create || event.Op == fsnotify.Write) &&
  67. !strings.Contains(event.Name, "tmp") &&
  68. !strings.Contains(event.Name, ".TMP") &&
  69. !strings.Contains(event.Name, "~") &&
  70. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  71. WatchIndexFileMergeRelease(event.Name)
  72. }
  73. case err := <-watcher.Errors:
  74. log.Println("error:", err)
  75. case <-time.After(60 * time.Second):
  76. continue
  77. }
  78. }
  79. }()
  80. err = watcher.Add(global.CONFIG.Serve.IndexMergeSaveDir)
  81. if err != nil {
  82. log.Fatal(err)
  83. }
  84. <-done2
  85. }
  86. // WatchIndexFile 检测指标文件
  87. func WatchIndexFileV2(filePath string) {
  88. fmt.Println("filePath:", filePath)
  89. //filePath:D:\mysteel_data\CM0000568866_release.xlsx
  90. time.Sleep(10 * time.Second)
  91. if !utils.FileIsExist(filePath) {
  92. fmt.Println("filePath is not exist:" + filePath)
  93. return
  94. }
  95. //读取文件内容
  96. global.LOG.Info("WatchFile:" + filePath)
  97. f, err := excelize.OpenFile(filePath)
  98. global.LOG.Info("OpenFile:" + filePath)
  99. if err != nil {
  100. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  101. return
  102. }
  103. var newFilePath string
  104. defer func() {
  105. if err := f.Close(); err != nil {
  106. fmt.Println("FileClose Err:" + err.Error())
  107. return
  108. }
  109. //重命名文件
  110. if filePath != newFilePath {
  111. err := os.Rename(filePath, newFilePath)
  112. if err != nil {
  113. fmt.Println("os.Rename Err:" + err.Error())
  114. }
  115. }
  116. }()
  117. reqList := make([]*HandleMysteelIndex, 0)
  118. var wg = sync.WaitGroup{}
  119. wg.Add(1)
  120. go func() {
  121. sheetList := f.GetSheetList()
  122. for _, sv := range sheetList {
  123. lenRow := 0 //指标数
  124. // excel表的指标数据
  125. indexExcelDataList := make([]map[string]string, 0)
  126. indexNameMap := make(map[int]string)
  127. indexCodeMap := make(map[int]string)
  128. unitMap := make(map[int]string)
  129. sourceMap := make(map[int]string)
  130. frequencyMap := make(map[int]string)
  131. startDateMap := make(map[int]string)
  132. endDateMap := make(map[int]string)
  133. describeMap := make(map[int]string)
  134. updateDateMap := make(map[int]string)
  135. rows, err := f.GetRows(sv)
  136. if err != nil {
  137. fmt.Println("GetRows Err:", err)
  138. return
  139. }
  140. for row, cols := range rows {
  141. if row == 0 {
  142. // 第一行是 钢联数据的备注
  143. continue
  144. }
  145. // 指标名称
  146. if row == 1 {
  147. lenRow = len(cols) - 1
  148. for i := 1; i <= lenRow; i++ {
  149. tmpIndexExcelDataList := make(map[string]string, 0)
  150. indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList)
  151. }
  152. }
  153. if row < 10 {
  154. for k, colCell := range cols {
  155. switch row {
  156. case 1: //指标名称
  157. indexNameMap[k-1] = colCell
  158. case 2: //单位
  159. unitMap[k-1] = colCell
  160. case 3: //数据来源
  161. sourceMap[k-1] = colCell
  162. case 4: //指标编码
  163. indexCodeMap[k-1] = colCell
  164. case 5: //频度
  165. tmpFrequency := colCell
  166. if !strings.Contains(tmpFrequency, "度") {
  167. tmpFrequency = tmpFrequency + "度"
  168. }
  169. frequencyMap[k-1] = tmpFrequency
  170. case 6: //时间区间
  171. dateArr := strings.Split(colCell, "~")
  172. if len(dateArr) >= 2 {
  173. startDateMap[k-1] = dateArr[0]
  174. endDateMap[k-1] = dateArr[1]
  175. }
  176. case 7: //备注
  177. describeMap[k-1] = colCell
  178. case 9:
  179. updateDateMap[k-1] = colCell
  180. }
  181. }
  182. } else {
  183. date := ``
  184. for k, col := range cols {
  185. if k == 0 {
  186. date = col
  187. continue
  188. }
  189. if date == `` {
  190. continue
  191. }
  192. if col != `` {
  193. indexExcelDataList[k-1][date] = col
  194. }
  195. }
  196. }
  197. }
  198. for k, excelDataMap := range indexExcelDataList {
  199. indexItem := new(HandleMysteelIndex)
  200. indexItem.IndexName = indexNameMap[k]
  201. indexItem.IndexCode = indexCodeMap[k]
  202. indexItem.Unit = unitMap[k]
  203. indexItem.Source = sourceMap[k]
  204. indexItem.Frequency = frequencyMap[k]
  205. indexItem.StartDate = startDateMap[k]
  206. indexItem.EndDate = endDateMap[k]
  207. indexItem.Describe = describeMap[k]
  208. indexItem.UpdateDate = updateDateMap[k]
  209. indexItem.ExcelDataMap = excelDataMap
  210. reqList = append(reqList, indexItem)
  211. //mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], updateDateMap[k], excelDataMap)
  212. }
  213. resp := new(HandleMysteelIndexReq)
  214. resp.List = reqList
  215. postHandleMysteelIndex(resp)
  216. }
  217. wg.Done()
  218. }()
  219. wg.Wait()
  220. }
  221. // WatchIndexFileMergeRelease 监听生产的合并excel文件的处理
  222. func WatchIndexFileMergeReleaseV2(filePath string) {
  223. fmt.Println("filePath:", filePath)
  224. time.Sleep(5 * time.Second)
  225. if !utils.FileIsExist(filePath) {
  226. fmt.Println("filePath is not exist:" + filePath)
  227. return
  228. }
  229. //读取文件内容
  230. global.LOG.Info("WatchFile:" + filePath)
  231. f, err := excelize.OpenFile(filePath)
  232. global.LOG.Info("OpenFile:" + filePath)
  233. if err != nil {
  234. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  235. return
  236. }
  237. defer func() {
  238. if err := f.Close(); err != nil {
  239. fmt.Println("FileClose Err:" + err.Error())
  240. return
  241. }
  242. }()
  243. //runMode := "release"
  244. reqList := make([]*HandleMysteelIndex, 0)
  245. var wg = sync.WaitGroup{}
  246. wg.Add(1)
  247. go func() {
  248. sheetList := f.GetSheetList()
  249. for _, sv := range sheetList {
  250. lenRow := 0 //指标数
  251. // excel表的指标数据
  252. indexExcelDataList := make([]map[string]string, 0)
  253. indexNameMap := make(map[int]string)
  254. indexCodeMap := make(map[int]string)
  255. unitMap := make(map[int]string)
  256. sourceMap := make(map[int]string)
  257. frequencyMap := make(map[int]string)
  258. startDateMap := make(map[int]string)
  259. endDateMap := make(map[int]string)
  260. describeMap := make(map[int]string)
  261. updateDateMap := make(map[int]string)
  262. rows, err := f.GetRows(sv)
  263. if err != nil {
  264. fmt.Println("GetRows Err:", err)
  265. return
  266. }
  267. for row, cols := range rows {
  268. if row == 0 {
  269. // 第一行是 钢联数据的备注
  270. continue
  271. }
  272. // 指标名称
  273. if row == 1 {
  274. lenRow = len(cols) - 1
  275. for i := 1; i <= lenRow; i++ {
  276. tmpIndexExcelDataList := make(map[string]string, 0)
  277. indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList)
  278. }
  279. }
  280. if row < 10 {
  281. for k, colCell := range cols {
  282. switch row {
  283. case 1: //指标名称
  284. indexNameMap[k-1] = colCell
  285. case 2: //单位
  286. unitMap[k-1] = colCell
  287. case 3: //数据来源
  288. sourceMap[k-1] = colCell
  289. case 4: //指标编码
  290. indexCodeMap[k-1] = colCell
  291. case 5: //频度
  292. tmpFrequency := colCell
  293. if !strings.Contains(tmpFrequency, "度") {
  294. tmpFrequency = tmpFrequency + "度"
  295. }
  296. frequencyMap[k-1] = tmpFrequency
  297. case 6: //时间区间
  298. dateArr := strings.Split(colCell, "~")
  299. if len(dateArr) >= 2 {
  300. startDateMap[k-1] = dateArr[0]
  301. endDateMap[k-1] = dateArr[1]
  302. }
  303. case 7: //备注
  304. describeMap[k-1] = colCell
  305. case 9:
  306. updateDateMap[k-1] = colCell
  307. }
  308. }
  309. } else {
  310. date := ``
  311. for k, col := range cols {
  312. if k == 0 {
  313. date = col
  314. continue
  315. }
  316. if date == `` {
  317. continue
  318. }
  319. if col != `` {
  320. indexExcelDataList[k-1][date] = col
  321. }
  322. }
  323. }
  324. }
  325. for k, excelDataMap := range indexExcelDataList {
  326. indexItem := new(HandleMysteelIndex)
  327. indexItem.IndexName = indexNameMap[k]
  328. indexItem.IndexCode = indexCodeMap[k]
  329. indexItem.Unit = unitMap[k]
  330. indexItem.Source = sourceMap[k]
  331. indexItem.Frequency = frequencyMap[k]
  332. indexItem.StartDate = startDateMap[k]
  333. indexItem.EndDate = endDateMap[k]
  334. indexItem.Describe = describeMap[k]
  335. indexItem.UpdateDate = updateDateMap[k]
  336. indexItem.ExcelDataMap = excelDataMap
  337. reqList = append(reqList, indexItem)
  338. //mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], updateDateMap[k], excelDataMap)
  339. }
  340. resp := new(HandleMysteelIndexReq)
  341. resp.List = reqList
  342. postHandleMysteelIndex(resp)
  343. }
  344. wg.Done()
  345. }()
  346. wg.Wait()
  347. }