base_from_python.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package models
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "github.com/shopspring/decimal"
  7. "hongze/hongze_edb_lib/services"
  8. "hongze/hongze_edb_lib/utils"
  9. "strings"
  10. "time"
  11. )
  // EdbDataPython is the row model for a single data point of a
  // Python-calculated indicator. Within this file it is filled from, and
  // written to, the edb_data_python table.
  type EdbDataPython struct {
  	// EdbDataId is the primary key, mapped to column edb_data_id.
  	EdbDataId int `orm:"column(edb_data_id);pk"`
  	// EdbInfoId is the id of the indicator this data point belongs to.
  	EdbInfoId int
  	// EdbCode is the code of the indicator this data point belongs to.
  	EdbCode string
  	// DataTime is the observation date kept as a string; this file parses
  	// it with the utils.FormatDate layout.
  	DataTime string
  	// Value is the numeric value of the data point.
  	Value float64
  	// CreateTime is the time the row was created.
  	CreateTime time.Time
  	// ModifyTime is the time the row was last modified.
  	ModifyTime time.Time
  	// DataTimestamp is DataTime expressed as a Unix timestamp in
  	// milliseconds.
  	DataTimestamp int64
  }
  23. // AddPythonEdb 新增python运算指标
  24. func AddPythonEdb(edbInfoId int, edbCode string, item services.EdbDataFromPython) (err error) {
  25. var errMsg string
  26. o := orm.NewOrm()
  27. defer func() {
  28. if err != nil {
  29. go utils.SendEmail(utils.APP_NAME_CN+"【"+utils.RunMode+"】"+"失败提醒", " 同花顺数据获取失败:err:"+errMsg, utils.EmailSendToUsers)
  30. }
  31. }()
  32. var isAdd bool
  33. addSql := ` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  34. for k, dateTimeStr := range item.Date {
  35. //格式化时间
  36. currentDate, tmpErr := time.Parse(utils.FormatDate, dateTimeStr)
  37. if tmpErr != nil {
  38. err = tmpErr
  39. return
  40. }
  41. timestamp := currentDate.UnixNano() / 1e6
  42. timestampStr := fmt.Sprintf("%d", timestamp)
  43. //值
  44. val := item.Value[k]
  45. saveVal := utils.SubFloatToString(val, 20)
  46. addSql += GetAddSql(fmt.Sprint(edbInfoId), edbCode, dateTimeStr, timestampStr, saveVal)
  47. isAdd = true
  48. }
  49. if isAdd {
  50. addSql = strings.TrimRight(addSql, ",")
  51. _, err = o.Raw(addSql).Exec()
  52. if err != nil {
  53. errMsg = " tx.Exec Err :" + err.Error()
  54. return
  55. }
  56. }
  57. return
  58. }
  59. // EditPythonEdb 编辑python运算指标
  60. func EditPythonEdb(edbInfoId int, edbCode string, item services.EdbDataFromPython) (err error) {
  61. var errMsg string
  62. o := orm.NewOrm()
  63. defer func() {
  64. if err != nil {
  65. go utils.SendEmail(utils.APP_NAME_CN+"【"+utils.RunMode+"】"+"失败提醒", " 同花顺数据获取失败:err:"+errMsg, utils.EmailSendToUsers)
  66. }
  67. }()
  68. var isAdd bool
  69. addSql := ` INSERT INTO edb_data_python (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  70. for k, dateTimeStr := range item.Date {
  71. //格式化时间
  72. currentDate, tmpErr := time.Parse(utils.FormatDate, dateTimeStr)
  73. if tmpErr != nil {
  74. err = tmpErr
  75. return
  76. }
  77. timestamp := currentDate.UnixNano() / 1e6
  78. timestampStr := fmt.Sprintf("%d", timestamp)
  79. //值
  80. val := item.Value[k]
  81. saveVal := utils.SubFloatToString(val, 20)
  82. addSql += GetAddSql(fmt.Sprint(edbInfoId), edbCode, dateTimeStr, timestampStr, saveVal)
  83. isAdd = true
  84. }
  85. if isAdd {
  86. addSql = strings.TrimRight(addSql, ",")
  87. _, err = o.Raw(addSql).Exec()
  88. if err != nil {
  89. errMsg = " tx.Exec Err :" + err.Error()
  90. return
  91. }
  92. }
  93. return
  94. }
  95. // RefreshAllPythonEdb 刷新所有 python运算指标
  96. func RefreshAllPythonEdb(edbInfo *EdbInfo, item services.EdbDataFromPython) (err error) {
  97. o := orm.NewOrm()
  98. to, err := o.Begin()
  99. if err != nil {
  100. return
  101. }
  102. defer func() {
  103. if err != nil {
  104. fmt.Println("RefreshAllPythonEdb,Err:" + err.Error())
  105. _ = to.Rollback()
  106. } else {
  107. _ = to.Commit()
  108. }
  109. }()
  110. pythonDataMap := make(map[string]float64)
  111. pythonDate := make([]string, 0)
  112. for k, dateTimeStr := range item.Date {
  113. pythonDataMap[dateTimeStr] = item.Value[k]
  114. pythonDate = append(pythonDate, dateTimeStr)
  115. }
  116. //查询当前指标现有的数据
  117. var condition string
  118. var pars []interface{}
  119. condition += " AND edb_info_id=? "
  120. pars = append(pars, edbInfo.EdbInfoId)
  121. //所有的数据
  122. dataList, err := GetAllEdbDataPythonByEdbInfoId(edbInfo.EdbInfoId)
  123. if err != nil {
  124. return err
  125. }
  126. //待修改的指标数据map(index:日期,value:值)
  127. updateEdbDataMap := make(map[string]float64)
  128. removeDateList := make([]string, 0) //需要删除的日期
  129. for _, v := range dataList {
  130. currDataTime := v.DataTime
  131. pythonData, ok := pythonDataMap[currDataTime]
  132. if !ok {
  133. // 如果python运算出来的数据中没有该日期,那么需要移除该日期的数据
  134. removeDateList = append(removeDateList, currDataTime)
  135. } else {
  136. currValue, _ := decimal.NewFromFloat(pythonData).Truncate(4).Float64() //保留4位小数
  137. //如果计算出来的值与库里面的值不匹配,那么就去修改该值
  138. if v.Value != currValue {
  139. //将计算后的数据存入待拼接指标map里面,以便后续计算
  140. updateEdbDataMap[currDataTime] = currValue
  141. }
  142. }
  143. //移除python指标数据中当天的日期
  144. delete(pythonDataMap, currDataTime)
  145. }
  146. //sort.Strings(tbzEdbDataTimeList)
  147. //新增的数据入库
  148. {
  149. addDataList := make([]*EdbDataPython, 0)
  150. for dataTime, dataValue := range pythonDataMap {
  151. //时间戳
  152. currentDate, _ := time.Parse(utils.FormatDate, dataTime)
  153. timestamp := currentDate.UnixNano() / 1e6
  154. edbDataPython := &EdbDataPython{
  155. EdbInfoId: edbInfo.EdbInfoId,
  156. EdbCode: edbInfo.EdbCode,
  157. DataTime: dataTime,
  158. Value: dataValue,
  159. CreateTime: time.Now(),
  160. ModifyTime: time.Now(),
  161. DataTimestamp: timestamp,
  162. }
  163. addDataList = append(addDataList, edbDataPython)
  164. }
  165. //最后如果还有需要新增的数据,那么就统一入库
  166. if len(addDataList) > 0 {
  167. _, tmpErr := o.InsertMulti(len(addDataList), addDataList)
  168. if tmpErr != nil {
  169. err = tmpErr
  170. return
  171. }
  172. }
  173. }
  174. //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了)
  175. {
  176. if len(removeDateList) > 0 {
  177. removeDateStr := strings.Join(removeDateList, `","`)
  178. removeDateStr = `"` + removeDateStr + `"`
  179. //如果拼接指标变更了,那么需要删除所有的指标数据
  180. tableName := GetEdbDataTableName(edbInfo.Source)
  181. sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
  182. _, err = o.Raw(sql, edbInfo.EdbInfoId).Exec()
  183. if err != nil {
  184. err = errors.New("删除不存在的Python运算指标数据失败,Err:" + err.Error())
  185. return
  186. }
  187. }
  188. }
  189. //修改现有的数据中对应的值
  190. {
  191. tableName := GetEdbDataTableName(edbInfo.Source)
  192. for edbDate, edbDataValue := range updateEdbDataMap {
  193. sql := fmt.Sprintf(` UPDATE %s set value = ?,modify_time=now() WHERE edb_info_id = ? and data_time = ? `, tableName)
  194. _, err = o.Raw(sql, edbDataValue, edbInfo.EdbInfoId, edbDate).Exec()
  195. if err != nil {
  196. err = errors.New("更新现有的Python运算指标数据失败,Err:" + err.Error())
  197. return
  198. }
  199. }
  200. }
  201. return
  202. }
  203. // GetAllEdbDataPythonByEdbInfoId 根据指标id获取全部的数据
  204. func GetAllEdbDataPythonByEdbInfoId(edbInfoId int) (items []*EdbDataPython, err error) {
  205. o := orm.NewOrm()
  206. sql := ` SELECT * FROM edb_data_python WHERE edb_info_id=? ORDER BY data_time DESC `
  207. _, err = o.Raw(sql, edbInfoId).QueryRows(&items)
  208. return
  209. }
  // EdbInfoPythonSaveReq is the request payload for saving a calculated
  // indicator.
  type EdbInfoPythonSaveReq struct {
  	AdminId          int    `description:"添加人id"`  // id of the user adding the indicator
  	AdminName        string `description:"添加人名称"`  // name of the user adding the indicator
  	EdbName          string `description:"指标名称"`   // indicator name
  	Frequency        string `description:"频率"`     // frequency
  	Unit             string `description:"单位"`     // unit
  	ClassifyId       int    `description:"分类id"`   // classification id
  	CalculateFormula string `description:"计算公式"`   // calculation formula

  	// EdbInfoIdArr lists indicators together with the tag each one is
  	// associated with.
  	EdbInfoIdArr []struct {
  		EdbInfoId int    `description:"指标id"`     // indicator id
  		FromTag   string `description:"指标对应标签"` // tag associated with the indicator
  	}
  }
  // ExecPythonEdbReq is the request payload for executing the Python code of
  // a Python-calculated indicator.
  type ExecPythonEdbReq struct {
  	// PythonCode holds the Python source code carried by the request.
  	PythonCode string `description:"python代码"`
  }
  // AddPythonEdbReq is the request payload for adding a Python-calculated
  // indicator.
  type AddPythonEdbReq struct {
  	AdminId    int    `description:"添加人id"`   // id of the user adding the indicator
  	AdminName  string `description:"添加人名称"`   // name of the user adding the indicator
  	EdbInfoId  int    `description:"指标id"`    // indicator id
  	EdbName    string `description:"指标名称"`    // indicator name
  	Frequency  string `description:"频度"`      // frequency
  	Unit       string `description:"单位"`      // unit
  	ClassifyId int    `description:"分类id"`    // classification id
  	PythonCode string `description:"python代码"` // Python source code
  }
  // EdbPythonEdbReq is the request payload for editing a Python-calculated
  // indicator.
  type EdbPythonEdbReq struct {
  	EdbName    string `description:"指标名称"`    // indicator name
  	Frequency  string `description:"频度"`      // frequency
  	Unit       string `description:"单位"`      // unit
  	ClassifyId int    `description:"分类id"`    // classification id
  	PythonCode string `description:"python代码"` // Python source code
  }
  246. }