watch.go 19 KB


  1. package watch
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "hongze/mysteel_watch/global"
  6. "hongze/mysteel_watch/models/index"
  7. "hongze/mysteel_watch/models"
  8. "hongze/mysteel_watch/utils"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "os"
  13. "path/filepath"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/fsnotify/fsnotify"
  18. "github.com/xuri/excelize/v2"
  19. )
  20. func ListenFolderNew() {
  21. fmt.Println("-----文件夹监听-------")
  22. watcher, err := fsnotify.NewWatcher()
  23. if err != nil {
  24. fmt.Println("fsnotify.NewWatcher err:" + err.Error())
  25. log.Fatal(err)
  26. }
  27. defer watcher.Close()
  28. done2 := make(chan bool)
  29. go func() {
  30. for {
  31. select {
  32. case event, ok := <-watcher.Events:
  33. fmt.Println("event.Name", event.Name)
  34. fmt.Println(event.Op)
  35. if ok && event.Op == fsnotify.Create &&
  36. !strings.Contains(event.Name, "tmp") &&
  37. !strings.Contains(event.Name, ".TMP") &&
  38. !strings.Contains(event.Name, "~") &&
  39. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  40. WatchIndexFile(event.Name)
  41. }
  42. case err := <-watcher.Errors:
  43. fmt.Println("watcher.Errors:", err)
  44. log.Println("error:", err)
  45. case <-time.After(60 * time.Second):
  46. continue
  47. }
  48. }
  49. }()
  50. fmt.Println("watch dir:" + utils.IndexSaveDir)
  51. err = watcher.Add(utils.IndexSaveDir)
  52. if err != nil {
  53. fmt.Println("watcher.Add:" + err.Error())
  54. log.Fatal(err)
  55. }
  56. <-done2
  57. }
  58. // ListenFolderNewMerge 生产合并文件夹监听
  59. func ListenFolderNewMerge() {
  60. fmt.Println("-----生产合并文件夹监听-------")
  61. watcher, err := fsnotify.NewWatcher()
  62. if err != nil {
  63. log.Fatal(err)
  64. }
  65. defer watcher.Close()
  66. done2 := make(chan bool)
  67. go func() {
  68. for {
  69. select {
  70. case event, ok := <-watcher.Events:
  71. if ok && (event.Op == fsnotify.Create || event.Op == fsnotify.Write) &&
  72. !strings.Contains(event.Name, "tmp") &&
  73. !strings.Contains(event.Name, ".TMP") &&
  74. !strings.Contains(event.Name, "~") &&
  75. (strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
  76. WatchIndexFileMergeRelease(event.Name)
  77. }
  78. case err := <-watcher.Errors:
  79. log.Println("error:", err)
  80. case <-time.After(60 * time.Second):
  81. continue
  82. }
  83. }
  84. }()
  85. err = watcher.Add(utils.IndexMsergeSaveDir)
  86. if err != nil {
  87. log.Fatal(err)
  88. }
  89. <-done2
  90. }
  91. // WatchIndexFile 检测指标文件
  92. func WatchIndexFile(filePath string) {
  93. fmt.Println("filePath:", filePath)
  94. //filePath:D:\mysteel_data\CM0000568866_release.xlsx
  95. time.Sleep(10 * time.Second)
  96. if !utils.FileIsExist(filePath) {
  97. fmt.Println("filePath is not exist:" + filePath)
  98. return
  99. }
  100. //读取文件内容
  101. global.LOG.Info("WatchFile:" + filePath)
  102. f, err := excelize.OpenFile(filePath)
  103. global.LOG.Info("OpenFile:" + filePath)
  104. if err != nil {
  105. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  106. return
  107. }
  108. var newFilePath string
  109. defer func() {
  110. if err := f.Close(); err != nil {
  111. fmt.Println("FileClose Err:" + err.Error())
  112. return
  113. }
  114. //重命名文件
  115. if filePath != newFilePath {
  116. err := os.Rename(filePath, newFilePath)
  117. if err != nil {
  118. fmt.Println("os.Rename Err:" + err.Error())
  119. }
  120. }
  121. }()
  122. var runMode string
  123. if strings.Contains(filePath, "debug") {
  124. runMode = "debug"
  125. } else {
  126. runMode = "release"
  127. }
  128. dir, fp := filepath.Split(filePath)
  129. var wg = sync.WaitGroup{}
  130. wg.Add(1)
  131. go func() {
  132. sheetList := f.GetSheetList()
  133. for _, sv := range sheetList {
  134. var indexName, indexCode, unit, source, frequency, startDate, endDate, describe string
  135. var indexId int64
  136. rows, err := f.GetRows(sv)
  137. if err != nil {
  138. fmt.Println("f.GetRows:err:" + err.Error())
  139. return
  140. }
  141. indexObj := new(index.BaseFromMysteelChemicalIndex)
  142. dataList := make([]index.BaseFromMysteelChemicalData, 0)
  143. dataMap := make(map[string]string)
  144. for rk, row := range rows {
  145. if rk > 0 {
  146. if rk < 10 {
  147. for ck, colCell := range row {
  148. if ck == 1 {
  149. if rk == 1 {
  150. indexName = colCell
  151. }
  152. if rk == 2 {
  153. unit = colCell
  154. }
  155. if rk == 3 {
  156. source = colCell
  157. }
  158. if rk == 4 {
  159. indexCode = colCell
  160. }
  161. if rk == 5 {
  162. frequency = colCell
  163. if !strings.Contains(frequency, "度") {
  164. frequency = frequency + "度"
  165. }
  166. }
  167. if rk == 6 {
  168. dateArr := strings.Split(colCell, "~")
  169. if len(dateArr) >= 2 {
  170. startDate = dateArr[0]
  171. endDate = dateArr[1]
  172. }
  173. }
  174. if rk == 7 {
  175. describe = colCell
  176. }
  177. }
  178. }
  179. if rk == 9 {
  180. if indexName == "" {
  181. global.LOG.Info("未刷新到指标数据:filePath:" + filePath)
  182. break
  183. }
  184. //判断指标是否存在
  185. var isAdd int
  186. item, err := indexObj.GetIndexItem(runMode, indexCode)
  187. if err != nil {
  188. if err.Error() == "record not found" {
  189. isAdd = 1
  190. } else {
  191. isAdd = -1
  192. fmt.Println("GetIndexItem Err:" + err.Error())
  193. return
  194. }
  195. }
  196. if item != nil && item.BaseFromMysteelChemicalIndexId > 0 {
  197. fmt.Println("item:", item)
  198. isAdd = 2
  199. } else {
  200. isAdd = 1
  201. }
  202. fmt.Println("isAdd:", isAdd)
  203. if !strings.Contains(frequency, "度") {
  204. frequency = frequency + "度"
  205. }
  206. var frequencyStr string
  207. if strings.Contains(frequency, "日") {
  208. frequencyStr = "day"
  209. } else if strings.Contains(frequency, "周") {
  210. frequencyStr = "week"
  211. } else if strings.Contains(frequency, "月") || strings.Contains(frequency, "旬") {
  212. frequencyStr = "month"
  213. } else if strings.Contains(frequency, "年") {
  214. frequencyStr = "year"
  215. }
  216. frequencyStr = "_" + frequencyStr
  217. if !strings.Contains(filePath, frequencyStr) {
  218. fpArr := strings.Split(fp, "_")
  219. for k, v := range fpArr {
  220. if k == 0 {
  221. newFilePath = v + frequencyStr
  222. } else {
  223. newFilePath = newFilePath + "_" + v
  224. }
  225. }
  226. newFilePath = dir + newFilePath
  227. } else {
  228. newFilePath = filePath
  229. }
  230. if isAdd == 1 {
  231. indexObj.IndexCode = indexCode
  232. indexObj.IndexName = indexName
  233. indexObj.Unit = unit
  234. indexObj.Source = source
  235. indexObj.Describe = describe
  236. indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  237. indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  238. indexObj.Frequency = frequency
  239. indexObj.FilePath = newFilePath
  240. err = indexObj.Add(runMode)
  241. if err != nil {
  242. fmt.Println("add err:" + err.Error())
  243. return
  244. }
  245. indexId = indexObj.BaseFromMysteelChemicalIndexId
  246. } else if isAdd == 2 {
  247. indexObj.IndexCode = indexCode
  248. indexObj.IndexName = indexName
  249. indexObj.Unit = unit
  250. indexObj.Source = source
  251. indexObj.Describe = describe
  252. indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  253. indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  254. indexObj.Frequency = frequency
  255. indexObj.FilePath = newFilePath
  256. indexObj.ModifyTime = time.Now()
  257. indexId = item.BaseFromMysteelChemicalIndexId
  258. //修改数据
  259. updateColsArr := make([]string, 0)
  260. updateColsArr = append(updateColsArr, "index_name")
  261. updateColsArr = append(updateColsArr, "unit")
  262. updateColsArr = append(updateColsArr, "source")
  263. updateColsArr = append(updateColsArr, "frequency")
  264. updateColsArr = append(updateColsArr, "start_date")
  265. updateColsArr = append(updateColsArr, "end_date")
  266. updateColsArr = append(updateColsArr, "describe")
  267. updateColsArr = append(updateColsArr, "end_date")
  268. updateColsArr = append(updateColsArr, "modify_time")
  269. updateColsArr = append(updateColsArr, "file_path")
  270. indexObj.Update(runMode, updateColsArr)
  271. dataObj := new(index.BaseFromMysteelChemicalData)
  272. //获取已存在的所有数据
  273. dataList, err := dataObj.GetIndexDataList(runMode, indexCode)
  274. if err != nil {
  275. fmt.Println("GetIndexDataList Err:" + err.Error())
  276. return
  277. }
  278. fmt.Println("dataListLen:", len(dataList))
  279. for _, v := range dataList {
  280. dateStr := v.DataTime.Format(utils.FormatDate)
  281. dataMap[dateStr] = v.Value
  282. }
  283. }
  284. }
  285. } else {
  286. var date, value string
  287. for ck, colCell := range row {
  288. if ck == 0 {
  289. date = colCell
  290. } else {
  291. value = colCell
  292. }
  293. }
  294. if _, ok := dataMap[date]; !ok {
  295. dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
  296. if err != nil {
  297. fmt.Println("time.ParseInLocation Err:" + err.Error())
  298. return
  299. }
  300. dataItem := new(index.BaseFromMysteelChemicalData)
  301. dataItem.BaseFromMysteelChemicalIndexId = indexId
  302. dataItem.IndexCode = indexCode
  303. dataItem.DataTime = dateTime
  304. dataItem.Value = value
  305. dataItem.CreateTime = time.Now()
  306. dataItem.ModifyTime = time.Now()
  307. dataList = append(dataList, *dataItem)
  308. }
  309. }
  310. }
  311. }
  312. if len(dataList) > 0 {
  313. dataObj := new(index.BaseFromMysteelChemicalData)
  314. err = dataObj.Add(runMode, dataList)
  315. if err != nil {
  316. fmt.Println("dataObj.Add() Err:" + err.Error())
  317. }
  318. }
  319. }
  320. wg.Done()
  321. }()
  322. wg.Wait()
  323. }
  324. // WatchIndexFileMergeRelease 监听生产的合并excel文件的处理
  325. func WatchIndexFileMergeRelease(filePath string) {
  326. fmt.Println("filePath:", filePath)
  327. //return
  328. //filePath:D:\mysteel_data\CM0000568866_release.xlsx
  329. time.Sleep(5 * time.Second)
  330. if !utils.FileIsExist(filePath) {
  331. fmt.Println("filePath is not exist:" + filePath)
  332. return
  333. }
  334. //读取文件内容
  335. global.LOG.Info("WatchFile:" + filePath)
  336. f, err := excelize.OpenFile(filePath)
  337. global.LOG.Info("OpenFile:" + filePath)
  338. if err != nil {
  339. fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
  340. return
  341. }
  342. defer func() {
  343. if err := f.Close(); err != nil {
  344. fmt.Println("FileClose Err:" + err.Error())
  345. return
  346. }
  347. }()
  348. runMode := "release"
  349. var wg = sync.WaitGroup{}
  350. wg.Add(1)
  351. go func() {
  352. sheetList := f.GetSheetList()
  353. for _, sv := range sheetList {
  354. lenRow := 0 //指标数
  355. // excel表的指标数据
  356. indexExcelDataList := make([]map[string]string, 0)
  357. indexNameMap := make(map[int]string)
  358. indexCodeMap := make(map[int]string)
  359. unitMap := make(map[int]string)
  360. sourceMap := make(map[int]string)
  361. frequencyMap := make(map[int]string)
  362. startDateMap := make(map[int]string)
  363. endDateMap := make(map[int]string)
  364. describeMap := make(map[int]string)
  365. rows, err := f.GetRows(sv)
  366. if err != nil {
  367. fmt.Println("GetRows Err:", err)
  368. return
  369. }
  370. for row, cols := range rows {
  371. if row == 0 {
  372. // 第一行是 钢联数据的备注
  373. continue
  374. }
  375. // 指标名称
  376. if row == 1 {
  377. lenRow = len(cols) - 1
  378. for i := 1; i <= lenRow; i++ {
  379. tmpIndexExcelDataList := make(map[string]string, 0)
  380. indexExcelDataList = append(indexExcelDataList, tmpIndexExcelDataList)
  381. }
  382. }
  383. if row < 10 {
  384. for k, colCell := range cols {
  385. switch row {
  386. case 1: //指标名称
  387. indexNameMap[k-1] = colCell
  388. case 2: //单位
  389. unitMap[k-1] = colCell
  390. case 3: //数据来源
  391. sourceMap[k-1] = colCell
  392. case 4: //指标编码
  393. indexCodeMap[k-1] = colCell
  394. case 5: //频度
  395. tmpFrequency := colCell
  396. if !strings.Contains(tmpFrequency, "度") {
  397. tmpFrequency = tmpFrequency + "度"
  398. }
  399. frequencyMap[k-1] = tmpFrequency
  400. case 6: //时间区间
  401. dateArr := strings.Split(colCell, "~")
  402. if len(dateArr) >= 2 {
  403. startDateMap[k-1] = dateArr[0]
  404. endDateMap[k-1] = dateArr[1]
  405. }
  406. case 7: //备注
  407. describeMap[k-1] = colCell
  408. }
  409. }
  410. } else {
  411. date := ``
  412. for k, col := range cols {
  413. if k == 0 {
  414. date = col
  415. continue
  416. }
  417. if date == `` {
  418. continue
  419. }
  420. if col != `` {
  421. indexExcelDataList[k-1][date] = col
  422. }
  423. }
  424. }
  425. }
  426. for k, excelDataMap := range indexExcelDataList {
  427. mysteelIndexHandle(runMode, indexNameMap[k], indexCodeMap[k], unitMap[k], sourceMap[k], frequencyMap[k], startDateMap[k], endDateMap[k], describeMap[k], excelDataMap)
  428. }
  429. }
  430. wg.Done()
  431. }()
  432. wg.Wait()
  433. }
  434. // mysteelIndexHandle 钢联数据处理
  435. func mysteelIndexHandle(runMode, indexName, indexCode, unit, source, frequency, startDate, endDate, describe string, excelDataMap map[string]string) {
  436. var err error
  437. //return
  438. indexObj := new(index.BaseFromMysteelChemicalIndex)
  439. var indexId int64
  440. addDataList := make([]index.BaseFromMysteelChemicalData, 0)
  441. exitDataMap := make(map[string]*index.BaseFromMysteelChemicalData)
  442. // 修改指标信息
  443. if indexName == "" {
  444. global.LOG.Info("未刷新到指标数据:indexName:" + indexName)
  445. return
  446. }
  447. //判断指标是否存在
  448. var isAdd int
  449. item, err := indexObj.GetIndexItem(runMode, indexCode)
  450. if err != nil {
  451. if err.Error() == "record not found" {
  452. isAdd = 1
  453. } else {
  454. isAdd = -1
  455. fmt.Println("GetIndexItem Err:" + err.Error())
  456. return
  457. }
  458. }
  459. if item != nil && item.BaseFromMysteelChemicalIndexId > 0 {
  460. fmt.Println("item:", item)
  461. isAdd = 2
  462. } else {
  463. isAdd = 1
  464. }
  465. fmt.Println("isAdd:", isAdd)
  466. if !strings.Contains(frequency, "度") {
  467. frequency = frequency + "度"
  468. }
  469. if isAdd == 1 {
  470. indexObj.IndexCode = indexCode
  471. indexObj.IndexName = indexName
  472. indexObj.Unit = unit
  473. indexObj.Source = source
  474. indexObj.Describe = describe
  475. indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  476. indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  477. indexObj.Frequency = frequency
  478. err = indexObj.Add(runMode)
  479. if err != nil {
  480. fmt.Println("add err:" + err.Error())
  481. return
  482. }
  483. indexId = indexObj.BaseFromMysteelChemicalIndexId
  484. } else if isAdd == 2 {
  485. indexObj.IndexCode = indexCode
  486. indexObj.IndexName = indexName
  487. indexObj.Unit = unit
  488. indexObj.Source = source
  489. indexObj.Describe = describe
  490. indexObj.StartDate, _ = time.ParseInLocation(utils.FormatDate, startDate, time.Local)
  491. indexObj.EndDate, _ = time.ParseInLocation(utils.FormatDate, endDate, time.Local)
  492. indexObj.Frequency = frequency
  493. indexObj.ModifyTime = time.Now()
  494. indexId = item.BaseFromMysteelChemicalIndexId
  495. //修改数据
  496. updateColsArr := make([]string, 0)
  497. updateColsArr = append(updateColsArr, "index_name")
  498. updateColsArr = append(updateColsArr, "unit")
  499. updateColsArr = append(updateColsArr, "source")
  500. updateColsArr = append(updateColsArr, "frequency")
  501. updateColsArr = append(updateColsArr, "start_date")
  502. updateColsArr = append(updateColsArr, "end_date")
  503. updateColsArr = append(updateColsArr, "describe")
  504. updateColsArr = append(updateColsArr, "end_date")
  505. updateColsArr = append(updateColsArr, "modify_time")
  506. indexObj.Update(runMode, updateColsArr)
  507. dataObj := new(index.BaseFromMysteelChemicalData)
  508. //获取已存在的所有数据
  509. exitDataList, err := dataObj.GetIndexDataList(runMode, indexCode)
  510. if err != nil {
  511. fmt.Println("GetIndexDataList Err:" + err.Error())
  512. return
  513. }
  514. fmt.Println("exitDataListLen:", len(exitDataList))
  515. for _, v := range exitDataList {
  516. dateStr := v.DataTime.Format(utils.FormatDate)
  517. exitDataMap[dateStr] = v
  518. }
  519. }
  520. dataObj := new(index.BaseFromMysteelChemicalData)
  521. // 遍历excel数据,然后跟现有的数据做校验,不存在则入库
  522. for date, value := range excelDataMap {
  523. if findData, ok := exitDataMap[date]; !ok {
  524. dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
  525. if err != nil {
  526. fmt.Println("time.ParseInLocation Err:" + err.Error())
  527. return
  528. }
  529. dataItem := new(index.BaseFromMysteelChemicalData)
  530. dataItem.BaseFromMysteelChemicalIndexId = indexId
  531. dataItem.IndexCode = indexCode
  532. dataItem.DataTime = dateTime
  533. dataItem.Value = value
  534. dataItem.CreateTime = time.Now()
  535. dataItem.ModifyTime = time.Now()
  536. addDataList = append(addDataList, *dataItem)
  537. } else {
  538. if findData != nil && findData.Value != value { //修改数据
  539. dataObj.Value = value
  540. dataObj.ModifyTime = time.Now()
  541. dataObj.BaseFromMysteelChemicalDataId = findData.BaseFromMysteelChemicalDataId
  542. updateDataColsArr := make([]string, 0)
  543. updateDataColsArr = append(updateDataColsArr, "value")
  544. updateDataColsArr = append(updateDataColsArr, "modify_time")
  545. dataObj.Update(runMode, updateDataColsArr)
  546. global.LOG.Info(findData.IndexCode + " " + findData.Value + "-" + value)
  547. }
  548. }
  549. }
  550. if len(addDataList) > 0 {
  551. err = dataObj.Add(runMode, addDataList)
  552. if err != nil {
  553. fmt.Println("dataObj.Add() Err:" + err.Error())
  554. }
  555. }
  556. go syncEdbDataMysteelChemical(runMode, indexCode)
  557. }
  558. func syncEdbDataMysteelChemical(runMode, indexCode string) {
  559. indexObj := new(models.EdbInfo)
  560. var isAdd int
  561. item, err := indexObj.GetEdbInfoItem(runMode, indexCode)
  562. if err != nil {
  563. if err.Error() == "record not found" {
  564. isAdd = 1
  565. } else {
  566. isAdd = -1
  567. fmt.Println("GetEdbInfoItem Err:" + err.Error())
  568. return
  569. }
  570. }
  571. if item != nil && item.EdbInfoId > 0 {
  572. fmt.Println("item:", item)
  573. isAdd = 2
  574. } else {
  575. isAdd = 1 //
  576. }
  577. if isAdd == 1 { //新增
  578. return
  579. }
  580. param := make(map[string]interface{})
  581. param["EdbCode"] = indexCode
  582. param["EdbInfoId"] = item.EdbInfoId
  583. param["StartDate"] = item.EndDate
  584. postRefreshEdbData(param)
  585. }
  586. type BaseResponse struct {
  587. Ret int
  588. Msg string
  589. ErrMsg string
  590. ErrCode string
  591. Data interface{}
  592. Success bool `description:"true 执行成功,false 执行失败"`
  593. IsSendEmail bool `json:"-" description:"true 发送邮件,false 不发送邮件"`
  594. IsAddLog bool `json:"-" description:"true 新增操作日志,false 不新增操作日志" `
  595. }
  596. // postRefreshEdbData 刷新指标数据
  597. func postRefreshEdbData(param map[string]interface{}) (resp *BaseResponse, err error) {
  598. urlStr := "mysteel_chemical/refresh"
  599. EDB_LIB_URL := "http://47.102.213.75:8300/edbapi/"
  600. postUrl := EDB_LIB_URL + urlStr
  601. postData, err := json.Marshal(param)
  602. if err != nil {
  603. return
  604. }
  605. result, err := HttpPost(postUrl, string(postData), "application/json")
  606. if err != nil {
  607. return
  608. }
  609. global.LOG.Info(" Refresh Result: " + string(result))
  610. err = json.Unmarshal(result, &resp)
  611. if err != nil {
  612. return
  613. }
  614. return resp, nil
  615. }
  616. func HttpPost(url, postData string, params ...string) ([]byte, error) {
  617. body := ioutil.NopCloser(strings.NewReader(postData))
  618. client := &http.Client{}
  619. req, err := http.NewRequest("POST", url, body)
  620. if err != nil {
  621. return nil, err
  622. }
  623. contentType := "application/x-www-form-urlencoded;charset=utf-8"
  624. if len(params) > 0 && params[0] != "" {
  625. contentType = params[0]
  626. }
  627. req.Header.Set("Content-Type", contentType)
  628. req.Header.Set("authorization", utils.MD5(utils.APP_EDB_LIB_NAME_EN+utils.EDB_LIB_Md5_KEY))
  629. resp, err := client.Do(req)
  630. defer resp.Body.Close()
  631. b, err := ioutil.ReadAll(resp.Body)
  632. fmt.Println("HttpPost:" + string(b))
  633. return b, err
  634. }
  635. /*
  636. CREATE动作即临时文件的创建
  637. WRITE写文件动作
  638. CHMOD修改文件属性
  639. REMOVE删除临时文件。
  640. */