xiangyu.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package eta_bridge
  2. import (
  3. "encoding/json"
  4. "eta/eta_data_init/models"
  5. "eta/eta_data_init/utils"
  6. "fmt"
  7. "github.com/rdlucklib/rdluck_tools/uuid"
  8. "strings"
  9. "sync"
  10. )
  11. // 用户同步的锁
  12. var lockSyncUser sync.Mutex
  13. // SyncUser
  14. // @Description: 定时同步ETA指标信息变更数据至第三方
  15. // @author: Roc
  16. // @datetime 2024-02-28 14:00:45
  17. // @param cont context.Context
  18. // @return err error
  19. func SyncUser() (err error) {
  20. lockSyncUser.Lock()
  21. defer func() {
  22. if err != nil {
  23. tips := "SyncUser-定时将第三方的用户数据同步到ETA失败, ErrMsg:\n" + err.Error()
  24. utils.FileLog.Info(tips)
  25. }
  26. lockSyncUser.Unlock()
  27. }()
  28. uri := "/xy/user/sync"
  29. _, err, _ = HttpEtaBridgeGet(uri)
  30. if err != nil {
  31. return
  32. }
  33. return
  34. }
  // PushBaseParamReq is the business envelope wrapped around every payload
  // pushed through the ETA bridge.
  type PushBaseParamReq struct {
  	// SerialID is the serial number of the request.
  	SerialID string `json:"serialID" description:"流水号"`
  	// TableCode is the code of the target data table.
  	TableCode string `json:"tableCode" description:"数据表编码"`
  	// Total is the number of records carried by this request.
  	Total int `json:"total" description:"本次落表数据总数"`
  	// IsEmailWarn tells whether a warning e-mail should be sent (1 yes, 0 no).
  	IsEmailWarn int `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
  	// Data is the message body.
  	Data interface{} `json:"data" description:"报文体"`
  }
  // PushIndexParamDataReq is the basic-information record of one index as it is
  // pushed to the third party. Fields that are commented out are kept for
  // reference and are not part of the payload.
  type PushIndexParamDataReq struct {
  	// SourceIndexCode is the index ID at the upstream source.
  	SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
  	// IndexCode is the warehouse-side index ID: source and source ID joined by an underscore.
  	IndexCode string `json:"index_code" description:"数仓加工的指标ID,来源+来源ID,使用下划线连接,MST000ID00013242"`
  	// IndexName is the index name used by the external source.
  	IndexName string `json:"index_name" description:"外部来源的指标名称"`
  	// IndexShortName is the index name / short name used inside the system.
  	IndexShortName string `json:"index_short_name" description:"系统内的指标名称/简称"`
  	// FrequenceName is the index frequency, e.g. daily, weekly, monthly.
  	FrequenceName string `json:"frequence_name" description:"指标频度,如:日度、周度、月度"`
  	// UnitName is the index unit, e.g. yuan/ton, kilogram, cubic metre.
  	UnitName string `json:"unit_name" description:"指标单位,如:元/吨、千克、立方米"`
  	//CountryName string `json:"country_name" description:""`
  	//ProvinceName string `json:"province_name" description:""`
  	//AreaName string `json:"area_name" description:""`
  	//CityName string `json:"city_name" description:""`
  	//CountyName string `json:"county_name" description:""`
  	//RegionName string `json:"region_name" description:""`
  	//CompanyName string `json:"company_name" description:""`
  	//BreedName string `json:"breed_name" description:""`
  	//MaterialName string `json:"material_name" description:""`
  	//SpecName string `json:"spec_name" description:""`
  	//MarketName string `json:"market_name" description:""`
  	//DerivativeType string `json:"derivative_type" description:""`
  	//ContractName string `json:"contract_name" description:""`
  	//AuthKindName string `json:"auth_kind_name" description:""`
  	//CustomSmallClassName string `json:"custom_small_class_name" description:""`
  	// AssetBeginDate is the first business date of the detail data (yyyy-mm-dd).
  	AssetBeginDate string `json:"asset_begin_date" description:"业务字段,指标明细数据的业务日期开始时间;格式yyyy-mm-dd"`
  	// AssetEndDate is the last business date of the detail data (yyyy-mm-dd).
  	AssetEndDate string `json:"asset_end_date" description:"业务字段,指标明细数据的业务日期结束时间;格式yyyy-mm-dd"`
  	// CreateUser is the name of the creator.
  	CreateUser string `json:"create_user" description:"创建人姓名"`
  	// IndexCreateTime is when the basic information was created (yyyy-mm-dd hh:mi:ss).
  	IndexCreateTime string `json:"index_create_time" description:"指标基础信息创建时间戳;格式yyyy-mm-dd hh:mi:ss"`
  	// UpdateUser is the name of the last updater.
  	UpdateUser string `json:"update_user" description:"更新人姓名"`
  	// DetailUpdateTime is when the detail data was last updated (yyyy-mm-dd hh:mi:ss).
  	DetailUpdateTime string `json:"detail_update_time" description:"指标明细信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
  	// IndexUpdateTime is when the basic information was last updated (yyyy-mm-dd hh:mi:ss).
  	IndexUpdateTime string `json:"index_update_time" description:"指标基础信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
  	//DutyDept string `json:"duty_dept" description:""`
  	//BusinessDept string `json:"business_dept" description:""`
  	// OrginSource is the original source of the external data.
  	OrginSource string `json:"orgin_source" description:"外部数据原始来源,如国家统计局、钢联等"`
  	// OrginSysSource is the external source system, i.e. the data vendor.
  	OrginSysSource string `json:"orgin_sys_source" description:"外部来源系统,即数据供应商,如钢联、wind、同花顺"`
  	// SysSource is the internal source system.
  	SysSource string `json:"sys_source" description:"内部来源系统,如产研平台、市价平台"`
  	// SourceType is how the data is ingested: manual, API or RPA.
  	SourceType string `json:"source_type" description:"数据接入方式,手工、接口、RPA"`
  	//EtlTime string `json:"etl_time" description:""`
  	// Status is the logical-delete flag: 0 invalid, 1 valid.
  	Status int `json:"status" description:"逻辑删除:0-失效,1-有效"`
  }
  // PushIndexValueItemReq is one dated value of an index as it is pushed to the
  // third party. Every field, including Status, is sent as a string.
  type PushIndexValueItemReq struct {
  	Id string `json:"id"`
  	// IndexCode is the index code.
  	IndexCode string `json:"index_code" description:"指标代码"`
  	// Value is the numeric value.
  	Value string `json:"value" description:"数值"`
  	// BusinessDate is the business date (data date).
  	BusinessDate string `json:"business_date" description:"业务日期(数据日期)"`
  	// CreateTime is when the data entered ETA.
  	CreateTime string `json:"create_time" description:"数据进入ETA的时间"`
  	// UpdateTime is when the data was modified in the ETA database.
  	UpdateTime string `json:"update_time" description:"eta库中修改数据的时间"`
  	// Status is the logical-delete flag: 0 disabled, 1 enabled.
  	Status string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
  }
  // PushClassifyItemReq is one index classification (directory node) as it is
  // pushed to the third party.
  type PushClassifyItemReq struct {
  	// ClassifyId is the auto-increment ID.
  	ClassifyId int `json:"classify_id" description:"自增id"`
  	// ClassifyType is 0 for an ordinary index classification, 1 for a forecast one.
  	ClassifyType int `json:"classify_type" description:"分类类型,0:普通指标分类,1:预测指标分类"`
  	// ClassifyName is the classification name.
  	ClassifyName string `json:"classify_name" description:"分类名称"`
  	// ParentId is the ID of the parent classification.
  	ParentId int `json:"parent_id" description:"父级id"`
  	// HasData tells whether index data exists: 1 yes, 2 no.
  	HasData int `json:"has_data" description:"是否存在指标数据,1:有,2:无"`
  	// CreateTime is the creation time.
  	CreateTime string `json:"create_time" description:"创建时间"`
  	// UpdateTime is the modification time.
  	UpdateTime string `json:"update_time" description:"修改时间"`
  	// SysUserId is the creator's ID.
  	SysUserId int `json:"sys_user_id" description:"创建人id"`
  	// SysUserRealName is the creator's name.
  	SysUserRealName string `json:"sys_user_real_name" description:"创建人姓名"`
  	// Level is the depth of the node.
  	Level int `json:"level" description:"层级"`
  	// UniqueCode is the unique code of the node.
  	UniqueCode string `json:"unique_code" description:"唯一编码"`
  	// SortColumn is the sort key; smaller values come first, default 10.
  	SortColumn int `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
  }
  // PushEdbClassifyItemReq describes the relation between one index and the
  // classification (directory) it belongs to.
  type PushEdbClassifyItemReq struct {
  	// Id is the unique primary key.
  	Id string `json:"id" description:"唯一主键"`
  	// ClassifyId is the ID of the directory classification.
  	ClassifyId int `json:"classify_id" description:"目录分类ID"`
  	// IndexCode is the index ID.
  	IndexCode string `json:"index_code" description:"指标ID"`
  	// CreateTime is the creation time.
  	CreateTime string `json:"create_time" description:"创建时间"`
  	// CreateUser is the creator.
  	CreateUser string `json:"create_user" description:"创建人"`
  	// UpdateTime is the modification time.
  	UpdateTime string `json:"update_time" description:"修改时间"`
  	// UpdateUser is the last modifier.
  	UpdateUser string `json:"update_user" description:"修改人"`
  }
  122. // 同步指标分类锁
  123. var lockSyncClassify sync.Mutex
  124. // SyncClassifyList
  125. // @Description: 同步分类
  126. // @author: Roc
  127. // @datetime 2024-03-14 10:15:53
  128. // @param num int 如果小于等于0,那么是同步所有的,如果大于0,那么是同步num条数据
  129. // @return err error
  130. func SyncClassifyList(num int) (err error) {
  131. lockSyncClassify.Lock()
  132. defer func() {
  133. if err != nil {
  134. tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error()
  135. utils.FileLog.Info(tips)
  136. }
  137. lockSyncClassify.Unlock()
  138. }()
  139. // 需要同步的数据下标
  140. syncIndex := num - 1
  141. var condition string
  142. var pars []interface{}
  143. // 普通指标分类
  144. condition = " AND classify_type = ? "
  145. pars = append(pars, 0)
  146. list, err := models.GetAllEdbClassifyListByCondition(condition, pars)
  147. if err != nil {
  148. fmt.Println(err)
  149. return
  150. }
  151. dataLimitList := make([][]PushClassifyItemReq, 0)
  152. dataList := make([]PushClassifyItemReq, 0)
  153. for k, v := range list {
  154. dataList = append(dataList, PushClassifyItemReq{
  155. ClassifyId: int(v.ClassifyID),
  156. ClassifyType: int(v.ClassifyType),
  157. ClassifyName: v.ClassifyName,
  158. ParentId: int(v.ParentID),
  159. HasData: int(v.HasData),
  160. CreateTime: v.CreateTime.Format(utils.FormatDateTime),
  161. UpdateTime: v.ModifyTime.Format(utils.FormatDateTime),
  162. SysUserId: int(v.SysUserID),
  163. SysUserRealName: v.SysUserRealName,
  164. Level: int(v.Level),
  165. UniqueCode: v.UniqueCode,
  166. SortColumn: int(v.Sort),
  167. })
  168. // 如果指定数量了,且当前下标等于指定的下标,那么就跳出循环
  169. if num > 0 && syncIndex == k {
  170. break
  171. }
  172. if len(dataList) >= 100 {
  173. dataLimitList = append(dataLimitList, dataList)
  174. dataList = make([]PushClassifyItemReq, 0)
  175. }
  176. }
  177. lenData := len(dataList)
  178. if lenData > 0 {
  179. dataLimitList = append(dataLimitList, dataList)
  180. }
  181. if len(dataLimitList) < 0 {
  182. fmt.Println("无分类数据推送")
  183. return
  184. }
  185. for k, tmpDataList := range dataLimitList {
  186. req := PushBaseParamReq{
  187. SerialID: uuid.NewUUID().Hex32(),
  188. TableCode: "",
  189. Total: len(tmpDataList),
  190. IsEmailWarn: 0,
  191. Data: tmpDataList,
  192. }
  193. uri := "/xy/index/pushClassify"
  194. _, e, _ := HttpEtaBridgePost(uri, req)
  195. if e != nil {
  196. err = fmt.Errorf("第%d组分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error())
  197. fmt.Println(err)
  198. continue
  199. }
  200. }
  201. return
  202. }
  203. // 同步指标锁
  204. var lockSyncIndex sync.Mutex
  205. // SyncIndexList
  206. // @Description: 批量初始化指标
  207. // @author: Roc
  208. // @datetime 2024-03-07 19:20:19
  209. // @param currLogIdStr int64
  210. // @param logMaxId int64
  211. // @return lastId int64
  212. func SyncIndexList(num int) (err error) {
  213. lockSyncIndex.Lock()
  214. defer func() {
  215. if err != nil {
  216. tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error()
  217. utils.FileLog.Info(tips)
  218. }
  219. lockSyncIndex.Unlock()
  220. }()
  221. // 需要同步的数据下标
  222. syncIndex := num - 1
  223. // 初始化数据表的关系
  224. models.InitEdbSourceVar()
  225. // 查询当次需要同步的数据
  226. var condition string
  227. var pars []interface{}
  228. condition += " AND edb_info_type = ? AND edb_type = ? "
  229. pars = append(pars, 0, 1)
  230. // 获取指标数
  231. list, err := models.GetAllEdbInfoListByCondition(condition, pars)
  232. if err != nil {
  233. fmt.Println(err)
  234. return
  235. }
  236. pushIndexParamDataReqList := make([]*PushIndexParamDataReq, 0)
  237. pushEdbClassifyItemReqList := make([]*PushEdbClassifyItemReq, 0)
  238. for k, edbInfo := range list {
  239. // 获取数据源中指标的基础信息
  240. origInfo := getOrigInfo(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.EdbName)
  241. // 指标信息
  242. pushIndexData := &PushIndexParamDataReq{
  243. SourceIndexCode: edbInfo.EdbCode,
  244. IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode),
  245. IndexName: origInfo.EdbName,
  246. IndexShortName: edbInfo.EdbName,
  247. FrequenceName: edbInfo.Frequency,
  248. UnitName: edbInfo.Unit,
  249. AssetBeginDate: edbInfo.StartDate,
  250. AssetEndDate: edbInfo.EndDate,
  251. CreateUser: edbInfo.SysUserRealName,
  252. IndexCreateTime: edbInfo.CreateTime,
  253. UpdateUser: edbInfo.SysUserRealName,
  254. //DetailUpdateTime: getMaxModifyTime(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbCode, edbInfo.ModifyTime),
  255. DetailUpdateTime: edbInfo.ModifyTime,
  256. IndexUpdateTime: edbInfo.ModifyTime,
  257. OrginSource: edbInfo.SourceName,
  258. OrginSysSource: origInfo.SourceName,
  259. SysSource: "产研平台",
  260. SourceType: getSourceType(edbInfo.Source),
  261. Status: 1,
  262. }
  263. // 指标与分类的关系信息
  264. pushEdbClassify := &PushEdbClassifyItemReq{
  265. Id: fmt.Sprint(edbInfo.EdbInfoId),
  266. ClassifyId: edbInfo.ClassifyId,
  267. IndexCode: getIndexCode(edbInfo.Source, edbInfo.EdbCode),
  268. CreateTime: edbInfo.CreateTime,
  269. CreateUser: edbInfo.SysUserRealName,
  270. UpdateTime: edbInfo.ModifyTime,
  271. UpdateUser: edbInfo.SysUserRealName,
  272. }
  273. pushIndexParamDataReqList = append(pushIndexParamDataReqList, pushIndexData)
  274. pushEdbClassifyItemReqList = append(pushEdbClassifyItemReqList, pushEdbClassify)
  275. //pushIndexValueList, tmpErr := getPushIndexValueItemReqList(edbInfo.Source, edbInfo.SubSource, edbInfo.EdbInfoId)
  276. //if tmpErr != nil {
  277. // fmt.Printf("%s渠道的%s指标数据获取失败\n", edbInfo.SourceName, edbInfo.EdbName)
  278. // continue
  279. //}
  280. //pushIndex([]*PushIndexParamDataReq{pushIndexData})
  281. //pushIndexClassify([]*PushEdbClassifyItemReq{pushEdbClassify})
  282. //pushIndexValue(pushIndexValueList)
  283. // 如果指定数量了,且当前下标等于指定的下标,那么就跳出循环
  284. if num > 0 && syncIndex == k {
  285. break
  286. }
  287. }
  288. pushIndex(pushIndexParamDataReqList)
  289. pushIndexClassify(pushEdbClassifyItemReqList)
  290. return
  291. }
  // pushBatchSize is the maximum number of records sent in a single push request
  // by pushIndex, pushIndexValue and pushIndexClassify.
  const (
  	pushBatchSize = 100
  )
  293. // pushIndex
  294. // @Description: 指标信息数据推送
  295. // @author: Roc
  296. // @datetime 2024-03-07 16:35:02
  297. // @param allPushList []*PushIndexParamDataReq
  298. func pushIndex(allPushList []*PushIndexParamDataReq) {
  299. lenDataList := len(allPushList)
  300. if lenDataList <= 0 {
  301. return
  302. }
  303. uri := utils.SyncIndexPath + "/pushIndexData"
  304. errDataList := make([]*PushIndexParamDataReq, 0)
  305. errList := make([]string, 0)
  306. defer func() {
  307. if len(errList) > 0 {
  308. dataByte, err := json.Marshal(errDataList)
  309. if err != nil {
  310. dataByte = []byte("序列化指标信息数据失败" + err.Error())
  311. }
  312. utils.FileLog.Info("pushIndex errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  313. }
  314. }()
  315. dataLimitList := make([][]*PushIndexParamDataReq, 0)
  316. for i := 0; i < lenDataList; i += pushBatchSize {
  317. endIndex := min(i+pushBatchSize, lenDataList)
  318. tempSlice := allPushList[i:endIndex]
  319. dataLimitList = append(dataLimitList, tempSlice)
  320. }
  321. for k, dataList := range dataLimitList {
  322. req := PushBaseParamReq{
  323. SerialID: uuid.NewUUID().Hex32(),
  324. TableCode: "",
  325. Total: len(dataList),
  326. IsEmailWarn: 0,
  327. Data: dataList,
  328. }
  329. _, e, _ := HttpEtaBridgePost(uri, req)
  330. if e != nil {
  331. errList = append(errList, fmt.Sprintf("第%d组指标信息数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  332. errDataList = append(errDataList, dataList...)
  333. continue
  334. }
  335. }
  336. }
  337. // pushIndexValue
  338. // @Description: 指标明细数据推送
  339. // @author: Roc
  340. // @datetime 2024-03-07 16:32:47
  341. // @param allPushList []*PushIndexValueItemReq
  342. func pushIndexValue(allPushList []*PushIndexValueItemReq) {
  343. lenDataList := len(allPushList)
  344. if lenDataList <= 0 {
  345. return
  346. }
  347. uri := utils.SyncIndexPath + "/pushIndexValue"
  348. errDataList := make([]*PushIndexValueItemReq, 0)
  349. errList := make([]string, 0)
  350. defer func() {
  351. if len(errList) > 0 {
  352. dataByte, err := json.Marshal(errDataList)
  353. if err != nil {
  354. dataByte = []byte("序列化指标明细数据失败" + err.Error())
  355. }
  356. utils.FileLog.Info("pushIndexValue errList:%s;推送失败的指标明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  357. }
  358. }()
  359. dataLimitList := make([][]*PushIndexValueItemReq, 0)
  360. for i := 0; i < lenDataList; i += pushBatchSize {
  361. endIndex := min(i+pushBatchSize, lenDataList)
  362. tempSlice := allPushList[i:endIndex]
  363. dataLimitList = append(dataLimitList, tempSlice)
  364. }
  365. for k, dataList := range dataLimitList {
  366. req := PushBaseParamReq{
  367. SerialID: uuid.NewUUID().Hex32(),
  368. TableCode: "",
  369. Total: len(dataList),
  370. IsEmailWarn: 0,
  371. Data: dataList,
  372. }
  373. _, e, _ := HttpEtaBridgePost(uri, req)
  374. if e != nil {
  375. errList = append(errList, fmt.Sprintf("第%d组指标明细数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  376. errDataList = append(errDataList, dataList...)
  377. continue
  378. }
  379. }
  380. return
  381. }
  382. // pushIndexClassify
  383. // @Description: 指标与分类的关系推送
  384. // @author: Roc
  385. // @datetime 2024-03-07 16:32:47
  386. // @param allPushList []*PushIndexValueItemReq
  387. func pushIndexClassify(allPushList []*PushEdbClassifyItemReq) {
  388. lenDataList := len(allPushList)
  389. if lenDataList <= 0 {
  390. return
  391. }
  392. uri := utils.SyncIndexPath + "/pushEdbClassify"
  393. errDataList := make([]*PushEdbClassifyItemReq, 0)
  394. errList := make([]string, 0)
  395. defer func() {
  396. if len(errList) > 0 {
  397. dataByte, err := json.Marshal(errDataList)
  398. if err != nil {
  399. dataByte = []byte("序列化指标明细数据失败" + err.Error())
  400. }
  401. utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的指标所属分类明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  402. }
  403. }()
  404. dataLimitList := make([][]*PushEdbClassifyItemReq, 0)
  405. for i := 0; i < lenDataList; i += pushBatchSize {
  406. endIndex := min(i+pushBatchSize, lenDataList)
  407. tempSlice := allPushList[i:endIndex]
  408. dataLimitList = append(dataLimitList, tempSlice)
  409. }
  410. for k, dataList := range dataLimitList {
  411. req := PushBaseParamReq{
  412. SerialID: uuid.NewUUID().Hex32(),
  413. TableCode: "",
  414. Total: len(dataList),
  415. IsEmailWarn: 0,
  416. Data: dataList,
  417. }
  418. _, e, _ := HttpEtaBridgePost(uri, req)
  419. if e != nil {
  420. errList = append(errList, fmt.Sprintf("第%d组指标所属分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  421. errDataList = append(errDataList, dataList...)
  422. continue
  423. }
  424. }
  425. return
  426. }
  // getIndexCode builds the index code pushed to the third party: the decimal
  // source ID, an underscore, then the source-side index code.
  //
  // @param source int
  // @param edbCode string
  // @return string
  func getIndexCode(source int, edbCode string) string {
  	return fmt.Sprintf("%d_%s", source, edbCode)
  }
  // min returns the smaller of a and b.
  func min(a, b int) int {
  	if b < a {
  		return b
  	}
  	return a
  }