xy.go 22 KB


  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta/eta_task/models/data_manage"
  6. "eta/eta_task/services/alarm_msg"
  7. "eta/eta_task/services/eta_hub"
  8. "eta/eta_task/utils"
  9. "fmt"
  10. "github.com/rdlucklib/rdluck_tools/uuid"
  11. "net/url"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // 用户同步的锁
  17. var lockSyncUser sync.Mutex
  18. // SyncUser
  19. // @Description: 定时同步ETA指标信息变更数据至第三方
  20. // @author: Roc
  21. // @datetime 2024-02-28 14:00:45
  22. // @param cont context.Context
  23. // @return err error
  24. func SyncUser(cont context.Context) (err error) {
  25. lockSyncUser.Lock()
  26. defer func() {
  27. if err != nil {
  28. tips := "SyncUser-定时将第三方的用户数据同步到ETA失败, ErrMsg:\n" + err.Error()
  29. utils.FileLog.Info(tips)
  30. go alarm_msg.SendAlarmMsg(tips, 3)
  31. }
  32. lockSyncUser.Unlock()
  33. }()
  34. uri := "/xy/user/pull"
  35. _, err, _ = HttpEtaBridgeGet(uri)
  36. if err != nil {
  37. return
  38. }
  39. return
  40. }
  41. // PushBaseParamReq
  42. // @Description: 业务报文
  43. type PushBaseParamReq struct {
  44. SerialID string `json:"serialID" description:"流水号"`
  45. TableCode string `json:"tableCode" description:"数据表编码"`
  46. Total int `json:"total" description:"本次落表数据总数"`
  47. IsEmailWarn int `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
  48. Data interface{} `json:"data" description:"报文体"`
  49. }
  50. // PushIndexParamDataReq
  51. // @Description: 指标数据结构
  52. type PushIndexParamDataReq struct {
  53. SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
  54. IndexCode string `json:"index_code" description:"数仓加工的指标ID,来源+来源ID,使用下划线连接,MST000ID00013242"`
  55. IndexName string `json:"index_name" description:"外部来源的指标名称"`
  56. IndexShortName string `json:"index_short_name" description:"系统内的指标名称/简称"`
  57. FrequenceName string `json:"frequence_name" description:"指标频度,如:日度、周度、月度"`
  58. UnitName string `json:"unit_name" description:"指标单位,如:元/吨、千克、立方米"`
  59. //CountryName string `json:"country_name" description:""`
  60. //ProvinceName string `json:"province_name" description:""`
  61. //AreaName string `json:"area_name" description:""`
  62. //CityName string `json:"city_name" description:""`
  63. //CountyName string `json:"county_name" description:""`
  64. //RegionName string `json:"region_name" description:""`
  65. //CompanyName string `json:"company_name" description:""`
  66. //BreedName string `json:"breed_name" description:""`
  67. //MaterialName string `json:"material_name" description:""`
  68. //SpecName string `json:"spec_name" description:""`
  69. //MarketName string `json:"market_name" description:""`
  70. //DerivativeType string `json:"derivative_type" description:""`
  71. //ContractName string `json:"contract_name" description:""`
  72. //AuthKindName string `json:"auth_kind_name" description:""`
  73. //CustomSmallClassName string `json:"custom_small_class_name" description:""`
  74. AssetBeginDate string `json:"asset_begin_date" description:"业务字段,指标明细数据的业务日期开始时间;格式yyyy-mm-dd"`
  75. AssetEndDate string `json:"asset_end_date" description:"业务字段,指标明细数据的业务日期结束时间;格式yyyy-mm-dd"`
  76. CreateUser string `json:"create_user" description:"创建人姓名"`
  77. IndexCreateTime string `json:"index_create_time" description:"指标基础信息创建时间戳;格式yyyy-mm-dd hh:mi:ss"`
  78. UpdateUser string `json:"update_user" description:"更新人姓名"`
  79. DetailUpdateTime string `json:"detail_update_time" description:"指标明细信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
  80. IndexUpdateTime string `json:"index_update_time" description:"指标基础信息更新时间戳;格式yyyy-mm-dd hh:mi:ss"`
  81. //DutyDept string `json:"duty_dept" description:""`
  82. //BusinessDept string `json:"business_dept" description:""`
  83. OrginSource string `json:"orgin_source" description:"外部数据原始来源,如国家统计局、钢联等"`
  84. OrginSysSource string `json:"orgin_sys_source" description:"外部来源系统,即数据供应商,如钢联、wind、同花顺"`
  85. SysSource string `json:"sys_source" description:"内部来源系统,如产研平台、市价平台"`
  86. SourceType string `json:"source_type" description:"数据接入方式,手工、接口、RPA"`
  87. //EtlTime string `json:"etl_time" description:""`
  88. Status int `json:"status" description:"逻辑删除:0-失效,1-有效"`
  89. }
  90. // PushIndexValueItemReq
  91. // @Description: 指标日期值数据结构
  92. type PushIndexValueItemReq struct {
  93. Id string `json:"id"`
  94. IndexCode string `json:"index_code" description:"指标代码"`
  95. Value string `json:"value" description:"数值"`
  96. BusinessDate string `json:"business_date" description:"业务日期(数据日期)"`
  97. CreateTime string `json:"create_time" description:"数据进入ETA的时间"`
  98. UpdateTime string `json:"update_time" description:"eta库中修改数据的时间"`
  99. Status string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
  100. }
  101. // PushClassifyItemReq
  102. // @Description: 指标分类数据结构
  103. type PushClassifyItemReq struct {
  104. ClassifyId int `json:"classify_id" description:"自增id"`
  105. ClassifyType int `json:"classify_type" description:"分类类型,0:普通指标分类,1:预测指标分类"`
  106. ClassifyName string `json:"classify_name" description:"分类名称"`
  107. ParentId int `json:"parent_id" description:"父级id"`
  108. HasData int `json:"has_data" description:"是否存在指标数据,1:有,2:无"`
  109. CreateTime string `json:"create_time" description:"创建时间"`
  110. UpdateTime string `json:"update_time" description:"修改时间"`
  111. SysUserId int `json:"sys_user_id" description:"创建人id"`
  112. SysUserRealName string `json:"sys_user_real_name" description:"创建人姓名"`
  113. Level int `json:"level" description:"层级"`
  114. UniqueCode string `json:"unique_code" description:"唯一编码"`
  115. SortColumn int `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
  116. }
  117. // PushEdbClassifyItemReq
  118. // @Description: 指标与目录的关系请求结构
  119. type PushEdbClassifyItemReq struct {
  120. Id string `json:"id" description:"唯一主键"`
  121. ClassifyId int `json:"classify_id" description:"目录分类ID"`
  122. IndexCode string `json:"index_code" description:"指标ID"`
  123. CreateTime string `json:"create_time" description:"创建时间"`
  124. CreateUser string `json:"create_user" description:"创建人"`
  125. UpdateTime string `json:"update_time" description:"修改时间"`
  126. UpdateUser string `json:"update_user" description:"修改人"`
  127. }
  128. // 同步指标分类锁
  129. var lockSyncClassify sync.Mutex
  130. // SyncClassifyList
  131. // @Description: 定时同步ETA分类数据至第三方
  132. // @author: Roc
  133. // @datetime 2024-02-28 14:00:45
  134. // @param cont context.Context
  135. // @return err error
  136. func SyncClassifyList(cont context.Context) (err error) {
  137. lockSyncClassify.Lock()
  138. defer func() {
  139. if err != nil {
  140. tips := "SyncIndexList-定时同步ETA指标分类变更数据至第三方失败, ErrMsg:\n" + err.Error()
  141. utils.FileLog.Info(tips)
  142. go alarm_msg.SendAlarmMsg(tips, 3)
  143. }
  144. lockSyncClassify.Unlock()
  145. }()
  146. var condition string
  147. var pars []interface{}
  148. // 普通指标分类
  149. condition = " AND classify_type = ? "
  150. pars = append(pars, 0)
  151. list, err := data_manage.GetAllEdbClassifyListByCondition(condition, pars)
  152. if err != nil {
  153. utils.FileLog.Error("获取分类列表数据失败:" + err.Error())
  154. return
  155. }
  156. dataLimitList := make([][]PushClassifyItemReq, 0)
  157. dataList := make([]PushClassifyItemReq, 0)
  158. for _, v := range list {
  159. dataList = append(dataList, PushClassifyItemReq{
  160. ClassifyId: int(v.ClassifyID),
  161. ClassifyType: int(v.ClassifyType),
  162. ClassifyName: v.ClassifyName,
  163. ParentId: int(v.ParentID),
  164. HasData: int(v.HasData),
  165. CreateTime: v.CreateTime.Format(utils.FormatDateTime),
  166. UpdateTime: v.ModifyTime.Format(utils.FormatDateTime),
  167. SysUserId: int(v.SysUserID),
  168. SysUserRealName: v.SysUserRealName,
  169. Level: int(v.Level),
  170. UniqueCode: v.UniqueCode,
  171. SortColumn: int(v.Sort),
  172. })
  173. if len(dataList) >= 100 {
  174. dataLimitList = append(dataLimitList, dataList)
  175. dataList = make([]PushClassifyItemReq, 0)
  176. }
  177. }
  178. lenData := len(dataList)
  179. if lenData > 0 {
  180. dataLimitList = append(dataLimitList, dataList)
  181. }
  182. if len(dataLimitList) < 0 {
  183. //fmt.Println("无分类数据推送")
  184. return
  185. }
  186. errDataList := make([]PushClassifyItemReq, 0)
  187. errList := make([]string, 0)
  188. defer func() {
  189. if len(errList) > 0 {
  190. dataByte, err := json.Marshal(errDataList)
  191. if err != nil {
  192. dataByte = []byte("序列化分类列表数据失败" + err.Error())
  193. }
  194. utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的分类列表数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  195. }
  196. }()
  197. for k, tmpDataList := range dataLimitList {
  198. req := PushBaseParamReq{
  199. SerialID: uuid.NewUUID().Hex32(),
  200. TableCode: "",
  201. Total: len(tmpDataList),
  202. IsEmailWarn: 0,
  203. Data: tmpDataList,
  204. }
  205. uri := "/xy/index/pushClassify"
  206. _, e, _ := HttpEtaBridgePost(uri, req)
  207. if e != nil {
  208. errList = append(errList, fmt.Sprintf("第%d组分类列表数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  209. errDataList = append(errDataList, dataList...)
  210. continue
  211. }
  212. }
  213. return
  214. }
  215. // 同步指标信息锁
  216. var lockSyncIndex sync.Mutex
  217. // SyncIndex
  218. // @Description: 定时同步指标信息
  219. // @author: Roc
  220. // @datetime 2024-03-07 17:39:34
  221. // @param cont context.Context
  222. // @return err error
  223. func SyncIndex(cont context.Context) (err error) {
  224. lockSyncIndex.Lock()
  225. defer func() {
  226. if err != nil {
  227. tips := "SyncIndex-定时同步ETA指标信息变更数据至第三方失败, ErrMsg:\n" + err.Error()
  228. utils.FileLog.Info(tips)
  229. go alarm_msg.SendAlarmMsg(tips, 3)
  230. }
  231. lockSyncIndex.Unlock()
  232. }()
  233. // 获取当前最大ID
  234. logMaxId, err := data_manage.GetEdbUpdateLogMaxId()
  235. if err != nil {
  236. return
  237. }
  238. var currLogId int64
  239. // 当前已经操作的最大ID
  240. currLogId, err = utils.Rc.GetInt64(utils.CACHE_EDB_UPDATE_LOG_ID)
  241. err = fmt.Errorf(utils.RedisNoKeyErr)
  242. if err != nil {
  243. // 如果不是没找到key,那么说明是redis报错
  244. if err.Error() != utils.RedisNoKeyErr {
  245. return
  246. }
  247. err = nil
  248. // 查找当前已经处理了的日志最大ID
  249. currLogId, err = data_manage.GetEdbUpdateLogMaxHandleId()
  250. if err != nil {
  251. if err.Error() != utils.ErrNoRow() {
  252. utils.FileLog.Error("查找当前已经处理了的日志最大ID失败:" + err.Error())
  253. } else {
  254. err = nil
  255. }
  256. currLogId = 0
  257. }
  258. }
  259. // 遍历获取下一页的数据
  260. for currId := currLogId; currId < logMaxId; {
  261. currId = handlePush(currId, logMaxId)
  262. }
  263. return
  264. }
  265. // handlePush
  266. // @Description: 推送处理
  267. // @author: Roc
  268. // @datetime 2024-03-07 19:20:19
  269. // @param currLogIdStr int64
  270. // @param logMaxId int64
  271. // @return lastId int64
  272. func handlePush(currLogIdStr, logMaxId int64) (lastId int64) {
  273. lastId = currLogIdStr
  274. // 查询当次需要同步的数据
  275. var condition string
  276. var pars []interface{}
  277. condition += " AND id > ? AND id <= ?"
  278. pars = append(pars, currLogIdStr, logMaxId)
  279. list, err := data_manage.GetEdbUpdateLogByCondition(condition, pars)
  280. if err != nil {
  281. utils.FileLog.Error("获取变更日志失败:" + err.Error())
  282. return
  283. }
  284. pushIndexList := make([]*PushIndexParamDataReq, 0)
  285. pushEdbClassifyList := make([]*PushEdbClassifyItemReq, 0)
  286. pushIndexValueList := make([]*PushIndexValueItemReq, 0)
  287. idList := make([]int64, 0)
  288. for _, v := range list {
  289. lastId = v.Id
  290. idList = append(idList, v.Id)
  291. pushIndexData, pushEdbClassifyData, pushIndexValueData, tmpErr := handleData(v)
  292. if tmpErr != nil {
  293. err = tmpErr
  294. utils.FileLog.Error("获取待处理的数据失败:" + err.Error())
  295. continue
  296. }
  297. if pushIndexData != nil {
  298. pushIndexList = append(pushIndexList, pushIndexData)
  299. }
  300. if pushEdbClassifyData != nil {
  301. pushEdbClassifyList = append(pushEdbClassifyList, pushEdbClassifyData)
  302. }
  303. if pushIndexValueData != nil {
  304. pushIndexValueList = append(pushIndexValueList, pushIndexValueData)
  305. }
  306. }
  307. pushIndex(pushIndexList)
  308. pushIndexClassify(pushEdbClassifyList)
  309. pushIndexValue(pushIndexValueList)
  310. // 标记处理
  311. err = data_manage.HandleUpdateLogByIds(idList, time.Now().Format(utils.FormatDateTime))
  312. if err != nil {
  313. utils.FileLog.Error("批量处理指标更新记录失败:" + err.Error())
  314. }
  315. utils.Rc.Put(utils.CACHE_EDB_UPDATE_LOG_ID, lastId, 31*24*time.Hour)
  316. return
  317. }
  318. const pushBatchSize = 100
  319. // pushIndex
  320. // @Description: 指标信息数据推送
  321. // @author: Roc
  322. // @datetime 2024-03-07 16:35:02
  323. // @param allPushList []*PushIndexParamDataReq
  324. func pushIndex(allPushList []*PushIndexParamDataReq) {
  325. lenDataList := len(allPushList)
  326. if lenDataList <= 0 {
  327. return
  328. }
  329. uri := utils.SyncIndexPath + "/pushIndexData"
  330. errDataList := make([]*PushIndexParamDataReq, 0)
  331. errList := make([]string, 0)
  332. defer func() {
  333. if len(errList) > 0 {
  334. dataByte, err := json.Marshal(errDataList)
  335. if err != nil {
  336. dataByte = []byte("序列化指标信息数据失败" + err.Error())
  337. }
  338. utils.FileLog.Info("pushIndex errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  339. }
  340. }()
  341. dataLimitList := make([][]*PushIndexParamDataReq, 0)
  342. for i := 0; i < lenDataList; i += pushBatchSize {
  343. endIndex := min(i+pushBatchSize, lenDataList)
  344. tempSlice := allPushList[i:endIndex]
  345. dataLimitList = append(dataLimitList, tempSlice)
  346. }
  347. for k, dataList := range dataLimitList {
  348. req := PushBaseParamReq{
  349. SerialID: uuid.NewUUID().Hex32(),
  350. TableCode: "",
  351. Total: len(dataList),
  352. IsEmailWarn: 0,
  353. Data: dataList,
  354. }
  355. _, e, _ := HttpEtaBridgePost(uri, req)
  356. if e != nil {
  357. errList = append(errList, fmt.Sprintf("第%d组指标信息数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  358. errDataList = append(errDataList, dataList...)
  359. continue
  360. }
  361. }
  362. }
  363. // pushIndexValue
  364. // @Description: 指标明细数据推送
  365. // @author: Roc
  366. // @datetime 2024-03-07 16:32:47
  367. // @param allPushList []*PushIndexValueItemReq
  368. func pushIndexValue(allPushList []*PushIndexValueItemReq) {
  369. lenDataList := len(allPushList)
  370. if lenDataList <= 0 {
  371. return
  372. }
  373. uri := utils.SyncIndexPath + "/pushIndexValue"
  374. errDataList := make([]*PushIndexValueItemReq, 0)
  375. errList := make([]string, 0)
  376. defer func() {
  377. if len(errList) > 0 {
  378. dataByte, err := json.Marshal(errDataList)
  379. if err != nil {
  380. dataByte = []byte("序列化指标明细数据失败" + err.Error())
  381. }
  382. utils.FileLog.Info("pushIndexValue errList:%s;推送失败的指标信息数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  383. }
  384. }()
  385. dataLimitList := make([][]*PushIndexValueItemReq, 0)
  386. for i := 0; i < lenDataList; i += pushBatchSize {
  387. endIndex := min(i+pushBatchSize, lenDataList)
  388. tempSlice := allPushList[i:endIndex]
  389. dataLimitList = append(dataLimitList, tempSlice)
  390. }
  391. for k, dataList := range dataLimitList {
  392. req := PushBaseParamReq{
  393. SerialID: uuid.NewUUID().Hex32(),
  394. TableCode: "",
  395. Total: len(dataList),
  396. IsEmailWarn: 0,
  397. Data: dataList,
  398. }
  399. _, e, _ := HttpEtaBridgePost(uri, req)
  400. if e != nil {
  401. errList = append(errList, fmt.Sprintf("第%d组指标明细数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  402. errDataList = append(errDataList, dataList...)
  403. continue
  404. }
  405. }
  406. return
  407. }
  408. // pushIndexClassify
  409. // @Description: 指标与分类的关系推送
  410. // @author: Roc
  411. // @datetime 2024-03-07 16:32:47
  412. // @param allPushList []*PushIndexValueItemReq
  413. func pushIndexClassify(allPushList []*PushEdbClassifyItemReq) {
  414. lenDataList := len(allPushList)
  415. if lenDataList <= 0 {
  416. return
  417. }
  418. uri := utils.SyncIndexPath + "/pushEdbClassify"
  419. errDataList := make([]*PushEdbClassifyItemReq, 0)
  420. errList := make([]string, 0)
  421. defer func() {
  422. if len(errList) > 0 {
  423. dataByte, err := json.Marshal(errDataList)
  424. if err != nil {
  425. dataByte = []byte("序列化指标明细数据失败" + err.Error())
  426. }
  427. utils.FileLog.Info("pushIndexClassify errList:%s;推送失败的指标所属分类明细数据列表:%s", strings.Join(errList, "\n"), string(dataByte))
  428. }
  429. }()
  430. dataLimitList := make([][]*PushEdbClassifyItemReq, 0)
  431. for i := 0; i < lenDataList; i += pushBatchSize {
  432. endIndex := min(i+pushBatchSize, lenDataList)
  433. tempSlice := allPushList[i:endIndex]
  434. dataLimitList = append(dataLimitList, tempSlice)
  435. }
  436. for k, dataList := range dataLimitList {
  437. req := PushBaseParamReq{
  438. SerialID: uuid.NewUUID().Hex32(),
  439. TableCode: "",
  440. Total: len(dataList),
  441. IsEmailWarn: 0,
  442. Data: dataList,
  443. }
  444. _, e, _ := HttpEtaBridgePost(uri, req)
  445. if e != nil {
  446. errList = append(errList, fmt.Sprintf("第%d组指标所属分类数据推送失败,postRefreshEdbData err: %s", k+1, e.Error()))
  447. errDataList = append(errDataList, dataList...)
  448. continue
  449. }
  450. }
  451. return
  452. }
  453. // 辅助函数:返回a和b中的较小值
  454. func min(a, b int) int {
  455. if a < b {
  456. return a
  457. }
  458. return b
  459. }
  460. // 同步crm指标信息锁
  461. var lockGetCrmIndex sync.Mutex
  462. // SyncXyCrmIndex
  463. // @Description: 定时同步CRM指标信息
  464. // @author: Roc
  465. // @datetime 2024-5-22 10:46:08
  466. // @param cont context.Context
  467. // @return err error
  468. func SyncXyCrmIndex(cont context.Context) (err error) {
  469. lockGetCrmIndex.Lock()
  470. errMsgList := make([]string, 0)
  471. defer func() {
  472. if err != nil {
  473. tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + err.Error()
  474. utils.FileLog.Info(tips)
  475. go alarm_msg.SendAlarmMsg(tips, 3)
  476. }
  477. if len(errMsgList) > 0 {
  478. tips := "SyncXyCrmIndex-定时同步CRM数据到ETA平台失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
  479. utils.FileLog.Info(tips)
  480. go alarm_msg.SendAlarmMsg(tips, 3)
  481. }
  482. lockGetCrmIndex.Unlock()
  483. fmt.Println("end SyncXyCrmIndex")
  484. }()
  485. // 未配置资产包合数据分区,那么就不执行
  486. if utils.SyncCrmAssetPkgCd == `` || utils.SyncCrmDataSourceType == `` {
  487. return
  488. }
  489. var lastUpdateTimeStr string // 上一次更新的时间
  490. nowTimeStr := time.Now().Format(utils.FormatDateTimeUnSpaceV2) // 这次更新的时间
  491. key := data_manage.CrmIndexLastUpdateTime
  492. sysInteractionLog, err := data_manage.GetBusinessSysInteractionLogByKey(key)
  493. if err != nil {
  494. if err.Error() != utils.ErrNoRow() {
  495. return
  496. }
  497. //lastUpdateTime := time.Now().Format("2006-01-02 15:04:05")
  498. } else {
  499. if sysInteractionLog.InteractionVal != `` {
  500. lastUpdateTimeStr = sysInteractionLog.InteractionVal
  501. }
  502. }
  503. syncCrmAssetPkgCd := utils.SyncCrmAssetPkgCd
  504. syncCrmAssetPkgCdList := strings.Split(syncCrmAssetPkgCd, ",")
  505. for _, assetPkgCd := range syncCrmAssetPkgCdList {
  506. err, errMsgList = syncCrmIndex(assetPkgCd, 1, utils.SyncCrmIndexNum, lastUpdateTimeStr)
  507. }
  508. // 修改最后的更新时间
  509. modifyCrmIndexLastUpdateTime(nowTimeStr)
  510. return
  511. }
  512. // syncCrmIndex
  513. // @Description: 开始同步CRM指标信息
  514. // @author: Roc
  515. // @datetime 2024-05-17 15:55:11
  516. // @param assetPkgCd string
  517. // @param currIndex int
  518. // @param pageSize int
  519. // @param lastUpdateTimeStr string
  520. // @return err error
  521. // @return errMsgList []string
  522. func syncCrmIndex(assetPkgCd string, currIndex, pageSize int, baseLastUpdateTimeStr string) (err error, errMsgList []string) {
  523. errMsgList = make([]string, 0)
  524. lastUpdateTimeStr := baseLastUpdateTimeStr
  525. if lastUpdateTimeStr != `` {
  526. lastUpdateTimeStr = url.QueryEscape(lastUpdateTimeStr)
  527. }
  528. uri := fmt.Sprintf("%s/getCrmData?index_pkg_code=%s&data_source_type=%s&current_index=%d&page_size=%d&detail_last_update_start_time=%s", utils.SyncCrmIndexPath, assetPkgCd, utils.SyncCrmDataSourceType, currIndex, pageSize, lastUpdateTimeStr)
  529. bResult, err, _ := HttpEtaBridgeGet(uri)
  530. if err != nil {
  531. return
  532. }
  533. result := new(EtaBridgeDataRespAndBusinessData)
  534. err = json.Unmarshal(bResult, &result)
  535. if err != nil {
  536. err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
  537. utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
  538. return
  539. }
  540. //totalPage := result.Data.Paging.Pages
  541. for _, v := range result.Data.List {
  542. tmpErr := pushCrmDataToHub(v)
  543. if tmpErr != nil {
  544. errMsgList = append(errMsgList, tmpErr.Error())
  545. }
  546. }
  547. // 如果还有下一页,那么就继续请求下一页
  548. if currIndex < result.Data.Paging.Pages {
  549. _, tmpErrMsgList := syncCrmIndex(assetPkgCd, currIndex+1, utils.SyncCrmIndexNum, baseLastUpdateTimeStr)
  550. errMsgList = append(errMsgList, tmpErrMsgList...)
  551. }
  552. return
  553. }
  554. // pushCrmDataToHub
  555. // @Description: 调用hub服务,将数据推送到eta
  556. // @author: Roc
  557. // @datetime 2024-05-17 15:55:24
  558. // @param data interface{}
  559. // @return err error
  560. func pushCrmDataToHub(data interface{}) (err error) {
  561. uri := `/edb/push`
  562. _, err, _ = eta_hub.HttpEtaHubPost(uri, data)
  563. //result := new(EtaBridgeDataRespAndBusinessData)
  564. //err = json.Unmarshal(bResult, &result)
  565. //if err != nil {
  566. // err = fmt.Errorf("result unmarshal err: %s\nresult: %s", err.Error(), string(bResult))
  567. // utils.FileLog.Info("桥接服务get请求失败:\n" + string(bResult))
  568. // return
  569. //}
  570. return
  571. }
  572. // modifyCrmIndexLastUpdateTime
  573. // @Description: 修改crm指标的最近拉取的时间
  574. // @author: Roc
  575. // @datetime 2024-05-17 11:32:32
  576. // @param fileName string
  577. // @param position uint32
  578. // @return err error
  579. func modifyCrmIndexLastUpdateTime(lastUpdateTime string) {
  580. var err error
  581. defer func() {
  582. if err != nil {
  583. utils.FileLog.Error("修改binlog文件名称和位置异常,lastUpdateTime", lastUpdateTime, ",err:", err)
  584. }
  585. }()
  586. // fileName 变更
  587. key := data_manage.CrmIndexLastUpdateTime
  588. fileNameLog, err := data_manage.GetBusinessSysInteractionLogByKey(key)
  589. if err != nil {
  590. if err.Error() != utils.ErrNoRow() {
  591. return
  592. }
  593. err = nil
  594. fileNameLog = &data_manage.BusinessSysInteractionLog{
  595. //ID: 0,
  596. InteractionKey: key,
  597. InteractionVal: lastUpdateTime,
  598. Remark: "crm拉取数据的最近更新时间",
  599. ModifyTime: time.Now(),
  600. CreateTime: time.Now(),
  601. }
  602. err = fileNameLog.Create()
  603. if err != nil {
  604. return
  605. }
  606. } else {
  607. fileNameLog.InteractionVal = lastUpdateTime
  608. fileNameLog.ModifyTime = time.Now()
  609. err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
  610. if err != nil {
  611. return
  612. }
  613. }
  614. return
  615. }