edb_data_coal.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package data_manage
  2. import (
  3. "fmt"
  4. "github.com/beego/beego/v2/client/orm"
  5. "hongze/hz_crm_api/utils"
  6. "strconv"
  7. "strings"
  8. "time"
  9. )
  // CoalData is one value/date pair read from the DATA_VALUE and DATA_DATE
  // columns named in its orm tags.
  //
  // The description tags were previously swapped (the value column was
  // described as the date and vice versa); they now match the columns.
  type CoalData struct {
  	InputValue string `orm:"column(DATA_VALUE)" description:"值"`  // value column
  	DataTime   string `orm:"column(DATA_DATE)" description:"日期"` // date column
  }
  // BaseFromCoalDataSimple is the row shape GetCoalDataByTradeCode scans
  // base_from_coalmine_<suffix> rows into: the primary key, the three
  // deal/buy/sold codes, the observation date and the three matching values.
  // All non-key fields are kept as strings exactly as read.
  type BaseFromCoalDataSimple struct {
  	Id int `orm:"column(base_from_trade_coal_index_id);pk"`

  	DealCode, BuyCode, SoldCode string
  	DataTime                    string
  	DealValue, BuyValue, SoldValue string
  }
  // BaseInfoFromCoal carries the descriptive attributes of a coal index (name,
  // frequency and unit). GetBaseInfoFromCoalByIndexCode fills it from a
  // base_from_coalmine_<suffix> table.
  type BaseInfoFromCoal struct {
  	IndexName, Frequency, Unit string
  }
  // BaseFromTradeCoalIndex is a full index row keyed by
  // base_from_trade_coal_index_id. It holds parallel deal / buy / sold groups
  // (short name, name, code, value, change) plus classification and timestamps.
  // NOTE(review): nothing in this file uses the type; presumably it maps a
  // trade-coal base table defined elsewhere — confirm against callers.
  type BaseFromTradeCoalIndex struct {
  	BaseFromTradeCoalIndexId int `orm:"column(base_from_trade_coal_index_id);pk"`
  	Rank                     int

  	// Deal group.
  	DealShortName, DealName, DealCode, DealValue string
  	DealChange                                   int

  	// Buy group.
  	BuyShortName, BuyName, BuyCode, BuyValue string
  	BuyChange                                int

  	// Sold group.
  	SoldShortName, SoldName, SoldCode, SoldValue string
  	SoldChange                                   int

  	// Classification and bookkeeping.
  	Frequency, ClassifyName, ClassifyType string
  	CreateTime, ModifyTime                time.Time
  	DataTime                              string
  }
  54. func GetEdbDataCoalMaxOrMinDate(edbCode string) (minDate, maxDate string, err error) {
  55. o := orm.NewOrmUsingDB("data")
  56. sql := ` SELECT MIN(data_time) AS minDate,MAX(data_time) AS maxDate FROM edb_data_coal WHERE edb_code=? `
  57. err = o.Raw(sql, edbCode).QueryRow(&minDate, &maxDate)
  58. return
  59. }
  60. // RefreshEdbDataByCoal 刷新煤炭网指标数据
  61. func RefreshEdbDataByCoal(edbInfoId int, edbCode, startDate, endDate string) (err error) {
  62. o := orm.NewOrmUsingDB("data")
  63. to, err := o.Begin()
  64. if err != nil {
  65. return
  66. }
  67. defer func() {
  68. if err != nil {
  69. _ = to.Rollback()
  70. } else {
  71. _ = to.Commit()
  72. }
  73. }()
  74. var suffix string
  75. if strings.Contains(edbCode, "jsm") {
  76. suffix = "jsm_index"
  77. } else if strings.Contains(edbCode, "company") {
  78. suffix = "company_index"
  79. } else if strings.Contains(edbCode, "firm") {
  80. suffix = "firm_index"
  81. } else if strings.Contains(edbCode, "coastal") {
  82. suffix = "coastal_index"
  83. } else if strings.Contains(edbCode, "inland") {
  84. suffix = "inland_index"
  85. }
  86. edbInfoIdStr := strconv.Itoa(edbInfoId)
  87. //计算数据
  88. var condition string
  89. var pars []interface{}
  90. if edbCode != "" {
  91. condition += " AND index_code=? "
  92. pars = append(pars, edbCode)
  93. }
  94. if startDate != "" {
  95. condition += " AND data_time>=? "
  96. pars = append(pars, startDate)
  97. }
  98. if endDate != "" {
  99. condition += " AND data_time<=? "
  100. pars = append(pars, endDate)
  101. }
  102. coalDataList, err := GetCoalDataByTradeCode(condition, suffix, pars)
  103. addSql := ` INSERT INTO edb_data_coal(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values `
  104. var isAdd bool
  105. existMap := make(map[string]string)
  106. for _, v := range coalDataList {
  107. item := v
  108. itemValue := v.DealValue
  109. if _, ok := existMap[v.DataTime]; !ok {
  110. count, err := GetEdbDataCoalByCodeAndDate(edbCode, v.DataTime)
  111. if err != nil && err.Error() != utils.ErrNoRow() {
  112. return err
  113. }
  114. if count <= 0 {
  115. eDate := item.DataTime
  116. sValue := itemValue
  117. if sValue != "" {
  118. dataTime, err := time.Parse(utils.FormatDate, eDate)
  119. if err != nil {
  120. return err
  121. }
  122. timestamp := dataTime.UnixNano() / 1e6
  123. timeStr := fmt.Sprintf("%d", timestamp)
  124. addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, sValue)
  125. isAdd = true
  126. }
  127. } else {
  128. err = ModifyEdbDataCoal(int64(edbInfoId), v.DataTime, v.DealValue)
  129. if err != nil {
  130. return err
  131. }
  132. }
  133. }
  134. existMap[v.DataTime] = v.DealValue
  135. }
  136. if isAdd {
  137. addSql = strings.TrimRight(addSql, ",")
  138. _, err = to.Raw(addSql).Exec()
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. return
  144. }
  145. // RefreshAllEdbDataByCoal 全部刷新中金所
  146. func RefreshAllEdbDataByCoal(edbInfoId, source int, edbCode, startDate, endDate string) (err error) {
  147. o := orm.NewOrmUsingDB("data")
  148. to, err := o.Begin()
  149. if err != nil {
  150. return
  151. }
  152. defer func() {
  153. if err != nil {
  154. _ = to.Rollback()
  155. } else {
  156. _ = to.Commit()
  157. }
  158. }()
  159. var suffix string
  160. if strings.Contains(edbCode, "jsm") {
  161. suffix = "jsm_index"
  162. } else if strings.Contains(edbCode, "company") {
  163. suffix = "company_index"
  164. } else if strings.Contains(edbCode, "firm") {
  165. suffix = "firm_index"
  166. } else if strings.Contains(edbCode, "coastal") {
  167. suffix = "coastal_index"
  168. } else if strings.Contains(edbCode, "inland") {
  169. suffix = "inland_index"
  170. }
  171. edbInfoIdStr := strconv.Itoa(edbInfoId)
  172. //计算数据
  173. var condition string
  174. var pars []interface{}
  175. if edbCode != "" {
  176. condition += " AND index_code=? "
  177. pars = append(pars, edbCode)
  178. }
  179. if startDate != "" {
  180. condition += " AND data_time>=? "
  181. pars = append(pars, startDate)
  182. }
  183. if endDate != "" {
  184. condition += " AND data_time<=? "
  185. pars = append(pars, endDate)
  186. }
  187. coalDataList, err := GetCoalDataByTradeCode(condition, suffix, pars)
  188. //获取指标所有数据
  189. dataList := make([]*EdbDataBase, 0)
  190. dataTableName := GetEdbDataTableName(source)
  191. sql := `SELECT * FROM %s WHERE edb_info_id=? `
  192. sql = fmt.Sprintf(sql, dataTableName)
  193. _, err = to.Raw(sql, edbInfoId).QueryRows(&dataList)
  194. if err != nil {
  195. return err
  196. }
  197. dataMap := make(map[string]string)
  198. for _, v := range dataList {
  199. dataMap[v.DataTime] = v.Value
  200. }
  201. addSql := ` INSERT INTO edb_data_coal(edb_info_id,edb_code,data_time,value,create_time,modify_time,status,data_timestamp) values `
  202. var isAdd bool
  203. existMap := make(map[string]string)
  204. for _, v := range coalDataList {
  205. item := v
  206. itemValue := v.DealValue
  207. if _, ok := existMap[v.DataTime]; !ok {
  208. eDate := item.DataTime
  209. sValue := itemValue
  210. if sValue != "" {
  211. dataTime, err := time.Parse(utils.FormatDate, eDate)
  212. if err != nil {
  213. return err
  214. }
  215. timestamp := dataTime.UnixNano() / 1e6
  216. timeStr := fmt.Sprintf("%d", timestamp)
  217. saveValue := sValue
  218. if existVal, ok := dataMap[eDate]; !ok {
  219. addSql += GetAddSql(edbInfoIdStr, edbCode, eDate, timeStr, saveValue)
  220. isAdd = true
  221. } else {
  222. if existVal != saveValue {
  223. sql := ` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  224. sql = fmt.Sprintf(sql, dataTableName)
  225. _, err = to.Raw(sql, sValue, edbInfoId, eDate).Exec()
  226. if err != nil {
  227. return err
  228. }
  229. }
  230. }
  231. }
  232. }
  233. existMap[v.DataTime] = v.DataTime
  234. }
  235. if isAdd {
  236. addSql = strings.TrimRight(addSql, ",")
  237. _, err = to.Raw(addSql).Exec()
  238. if err != nil {
  239. return err
  240. }
  241. }
  242. return
  243. }
  244. // GetBaseInfoFromCoalByIndexCode 获取指标信息
  245. func GetBaseInfoFromCoalByIndexCode(indexCode, suffix string) (list []*BaseInfoFromCoal, err error) {
  246. o := orm.NewOrmUsingDB("data")
  247. sql := `SELECT * FROM base_from_coalmine_%s WHERE index_code=? `
  248. sql = fmt.Sprintf(sql, suffix)
  249. _, err = o.Raw(sql, indexCode).QueryRows(&list)
  250. return
  251. }
  252. func GetCoalDataByTradeCode(condition, suffix string, pars []interface{}) (item []*BaseFromCoalDataSimple, err error) {
  253. sql := ` SELECT * FROM base_from_coalmine_%s WHERE 1=1 `
  254. sql = fmt.Sprintf(sql, suffix)
  255. o := orm.NewOrmUsingDB("data")
  256. if condition != "" {
  257. sql += condition
  258. }
  259. sql += ` ORDER BY data_time DESC `
  260. _, err = o.Raw(sql, pars).QueryRows(&item)
  261. return
  262. }
  263. func GetEdbDataCoalByCodeAndDate(edbCode string, startDate string) (count int, err error) {
  264. o := orm.NewOrmUsingDB("data")
  265. sql := ` SELECT COUNT(1) AS count FROM edb_data_coal WHERE edb_code=? AND data_time=? `
  266. err = o.Raw(sql, edbCode, startDate).QueryRow(&count)
  267. return
  268. }
  269. func ModifyEdbDataCoal(edbInfoId int64, dataTime, value string) (err error) {
  270. o := orm.NewOrmUsingDB("data")
  271. sql := ` UPDATE edb_data_coal SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `
  272. _, err = o.Raw(sql, value, edbInfoId, dataTime).Exec()
  273. return
  274. }