edb_source.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package data_manage
  2. import (
  3. "eta_gn/eta_task/global"
  4. "eta_gn/eta_task/utils"
  5. "fmt"
  6. "strings"
  7. )
  8. var (
  9. EdbDataTableNameMap map[int]string // 指标来源对应数据表名
  10. EdbDataRefreshMethodMap map[int]string // 指标来源对应的刷新指标方法
  11. EdbTableNameSourceMap map[string]*EdbSource // 数据表名对应的指标来源
  12. EdbSourceIdMap map[int]*EdbSource // 指标来源
  13. EdbSourceExtendIdMap map[string]int // 指标来源字符串对应来源ID
  14. EdbNameSourceMap map[string]*EdbSource // 数据来源名对应的指标来源
  15. )
  16. // EdbSource 指标来源表
  17. type EdbSource struct {
  18. EdbSourceId int `gorm:"column:edb_source_id;primaryKey"` // `orm:"column(edb_source_id);pk"`
  19. SourceName string `description:"指标来源名称"`
  20. TableName string `description:"数据表名"`
  21. EdbAddMethod string `description:"指标新增接口"`
  22. EdbRefreshMethod string `description:"指标刷新接口"`
  23. IsBase int `description:"是否为基础指标: 0-否; 1-是"`
  24. FromBridge int `description:"是否来源于桥接服务: 0-否; 1-是"`
  25. BridgeFlag string `description:"桥接服务对象标识"`
  26. SourceExtend string `description:"扩展字段做查询用"`
  27. EdbCodeRequired int32 `gorm:"column:edb_code_required;type:tinyint(4);comment:指标编码是否必填: 0-否; 1-是;not null;default:0;"` // 指标编码是否必填: 0-否; 1-是
  28. IndexTableName string `gorm:"column:index_table_name;type:varchar(128);comment:源指标表名;not null;"` // 源指标表名
  29. SourceNameEn string `gorm:"column:source_name_en;type:varchar(128);comment:指标来源名称-英文;not null;"` // 指标来源名称-英文
  30. }
  31. // GetEdbSourceItemsByCondition 获取指标来源列表
  32. func GetEdbSourceItemsByCondition(condition string, pars []interface{}, fieldArr []string, orderRule string) (items []*EdbSource, err error) {
  33. //o := orm.NewOrmUsingDB("data")
  34. fields := strings.Join(fieldArr, ",")
  35. if len(fieldArr) == 0 {
  36. fields = `*`
  37. }
  38. order := `ORDER BY edb_source_id ASC`
  39. if orderRule != "" {
  40. order = ` ORDER BY ` + orderRule
  41. }
  42. sql := fmt.Sprintf(`SELECT %s FROM edb_source WHERE 1=1 %s %s`, fields, condition, order)
  43. //_, err = o.Raw(sql, pars).QueryRows(&items)
  44. err = global.DmSQL["data"].Raw(sql, pars...).Find(&items).Error
  45. return
  46. }
  47. // GetEdbSourceItemByCondition 获取指标来源
  48. func GetEdbSourceItemByCondition(condition string, pars []interface{}) (item *EdbSource, err error) {
  49. //o := orm.NewOrmUsingDB("data")
  50. sql := fmt.Sprintf(`SELECT * FROM edb_source WHERE 1=1 %s`, condition)
  51. //err = o.Raw(sql, pars).QueryRow(&item)
  52. err = global.DmSQL["data"].Raw(sql, pars...).First(&item).Error
  53. return
  54. }
  55. // InitEdbSourceVar 初始化时加载指标来源对应信息, 避免循环中查库, 注意edb_source表修改table_name的话需要重启服务
  56. func InitEdbSourceVar() {
  57. EdbDataTableNameMap = make(map[int]string)
  58. EdbDataRefreshMethodMap = make(map[int]string)
  59. EdbTableNameSourceMap = make(map[string]*EdbSource)
  60. EdbNameSourceMap = make(map[string]*EdbSource)
  61. EdbSourceIdMap = make(map[int]*EdbSource)
  62. EdbSourceExtendIdMap = make(map[string]int)
  63. sources, e := GetEdbSourceItemsByCondition(``, make([]interface{}, 0), []string{}, "")
  64. if e != nil {
  65. utils.FileLog.Info("init source table err: %s", e.Error())
  66. return
  67. }
  68. for _, v := range sources {
  69. EdbDataTableNameMap[v.EdbSourceId] = v.TableName
  70. EdbDataRefreshMethodMap[v.EdbSourceId] = v.EdbRefreshMethod
  71. EdbTableNameSourceMap[v.TableName] = v
  72. EdbNameSourceMap[v.SourceName] = v
  73. EdbSourceIdMap[v.EdbSourceId] = v
  74. if v.SourceExtend != "" {
  75. arr := strings.Split(v.SourceExtend, ",")
  76. if len(arr) == 0 {
  77. continue
  78. }
  79. for _, s := range arr {
  80. EdbSourceExtendIdMap[s] = v.EdbSourceId
  81. }
  82. }
  83. }
  84. }
  85. // GetEdbSourceItemsSourceId
  86. // @Description: 根据来源id获取指标来源
  87. // @param sourceId
  88. // @return item
  89. // @return err
  90. func GetEdbSourceItemsSourceId(sourceId int) (item *EdbSource, err error) {
  91. sql := `SELECT * FROM edb_source WHERE 1=1 AND edb_source_id = ? `
  92. err = global.DmSQL["data"].Raw(sql, sourceId).First(&item).Error
  93. return
  94. }
  95. // GetEdbSourceBySourceId
  96. // @Description: 根据来源id获取指标来源
  97. // @param sourceId
  98. // @return sourceItem
  99. func GetEdbSourceBySourceId(sourceId int) (sourceItem *EdbSource) {
  100. sourceItem, ok := EdbSourceIdMap[sourceId]
  101. if !ok {
  102. item, err := GetEdbSourceItemsSourceId(sourceId)
  103. if err != nil {
  104. return
  105. }
  106. if item.EdbSourceId > 0 {
  107. sourceItem = item
  108. // 写入到内存中
  109. EdbSourceIdMap[sourceId] = sourceItem
  110. EdbDataTableNameMap[sourceId] = sourceItem.TableName
  111. EdbDataRefreshMethodMap[sourceItem.EdbSourceId] = sourceItem.EdbRefreshMethod
  112. EdbTableNameSourceMap[sourceItem.TableName] = sourceItem
  113. EdbNameSourceMap[sourceItem.SourceName] = sourceItem
  114. }
  115. }
  116. return
  117. }
  118. // GetEdbSourceItemsSourceName
  119. // @Description: 根据来源名称获取指标来源
  120. // @param sourceName
  121. // @return item
  122. // @return err
  123. func GetEdbSourceItemsSourceName(sourceName string) (item *EdbSource, err error) {
  124. sql := `SELECT * FROM edb_source WHERE 1=1 AND source_name = ? `
  125. err = global.DmSQL["data"].Raw(sql, sourceName).First(&item).Error
  126. return
  127. }
  128. // GetEdbSourceBySourceName
  129. // @Description: 根据来源名称获取指标来源
  130. // @param sourceId
  131. // @return sourceItem
  132. func GetEdbSourceBySourceName(sourceName string) (sourceItem *EdbSource) {
  133. sourceItem, ok := EdbNameSourceMap[sourceName]
  134. if !ok {
  135. item, err := GetEdbSourceItemsSourceName(sourceName)
  136. if err != nil {
  137. return
  138. }
  139. if item.EdbSourceId > 0 {
  140. sourceItem = item
  141. // 写入到内存中
  142. EdbSourceIdMap[sourceItem.EdbSourceId] = sourceItem
  143. EdbDataTableNameMap[sourceItem.EdbSourceId] = sourceItem.TableName
  144. EdbDataRefreshMethodMap[sourceItem.EdbSourceId] = sourceItem.EdbRefreshMethod
  145. EdbTableNameSourceMap[sourceItem.TableName] = sourceItem
  146. EdbNameSourceMap[sourceItem.SourceName] = sourceItem
  147. }
  148. }
  149. return
  150. }
  151. // GetEdbSourceTableNameBySourceId
  152. // @Description: 根据来源id获取指标来源的归属表
  153. // @param sourceId
  154. // @return sourceItem
  155. func GetEdbSourceTableNameBySourceId(sourceId int) (tableName string) {
  156. sourceItem := GetEdbSourceBySourceId(sourceId)
  157. if sourceItem != nil {
  158. tableName = sourceItem.TableName
  159. }
  160. return
  161. }
  162. // GetEdbSourceRefreshMethodBySourceId
  163. // @Description: 根据来源id获取指标来源的刷新路径
  164. // @param sourceId
  165. // @return sourceItem
  166. func GetEdbSourceRefreshMethodBySourceId(sourceId int) (refreshMethod string) {
  167. sourceItem := GetEdbSourceBySourceId(sourceId)
  168. if sourceItem != nil {
  169. refreshMethod = sourceItem.EdbRefreshMethod
  170. }
  171. return
  172. }
  173. // AddEdbSource
  174. // @Description: 添加一个新的数据源
  175. // @param item
  176. // @return err
  177. func AddEdbSource(item *EdbSource, indexNamePrefix string) (err error) {
  178. o := global.DmSQL["data"].Begin()
  179. if err != nil {
  180. return
  181. }
  182. defer func() {
  183. if err != nil {
  184. _ = o.Rollback()
  185. return
  186. }
  187. _ = o.Commit()
  188. }()
  189. indexName1 := fmt.Sprintf(`INDEX_%s_EDB_CODE`, indexNamePrefix)
  190. indexName2 := fmt.Sprintf(`INDEX_%s_EDB_INFO_ID`, indexNamePrefix)
  191. sqlStatements := []string{
  192. fmt.Sprintf(`CREATE TABLE "%s"
  193. (
  194. "edb_data_id" INT IDENTITY(1, 1) NOT NULL,
  195. "edb_info_id" INT,
  196. "edb_code" VARCHAR(50),
  197. "data_time" DATE,
  198. "value" DOUBLE,
  199. "create_time" TIMESTAMP(0),
  200. "modify_time" TIMESTAMP(0),
  201. "data_timestamp" BIGINT DEFAULT 0,
  202. NOT CLUSTER PRIMARY KEY("edb_data_id"),
  203. UNIQUE("edb_code", "data_time")) STORAGE(ON "MAIN", CLUSTERBTR) ;
  204. `, item.TableName),
  205. fmt.Sprintf(`COMMENT ON COLUMN "%s"."create_time" IS '创建时间';`, item.TableName),
  206. fmt.Sprintf(`COMMENT ON COLUMN "%s"."data_time" IS '数据日期';`, item.TableName),
  207. fmt.Sprintf(`COMMENT ON COLUMN "%s"."data_timestamp" IS '数据日期时间戳';`, item.TableName),
  208. fmt.Sprintf(`COMMENT ON COLUMN "%s"."edb_code" IS '指标编码';`, item.TableName),
  209. fmt.Sprintf(`COMMENT ON COLUMN "%s"."edb_info_id" IS '指标id';`, item.TableName),
  210. fmt.Sprintf(`COMMENT ON COLUMN "%s"."modify_time" IS '修改时间';`, item.TableName),
  211. fmt.Sprintf(`COMMENT ON COLUMN "%s"."value" IS '数据值';`, item.TableName),
  212. fmt.Sprintf(`CREATE OR REPLACE INDEX "%s" ON "%s"("edb_code" ASC) STORAGE(ON "MAIN", CLUSTERBTR) ;`, indexName1, item.TableName),
  213. fmt.Sprintf(`CREATE OR REPLACE INDEX "%s" ON "%s"("edb_info_id" ASC) STORAGE(ON "MAIN", CLUSTERBTR) ;`, indexName2, item.TableName),
  214. }
  215. // 创建表和索引
  216. for _, sql := range sqlStatements {
  217. err = o.Exec(sql).Error
  218. if err != nil {
  219. return
  220. }
  221. }
  222. // 添加来源
  223. err = o.Create(item).Error
  224. if err != nil {
  225. return
  226. }
  227. return
  228. }