base_from_mtjh.go 7.9 KB


  1. package models
  2. import (
  3. "eta/eta_index_lib/utils"
  4. "fmt"
  5. "github.com/beego/beego/v2/client/orm"
  6. "strconv"
  7. "strings"
  8. "time"
  9. )
  10. type BaseFromMtjhMapping struct {
  11. BaseFromMtjhMappingId int `orm:"column(base_from_Mtjh_mapping_id);pk"`
  12. IndexName string `description:"持买单量指标名称"`
  13. IndexCode string `description:"持买单量指标编码"`
  14. CreateTime time.Time `description:"时间"`
  15. Area string `description:"区域"`
  16. Port string `description:"港口或码头"`
  17. Variety string `description:"品种"`
  18. Unit string `description:"单位"`
  19. Frequency string `description:"频率"`
  20. }
  21. type BaseFromMtjhIndex struct {
  22. BaseFromMtjhIndexId int `orm:"column(base_from_mtjh_index_id);pk"`
  23. IndexName string `description:"持买单量指标名称"`
  24. IndexCode string `description:"持买单量指标编码"`
  25. DealValue string `description:"成交量"`
  26. DataTime string `description:"数据日期"`
  27. Area string `description:"区域"`
  28. Port string `description:"港口或码头"`
  29. Variety string `description:"品种"`
  30. Unit string `description:"单位"`
  31. Frequency string `description:"频率"`
  32. CreateTime time.Time `description:"插入时间"`
  33. ModifyTime time.Time `description:"修改时间"`
  34. }
  35. // 查询指标
  36. func GetBaseFromMtjhMapping() (items []*BaseFromMtjhMapping, err error) {
  37. o := orm.NewOrm()
  38. sql := `SELECT * FROM base_from_mtjh_mapping`
  39. _, err = o.Raw(sql).QueryRows(&items)
  40. return
  41. }
  42. // 查询指标
  43. func GetBaseFromMtjhIndex() (items []*BaseFromMtjhIndex, err error) {
  44. o := orm.NewOrm()
  45. sql := `SELECT * FROM base_from_mtjh_index`
  46. _, err = o.Raw(sql).QueryRows(&items)
  47. return
  48. }
  49. // 添加数据
  50. func AddBaseFromMtjhIndex(item *BaseFromMtjhIndex) (lastId int64, err error) {
  51. o := orm.NewOrm()
  52. lastId, err = o.Insert(item)
  53. return
  54. }
  55. func AddBaseFromMtjhIndexMuti(items []*BaseFromMtjhIndex) (lastId int64, err error) {
  56. o := orm.NewOrm()
  57. lastId, err = o.InsertMulti(500, items)
  58. return
  59. }
  60. // 添加指标
  61. func AddBaseFromMtjhMapping(item *BaseFromMtjhMapping) (lastId int64, err error) {
  62. o := orm.NewOrm()
  63. lastId, err = o.Insert(item)
  64. return
  65. }
  66. func AddBaseFromMtjhMappingMuti(items []*BaseFromMtjhMapping) (lastId int64, err error) {
  67. o := orm.NewOrm()
  68. lastId, err = o.InsertMulti(500, items)
  69. return
  70. }
  71. func UpdateBaseFromMtjhIndex(item *BaseFromMtjhIndex) (err error) {
  72. o := orm.NewOrm()
  73. sql := `UPDATE base_from_mtjh_index SET deal_value=? WHERE index_name=? AND data_time = ?`
  74. _, err = o.Raw(sql, item.DealValue, item.IndexName, item.DataTime).Exec()
  75. return
  76. }
  77. func AddEdbDataFromMtjh(edbCode string) (err error) {
  78. o := orm.NewOrm()
  79. coalBaseDataAll, err := GetMtjhindexByCode(edbCode)
  80. if err != nil && err.Error() != utils.ErrNoRow() {
  81. return
  82. }
  83. var isAdd bool
  84. addSql := ` INSERT INTO edb_data_mtjh(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  85. existMap := make(map[string]string)
  86. for _, sv := range coalBaseDataAll {
  87. eDate := sv.DataTime
  88. var timeStr string
  89. var dataTime time.Time
  90. var sDataTime string
  91. var timestamp int64
  92. sDataTime = eDate
  93. dataTime, err = time.ParseInLocation(utils.FormatDate, eDate, time.Local)
  94. if err != nil {
  95. fmt.Println("time.Parse Err:" + eDate)
  96. return err
  97. }
  98. timestamp = dataTime.UnixNano() / 1e6
  99. timeStr = fmt.Sprintf("%d", timestamp)
  100. value := strings.Replace(sv.DealValue, "%", "", -1)
  101. if _, ok := existMap[sDataTime]; !ok {
  102. addSql += GetAddSql("0", edbCode, sDataTime, timeStr, value)
  103. fmt.Println("edbCode:", edbCode)
  104. fmt.Println("sDataTime:", sDataTime)
  105. fmt.Println("timeStr:", timeStr)
  106. fmt.Println("value:", value)
  107. isAdd = true
  108. }
  109. existMap[eDate] = value
  110. }
  111. if isAdd {
  112. addSql = strings.TrimRight(addSql, ",")
  113. utils.FileLog.Info("addSql:" + addSql)
  114. _, err = o.Raw(addSql).Exec()
  115. if err != nil {
  116. return err
  117. }
  118. }
  119. return
  120. }
  121. func RefreshEdbDataFromMtjh(edbInfoId int, edbCode, startDate string) (err error) {
  122. source := utils.DATA_SOURCE_MTJH
  123. subSource := utils.DATA_SUB_SOURCE_EDB
  124. o := orm.NewOrm()
  125. if err != nil {
  126. return
  127. }
  128. edbInfoIdStr := strconv.Itoa(edbInfoId)
  129. //计算数据
  130. var condition string
  131. var pars []interface{}
  132. if edbCode != "" {
  133. condition += " AND index_code=? "
  134. pars = append(pars, edbCode)
  135. }
  136. if startDate != "" {
  137. condition += " AND data_time>=? "
  138. pars = append(pars, startDate)
  139. }
  140. glDataList, err := GetMtjhDataByTradeCode(condition, pars)
  141. if err != nil {
  142. return
  143. }
  144. // 真实数据的最大日期 , 插入规则配置的日期
  145. var realDataMaxDate, edbDataInsertConfigDate time.Time
  146. var edbDataInsertConfig *EdbDataInsertConfig
  147. var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
  148. {
  149. edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfoId)
  150. if err != nil && err.Error() != utils.ErrNoRow() {
  151. return
  152. }
  153. if edbDataInsertConfig != nil {
  154. edbDataInsertConfigDate = edbDataInsertConfig.Date
  155. }
  156. }
  157. //获取指标所有数据
  158. var existCondition string
  159. var existPars []interface{}
  160. existCondition += " AND edb_info_id=? "
  161. existPars = append(existPars, edbInfoId)
  162. if startDate != "" {
  163. existCondition += " AND data_time>=? "
  164. existPars = append(existPars, startDate)
  165. }
  166. existList, err := GetEdbDataByCondition(source, subSource, existCondition, existPars)
  167. if err != nil {
  168. return err
  169. }
  170. existMap := make(map[string]*EdbInfoSearchData)
  171. for _, v := range existList {
  172. existMap[v.DataTime] = v
  173. }
  174. addSql := ` INSERT INTO edb_data_mtjh(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
  175. var isAdd bool
  176. for _, v := range glDataList {
  177. var value string
  178. value = strings.Replace(v.DealValue, "%", "", -1)
  179. item := v
  180. itemValue := value
  181. if _, ok := existMap[v.DataTime]; !ok {
  182. eDate := item.DataTime
  183. var timeStr string
  184. var dataTime time.Time
  185. var sDataTime string
  186. var timestamp int64
  187. sDataTime = eDate
  188. dataTime, err = time.ParseInLocation(utils.FormatDate, eDate, time.Local)
  189. if err != nil {
  190. fmt.Println("time.Parse Err:" + eDate)
  191. return err
  192. }
  193. timestamp = dataTime.UnixNano() / 1e6
  194. timeStr = fmt.Sprintf("%d", timestamp)
  195. sValue := itemValue
  196. if sValue != "" {
  197. saveValue := sValue
  198. if findItem, ok := existMap[eDate]; !ok {
  199. addSql += GetAddSql(edbInfoIdStr, edbCode, sDataTime, timeStr, saveValue)
  200. isAdd = true
  201. } else {
  202. if findItem != nil && utils.SubFloatToString(findItem.Value, 30) != sValue {
  203. err = ModifyEdbDataById(source, subSource, findItem.EdbDataId, sValue)
  204. if err != nil {
  205. return err
  206. }
  207. }
  208. }
  209. }
  210. // 下面代码主要目的是处理掉手动插入的数据判断
  211. {
  212. if realDataMaxDate.IsZero() || dataTime.After(realDataMaxDate) {
  213. realDataMaxDate = dataTime
  214. }
  215. if edbDataInsertConfigDate.IsZero() || dataTime.Equal(edbDataInsertConfigDate) {
  216. isFindConfigDateRealData = true
  217. }
  218. }
  219. }
  220. }
  221. // 处理手工数据补充的配置
  222. HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfoId, source, subSource, existMap, isFindConfigDateRealData)
  223. if isAdd {
  224. addSql = strings.TrimRight(addSql, ",")
  225. _, err = o.Raw(addSql).Exec()
  226. if err != nil {
  227. return err
  228. }
  229. }
  230. return
  231. }
  232. // GetMtjhindexByCode
  233. func GetMtjhindexByCode(indexCode string) (items []*BaseFromMtjhIndex, err error) {
  234. o := orm.NewOrm()
  235. sql := "SELECT * FROM base_from_mtjh_index WHERE index_code=? "
  236. _, err = o.Raw(sql, indexCode).QueryRows(&items)
  237. return
  238. }
  239. func GetMtjhDataByTradeCode(condition string, pars []interface{}) (item []*BaseFromMtjhIndex, err error) {
  240. sql := ` SELECT * FROM base_from_mtjh_index WHERE 1=1 `
  241. o := orm.NewOrm()
  242. if condition != "" {
  243. sql += condition
  244. }
  245. sql += ` ORDER BY data_time DESC `
  246. _, err = o.Raw(sql, pars).QueryRows(&item)
  247. return
  248. }