Selaa lähdekoodia

新增指标文件监听

longyu 2 vuotta sitten
vanhempi
commit
f1653878cd
10 muutettua tiedostoa jossa 308 lisäystä ja 60 poistoa
  1. 9 4
      controller/index/index.go
  2. 1 1
      core/run_server.go
  3. 2 1
      init_serve/task.go
  4. 2 0
      main.go
  5. 14 1
      models/index/index.go
  6. 41 0
      models/index/index_data.go
  7. 1 0
      routers/index.go
  8. 0 53
      task/watch.go
  9. 1 0
      utils/index_files.go
  10. 237 0
      watch/watch.go

+ 9 - 4
controller/index/index.go

@@ -5,12 +5,12 @@ import (
 	"fmt"
 	"github.com/gin-gonic/gin"
 	"github.com/go-playground/validator/v10"
+	"github.com/xuri/excelize/v2"
 	"hongze/mysteel_watch/controller/resp"
 	"hongze/mysteel_watch/global"
 	"hongze/mysteel_watch/models/index"
 	"hongze/mysteel_watch/utils"
-
-	"github.com/xuri/excelize/v2"
+	"hongze/mysteel_watch/watch"
 )
 
 type IndexController struct {
@@ -35,7 +35,6 @@ func (s *IndexController) Create(c *gin.Context) {
 		return
 	}
 	fmt.Println("indexCode:" + req.IndexCode)
-	fmt.Println("indexName:" + req.IndexName)
 
 	//fileName := req.IndexName + "_" + req.IndexCode + ".xlsx"
 	fileName := req.IndexCode + ".xlsx"
@@ -76,6 +75,12 @@ func (s *IndexController) Create(c *gin.Context) {
 	return
 }
 
+func (s *IndexController) Test(c *gin.Context) {
+	watch.TestWatch()
+	resp.OkData("检测成功", 1, c)
+	return
+}
+
 type IndexComment struct {
 	BlankValue      string       `json:"BlankValue"`
 	CanMark         bool         `json:"CanMark"`
@@ -173,4 +178,4 @@ type ModelsInfo struct {
 //commentItem.Ver = 3
 
 //8.136.199.33
-//datamysteel.hzinsights.com
+//datamysteel.hzinsights.com

+ 1 - 1
core/run_server.go

@@ -10,7 +10,7 @@ func RunServe() {
 	//初始化路由
 	r := init_serve.InitRouter()
 	//初始化mysql数据库
-	//init_serve.Mysql()
+	init_serve.Mysql()
 	//如果配置了redis,那么链接redis
 	//if global.CONFIG.Serve.UseRedis {
 	//	//初始化redis

+ 2 - 1
init_serve/task.go

@@ -2,6 +2,7 @@ package init_serve
 
 import (
 	"hongze/mysteel_watch/task"
+	"hongze/mysteel_watch/watch"
 )
 
 func InitTask()  {
@@ -12,5 +13,5 @@ func InitTask()  {
 		}
 	}()
 
-	go task.ListenFolderNew()
+	go watch.ListenFolderNew()
 }

+ 2 - 0
main.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"hongze/mysteel_watch/core"
+	"hongze/mysteel_watch/watch"
 )
 
 // @title 弘则人力资源管理系统API接口文档
@@ -20,4 +21,5 @@ import (
 // @BasePath /
 func main() {
 	core.RunServe()
+	go watch.ListenFolderNew()
 }

+ 14 - 1
models/index/index.go

@@ -1,6 +1,8 @@
 package index
 
 import (
+	"context"
+	"fmt"
 	"hongze/mysteel_watch/global"
 	"hongze/mysteel_watch/models/base"
 )
@@ -12,10 +14,11 @@ type BaseFromMysteelChemicalIndex struct {
 	IndexName                      string `gorm:"column:index_name" json:"index_name"`
 	Unit                           string `gorm:"column:unit" json:"unit"`
 	Source                         string `gorm:"column:source" json:"source"`
+	Frequency                      string `gorm:"column:frequency" json:"frequency"`
 	StartDate                      string `gorm:"column:start_date" json:"start_date"`
 	EndDate                        string `gorm:"column:end_date" json:"end_date"`
 	Describe                       string `gorm:"column:describe" json:"describe"`
-	Update_week                    string `gorm:"column:update_week" json:"update_week"`
+	UpdateWeek                     string `gorm:"column:update_week" json:"update_week"`
 	SysUserId                      int    `gorm:"column:sys_user_id" json:"sys_user_id"`
 	SysUserRealName                string `gorm:"column:sys_user_real_name" json:"sys_user_real_name"`
 	base.TimeBase
@@ -29,6 +32,8 @@ func (r *BaseFromMysteelChemicalIndex) TableName() string {
 // 新增
 func (r *BaseFromMysteelChemicalIndex) Add() (err error) {
 	err = global.DEFAULT_MYSQL.Create(r).Error
+	fmt.Println("主键")
+	fmt.Println(r.BaseFromMysteelChemicalIndexId)
 	return
 }
 
@@ -41,3 +46,11 @@ func (r *BaseFromMysteelChemicalIndex) Update(updateCols []string) (err error) {
 type IndexAddReq struct {
 	IndexCode string `json:"IndexCode" binding:"required"` //指标编码
 }
+
+func (d *BaseFromMysteelChemicalIndex) GetIndexItem(indexCode string) (item *BaseFromMysteelChemicalIndex, err error) {
+	err = global.DEFAULT_MYSQL.WithContext(context.TODO()).Model(d).
+		Where("index_code = ?", indexCode).First(&item).Error
+	return
+}
+
+

+ 41 - 0
models/index/index_data.go

@@ -0,0 +1,41 @@
+package index
+
+import (
+	"context"
+	"hongze/mysteel_watch/global"
+	"hongze/mysteel_watch/models/base"
+	"time"
+)
+
+// 钢联化工指标数据
+type BaseFromMysteelChemicalData struct {
+	BaseFromMysteelChemicalDataId  int64     `gorm:"primaryKey;column:base_from_mysteel_chemical_data_id" json:"base_from_mysteel_chemical_data_id"` //序号
+	BaseFromMysteelChemicalIndexId int64     `gorm:"column:base_from_mysteel_chemical_index_id" json:"base_from_mysteel_chemical_index_id"`
+	IndexCode                      string    `gorm:"column:index_code" json:"index_code"`
+	DataTime                       time.Time `gorm:"column:data_time" json:"data_time"`
+	Value                          string    `gorm:"column:value" json:"value"`
+	base.TimeBase
+}
+
+// TableName get sql table name.获取数据库表名
+func (r *BaseFromMysteelChemicalData) TableName() string {
+	return "base_from_mysteel_chemical_data"
+}
+
+// 新增
+func (r *BaseFromMysteelChemicalData) Add(list []BaseFromMysteelChemicalData) (err error) {
+	err = global.DEFAULT_MYSQL.Create(list).Error
+	return
+}
+
+// 修改
+func (r *BaseFromMysteelChemicalData) Update(updateCols []string) (err error) {
+	err = global.DEFAULT_MYSQL.Model(r).Select(updateCols).Updates(r).Error
+	return
+}
+
+func (d *BaseFromMysteelChemicalData) GetIndexDataList(indexCode string) (item []*BaseFromMysteelChemicalData, err error) {
+	err = global.DEFAULT_MYSQL.WithContext(context.TODO()).Model(d).
+		Where("index_code = ?", indexCode).Find(&item).Error
+	return
+}

+ 1 - 0
routers/index.go

@@ -9,5 +9,6 @@ func InitIndex(group *gin.RouterGroup) {
 	//指标
 	indexController := new(index.IndexController)
 	group.GET("server_check",indexController.ServerCheck)
+	group.GET("test",indexController.Test)
 	group.POST("create", indexController.Create)
 }

+ 0 - 53
task/watch.go

@@ -1,53 +0,0 @@
-package task
-
-import (
-	"fmt"
-	"hongze/mysteel_watch/utils"
-	"log"
-	"strings"
-	"time"
-
-	"github.com/fsnotify/fsnotify"
-)
-
-func ListenFolderNew() {
-	fmt.Println("-----文件夹监听-------")
-	watcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		log.Fatal(err)
-	}
-	defer watcher.Close()
-
-	done2 := make(chan bool)
-	go func() {
-		for {
-			select {
-			case event, ok := <-watcher.Events:
-				fmt.Println("start event")
-				fmt.Println(event, ok)
-				fmt.Println("end event")
-				if !ok {
-					return
-				}
-				fmt.Println("event.Op=>%#v", event.Op)
-				fmt.Println("文件操作类型判断是不是新建一个文件:%#v", event.Op&fsnotify.Create == fsnotify.Create)
-				if event.Op&fsnotify.Create == fsnotify.Create {
-					fmt.Println("*Create**event")
-					fmt.Println("新的文件:", event.Name)
-					mate := strings.Split(event.Name, "\\")
-					fileName := mate[len(mate)-1]
-					fmt.Println("fileName:", fileName)
-				}
-			case err := <-watcher.Errors:
-				log.Println("error:", err)
-			case <-time.After(60 * time.Second):
-				continue
-			}
-		}
-	}()
-	err = watcher.Add(utils.IndexSaveDir)
-	if err != nil {
-		log.Fatal(err)
-	}
-	<-done2
-}

+ 1 - 0
utils/index_files.go

@@ -2,4 +2,5 @@ package utils
 
 const (
 	IndexSaveDir="D:\\hz\\mysteel_data\\"
+	//IndexSaveDir="E:\\files"
 )

+ 237 - 0
watch/watch.go

@@ -0,0 +1,237 @@
+package watch
+
+import (
+	"fmt"
+	"hongze/mysteel_watch/models/index"
+	"hongze/mysteel_watch/utils"
+	"log"
+	"strings"
+	"time"
+
+	"github.com/fsnotify/fsnotify"
+	"github.com/xuri/excelize/v2"
+)
+
+func ListenFolderNew() {
+	fmt.Println("-----文件夹监听-------")
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer watcher.Close()
+
+	done2 := make(chan bool)
+	go func() {
+		for {
+			select {
+			case event, ok := <-watcher.Events:
+				if ok && event.Op == fsnotify.Create &&
+					!strings.Contains(event.Name, "tmp") &&
+					!strings.Contains(event.Name, ".TMP") &&
+					!strings.Contains(event.Name, "~") &&
+					(strings.Contains(event.Name, "xlsx") || strings.Contains(event.Name, "xls")) {
+					WatchIndexFile(event.Name)
+				}
+			case err := <-watcher.Errors:
+				log.Println("error:", err)
+			case <-time.After(60 * time.Second):
+				continue
+			}
+		}
+	}()
+	err = watcher.Add(utils.IndexSaveDir)
+	if err != nil {
+		log.Fatal(err)
+	}
+	<-done2
+}
+
+//检测指标文件
+func WatchIndexFile(filePath string) {
+	fmt.Println("filePath:", filePath)
+	time.Sleep(2 * time.Second)
+	//读取文件内容
+	f, err := excelize.OpenFile(filePath)
+	if err != nil {
+		fmt.Println("OpenFile:" + filePath + ",Err:" + err.Error())
+		return
+	}
+	defer func() {
+		if err := f.Close(); err != nil {
+			fmt.Println("FileClose Err:" + err.Error())
+			return
+		}
+	}()
+
+	sheetList := f.GetSheetList()
+	for _, sv := range sheetList {
+
+		var indexName, indexCode, unit, source, frequency, startDate, endDate, describe string
+		var indexId int64
+		rows, err := f.GetRows(sv)
+		if err != nil {
+			fmt.Println("f.GetRows:err:" + err.Error())
+			return
+		}
+
+		indexObj := new(index.BaseFromMysteelChemicalIndex)
+		dataList := make([]index.BaseFromMysteelChemicalData, 0)
+
+		dataMap := make(map[string]string)
+		for rk, row := range rows {
+			if rk > 0 {
+				if rk < 10 {
+					for ck, colCell := range row {
+						if ck == 1 {
+							if rk == 1 {
+								indexName = colCell
+							}
+							if rk == 2 {
+								unit = colCell
+							}
+							if rk == 3 {
+								source = colCell
+							}
+							if rk == 4 {
+								indexCode = colCell
+							}
+							if rk == 5 {
+								frequency = colCell
+							}
+							if rk == 6 {
+								dateArr := strings.Split(colCell, "~")
+								if len(dateArr) >= 2 {
+									startDate = dateArr[0]
+									endDate = dateArr[1]
+								}
+							}
+							if rk == 7 {
+								describe = colCell
+							}
+						}
+					}
+
+					if rk == 9 {
+						//判断指标是否存在
+						var isAdd int
+						item, err := indexObj.GetIndexItem(indexCode)
+						if err != nil {
+							if err.Error() == "record not found" {
+								isAdd = 1
+							} else {
+								isAdd = -1
+								fmt.Println("GetIndexItem Err:" + err.Error())
+								return
+							}
+						}
+						if item != nil && item.BaseFromMysteelChemicalIndexId > 0 {
+							fmt.Println("item:", item)
+							isAdd = 2
+						} else {
+							isAdd = 1
+						}
+
+						fmt.Println("isAdd:", isAdd)
+
+						if isAdd == 1 {
+							indexObj.IndexCode = indexCode
+							indexObj.IndexName = indexName
+							indexObj.Unit = unit
+							indexObj.Source = source
+							indexObj.Describe = describe
+							indexObj.StartDate = startDate
+							indexObj.EndDate = endDate
+							indexObj.Frequency = frequency
+							err = indexObj.Add()
+							if err != nil {
+								fmt.Println("add err:" + err.Error())
+								return
+							}
+							indexId = indexObj.BaseFromMysteelChemicalIndexId
+						} else if isAdd == 2 {
+							indexObj.IndexCode = indexCode
+							indexObj.IndexName = indexName
+							indexObj.Unit = unit
+							indexObj.Source = source
+							indexObj.Describe = describe
+							indexObj.StartDate = startDate
+							indexObj.EndDate = endDate
+							indexObj.Frequency = frequency
+							indexObj.ModifyTime = time.Now()
+							indexId = item.BaseFromMysteelChemicalIndexId
+							//修改数据
+							updateColsArr := make([]string, 0)
+							updateColsArr = append(updateColsArr, "index_name")
+							updateColsArr = append(updateColsArr, "unit")
+							updateColsArr = append(updateColsArr, "source")
+							updateColsArr = append(updateColsArr, "frequency")
+							updateColsArr = append(updateColsArr, "start_date")
+							updateColsArr = append(updateColsArr, "end_date")
+							updateColsArr = append(updateColsArr, "describe")
+							updateColsArr = append(updateColsArr, "end_date")
+							updateColsArr = append(updateColsArr, "modify_time")
+							indexObj.Update(updateColsArr)
+
+							dataObj := new(index.BaseFromMysteelChemicalData)
+							//获取已存在的所有数据
+							dataList, err := dataObj.GetIndexDataList(indexCode)
+							if err != nil {
+								fmt.Println("GetIndexDataList Err:" + err.Error())
+								return
+							}
+							fmt.Println("dataListLen:", len(dataList))
+							for _, v := range dataList {
+								dateStr := v.DataTime.Format(utils.FormatDate)
+								dataMap[dateStr] = v.Value
+							}
+						}
+					}
+				} else {
+					var date, value string
+					for ck, colCell := range row {
+						if ck == 0 {
+							date = colCell
+						} else {
+							value = colCell
+						}
+					}
+					if _, ok := dataMap[date]; !ok {
+						dateTime, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
+						if err != nil {
+							fmt.Println("time.ParseInLocation Err:" + err.Error())
+							return
+						}
+						dataItem := new(index.BaseFromMysteelChemicalData)
+						dataItem.BaseFromMysteelChemicalIndexId = indexId
+						dataItem.IndexCode = indexCode
+						dataItem.DataTime = dateTime
+						dataItem.Value = value
+						dataItem.CreateTime = time.Now()
+						dataItem.ModifyTime = time.Now()
+						dataList = append(dataList, *dataItem)
+					}
+				}
+			}
+		}
+
+		if len(dataList) > 0 {
+			dataObj := new(index.BaseFromMysteelChemicalData)
+			err = dataObj.Add(dataList)
+			if err != nil {
+				fmt.Println("dataObj.Add() Err:" + err.Error())
+			}
+		}
+	}
+}
+
+func TestWatch() {
+	filePath := `D:\hz\mysteel_data\钢联数据_煤炭:运费价格:印尼→中国:巴拿马型(日)_2022-9-4_1662303985672.xlsx`
+	WatchIndexFile(filePath)
+}
+
+/*
+CREATE动作即临时文件的创建
+WRITE写文件动作
+CHMOD修改文件属性
+REMOVE删除临时文件。
+*/