Parcourir la source

fix:添加记录binlog的位置入数据库

Roc il y a 9 mois
Parent
commit
0aa7d7f809

+ 2 - 0
config/config.go

@@ -111,6 +111,8 @@ type Xiangyu struct {
 	IndexSyncAuthPwd      string `mapstructure:"index-sync-auth-pwd" json:"index-sync-auth-pwd" yaml:"index-sync-auth-pwd" description:"用户统一身份的鉴权password"`
 	IndexKey              string `mapstructure:"index-key" json:"index-key" yaml:"index-key" description:"统一平台秘钥"`
 	IndexSyncTarget       string `mapstructure:"index-sync-target" json:"index-sync-target" yaml:"index-sync-target" description:"指标同步平台编码"`
+	IndexCrmHost          string `mapstructure:"index-crm-host" json:"index-crm-host" yaml:"index-crm-host" description:"crm指标平台地址"`
+	IndexCrmTarget        string `mapstructure:"index-crm-target" json:"index-crm-target" yaml:"index-crm-target" description:"crm指标平台编码"`
 }
 
 type PCSG struct {

+ 63 - 0
controller/xiangyu/crm.go

@@ -0,0 +1,63 @@
+package xiangyu
+
+import (
+	"eta/eta_bridge/controller/resp"
+	"eta/eta_bridge/global"
+	"eta/eta_bridge/logic/xiangyu"
+	"eta/eta_bridge/models/index"
+	xiangyuSrc "eta/eta_bridge/services/xiangyu"
+	"github.com/gin-gonic/gin"
+	"github.com/go-playground/validator/v10"
+)
+
+// GetCrmData
+// @Description: 获取crm数据
+// @author: Roc
+// @receiver xc
+// @datetime 2024-5-14 13:09:30
+// @param c *gin.Context
+func (xc *XiangyuController) GetCrmData(c *gin.Context) {
+	var req index.GetBusinessDataReq
+	if e := c.Bind(&req); e != nil {
+		err, ok := e.(validator.ValidationErrors)
+		if !ok {
+			resp.FailData("参数解析失败", "Err:"+e.Error(), c)
+			return
+		}
+		resp.FailData("参数解析失败", err.Translate(global.Trans), c)
+		return
+	}
+
+	if req.IndexCode == `` && req.IndexPkgCode == `` {
+		resp.Fail("指标编码或者指标包编码不允许为空", c)
+	}
+	//if req.Partition == `` {
+	//	resp.Fail("数据分区参数不允许为空", c)
+	//}
+
+	xyReq := xiangyuSrc.PostGetIndexDataParamReq{
+		PageNum:                   req.CurrentIndex,
+		PageSize:                  req.PageSize,
+		AssetCd:                   req.IndexCode,
+		AssetPkgCd:                req.IndexPkgCode,
+		DataDt:                    "",
+		StartDt:                   "",
+		EndDt:                     "",
+		Sort:                      "",
+		DataSourceType:            req.DataSourceType,
+		InfoLastUpdateStartTime:   "",
+		InfoLastUpdateEndTime:     "",
+		DetailLastUpdateStartTime: "",
+		DetailLastUpdateEndTime:   "",
+	}
+
+	dataResp, err, errMsg := xiangyu.PostGetIndexData(xyReq)
+
+	if err != nil {
+		resp.FailData(errMsg, err.Error(), c)
+		return
+	}
+	resp.OkData("同步成功", dataResp, c)
+
+	return
+}

+ 105 - 0
logic/xiangyu/crm.go

@@ -0,0 +1,105 @@
+package xiangyu
+
+import (
+	"eta/eta_bridge/global"
+	"eta/eta_bridge/models/index"
+	"eta/eta_bridge/services/xiangyu"
+	"eta/eta_bridge/utils"
+	"fmt"
+	"github.com/rdlucklib/rdluck_tools/paging"
+	"strings"
+	"time"
+)
+
+// PostGetIndexData
+// @Description: 获取数据
+// @author: Roc
+// @datetime 2024-05-14 17:23:39
+// @param data xiangyu.PostGetIndexDataParamReq
+// @return dataResp xiangyu.CrmDataResp
+// @return err error
+// @return errMsg string
+func PostGetIndexData(data xiangyu.PostGetIndexDataParamReq) (dataResp index.GetBusinessDataResp, err error, errMsg string) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if len(errMsgList) > 0 {
+			tips := "PostGetIndexData-获取CRM数据失败, ErrMsg:\n" + strings.Join(errMsgList, "\n")
+			global.FILE_LOG.Error(tips)
+		}
+	}()
+	dataResp = index.GetBusinessDataResp{
+		List: make([]index.PushBusinessIndex, 0),
+	}
+	errMsg = `获取失败`
+	resp, err := xiangyu.PostGetCrmData(data)
+	if err != nil {
+		return
+	}
+
+	dataResp.Paging = paging.GetPaging(resp.PageNum, resp.PageSize, resp.TotalNum)
+
+	// 列表数据
+
+	businessIndexMap := make(map[string]index.PushBusinessIndex)
+	//index.PushBusinessIndex
+	for _, v := range resp.Rows {
+		tmpDate, tmpErr := time.ParseInLocation(utils.FormatDateUnSpace, v.DataDt, time.Local)
+		if tmpErr != nil {
+			errMsgList = append(errMsgList, fmt.Sprintf("指标日期转换失败,指标编码:%s;日期:%s", v.AssetCd, v.DataDt))
+			continue
+		}
+
+		frequency := handleXyFrequency(v.FrequencyName)
+		if frequency == `` {
+			errMsgList = append(errMsgList, fmt.Sprintf("指标频度异常,指标编码:%s;日期:%s", v.AssetCd, v.FrequencyName))
+			continue
+		}
+
+		businessIndex, ok := businessIndexMap[v.AssetCd]
+
+		//
+		if !ok {
+			dataList := make([]index.AddBusinessData, 0)
+			businessIndex = index.PushBusinessIndex{
+				IndexCode:            v.AssetCd,
+				IndexName:            v.AssetName,
+				Unit:                 v.UnitName,
+				Frequency:            frequency,
+				SourceName:           v.DataSourceType,
+				Remark:               "",
+				DetailDataCreateTime: v.DetailDataStockInTime,
+				DataList:             dataList,
+			}
+		}
+
+		businessIndex.DataList = append(businessIndex.DataList, index.AddBusinessData{
+			Value: v.Price,
+			Date:  tmpDate.Format(utils.FormatDate),
+		})
+		businessIndexMap[v.AssetCd] = businessIndex
+	}
+
+	// 将处理后的指标返回
+	for _, v := range businessIndexMap {
+		dataResp.List = append(dataResp.List, v)
+	}
+
+	return
+}
+
+// handleXyFrequency
+// @Description: 处理象屿频度问题
+// @author: Roc
+// @datetime 2024-05-14 18:15:28
+// @param frequencyName string
+// @return frequency string
+func handleXyFrequency(frequencyName string) (frequency string) {
+	switch frequencyName {
+	case "季度", "半年度", "年度", "日度", "周度", "旬度", "月度 ":
+		frequency = frequencyName
+	case "每天", "每交易日":
+		frequency = "日度"
+	}
+
+	return
+}

+ 59 - 0
models/index/business_sys_interaction_log.go

@@ -0,0 +1,59 @@
+package index
+
+import (
+	"eta/eta_bridge/global"
+	"time"
+)
+
+// BusinessSysInteractionLog 商家系统交互记录表
+type BusinessSysInteractionLog struct {
+	ID             uint32    `gorm:"primaryKey;column:id;type:int(10) unsigned;not null" json:"-"`
+	InteractionKey string    `gorm:"unique;column:interaction_key;type:varchar(128);not null;default:''" json:"interactionKey"` // 记录Key
+	InteractionVal string    `gorm:"column:interaction_val;type:text;default:null" json:"interactionVal"`                       // 记录值
+	Remark         string    `gorm:"column:remark;type:varchar(128);not null;default:''" json:"remark"`                         // 备注
+	ModifyTime     time.Time `gorm:"column:modify_time;type:datetime;default:null" json:"modifyTime"`                           // 修改日期
+	CreateTime     time.Time `gorm:"column:create_time;type:datetime;default:null" json:"createTime"`                           // 创建时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *BusinessSysInteractionLog) TableName() string {
+	return "business_sys_interaction_log"
+}
+
+// BusinessSysInteractionLogColumns get sql column name.获取数据库列名
+var BusinessSysInteractionLogColumns = struct {
+	ID             string
+	InteractionKey string
+	InteractionVal string
+	Remark         string
+	ModifyTime     string
+	CreateTime     string
+}{
+	ID:             "id",
+	InteractionKey: "interaction_key",
+	InteractionVal: "interaction_val",
+	Remark:         "remark",
+	ModifyTime:     "modify_time",
+	CreateTime:     "create_time",
+}
+
+// Create 添加数据
+func (m *BusinessSysInteractionLog) Create() (err error) {
+	err = global.MYSQL["index"].Create(m).Error
+	return
+}
+
+// Update 更新数据
+func (m *BusinessSysInteractionLog) Update(cols []string) (err error) {
+	err = global.MYSQL["index"].Model(m).Select(cols).Updates(m).Error
+	return
+}
+
+var BinlogFileNameKey = "binlog_filename"        // binlog文件名
+var BinlogPositionKey = "binlog_binlog_position" // binlog位置
+
+// GetBusinessSysInteractionLogByKey 根据记录key获取数据
+func GetBusinessSysInteractionLogByKey(key string) (item *BusinessSysInteractionLog, err error) {
+	err = global.MYSQL["index"].Where("interaction_key = ?", key).First(&item).Error
+	return
+}

+ 44 - 0
models/index/edb_data_business.go

@@ -0,0 +1,44 @@
+package index
+
+import "github.com/rdlucklib/rdluck_tools/paging"
+
+type GetBusinessDataReq struct {
+	CurrentIndex int    `json:"current_index" form:"current_index" binding:"required"  description:"当前页码"`
+	PageSize     int    `json:"page_size" form:"page_size" binding:"required"  description:"单页条数,自己填,一页要多少条,最大2000条"`
+	IndexCode    string `json:"index_code" form:"index_code" description:"指标编码"`
+	IndexPkgCode string `json:"index_pkg_code" form:"index_pkg_code" description:"指标包编码,与指标编码二选一填写"`
+	DataDate     string `json:"data_date" form:"data_date" description:"指定数据日期,数据日期格式为2024-05-15"`
+	StartDate    string `json:"start_date" form:"start_date" description:"启始时间,格式 YYYY-MM-DD 如:2024-05-15;不为空时,将过滤出数据日期>=startDate 的数据行"`
+	EndDate      string `json:"end_date" form:"end_date" description:"结束时间,格式 YYYY-MM-DD 如:2024-05-15;不为空时,将过滤出数据日期<=endDate 的数据行"`
+	//Sort                      string `json:"sort" description:"排序字段,默认为0正序;按数据日期字段排序,0为正序 1为倒序"`
+	DataSourceType            string `json:"data_source_type" form:"data_source_type"  description:"内部来源系统参数,可只获取对应数据源数据,不传则默认获取所有数据源信息;参数含义:① CY产研平台;② RPA;③ KSF 金仕达;④CRM参数例子: CY,RPA,KSF,CRM  (参数传递字符串列表,通过逗号分隔)"`
+	InfoLastUpdateStartTime   string `json:"info_last_update_start_time" form:"info_last_update_start_time" description:"资产信息数据落到数仓时间,参数:YYYY-MM-DD HH24:MI:SS 如:2024-05-15 10:16:52不为空时,将过滤出 资产信息入库时间>=infoLastUpdateStartTime 的数据行"`
+	InfoLastUpdateEndTime     string `json:"info_last_update_end_time" form:"info_last_update_end_time" description:"资产信息数据落到数仓时间,参数:YYYY-MM-DD HH24:MI:SS 如:2024-05-15 10:16:52不为空时,将过滤出 资产信息入库时间<=infoLastUpdateStartTime 的数据行"`
+	DetailLastUpdateStartTime string `json:"detail_last_update_start_time" form:"detail_last_update_start_time" description:"明细数据落到数仓启始时间,参数:YYYY-MM-DD HH24:MI:SS 如:2024-05-15 10:16:52不为空时,将过滤出 资产详细信息入库时间>=detailLastUpdateStartTime 的数据行,建议延迟15分钟抽取"`
+	DetailLastUpdateEndTime   string `json:"detail_last_update_end_time" form:"detail_last_update_end_time" description:"明细数据落到数仓结束时间,参数:YYYY-MM-DD HH24:MI:SS 如:2024-05-15 10:16:52不为空时,将过滤出 资产详细信息入库时间<=detailLastUpdateStartTime 的数据行,建议延迟15分钟抽取"`
+}
+
+type GetBusinessDataResp struct {
+	List   []PushBusinessIndex
+	Paging *paging.PagingItem `description:"分页数据"`
+}
+
+// PushBusinessIndex
+// @Description:  添加外部指标(商家)请求
+type PushBusinessIndex struct {
+	IndexCode            string            `description:"指标编码"`
+	IndexName            string            `description:"指标名称"`
+	Unit                 string            `description:"单位"`
+	Frequency            string            `description:"频度"`
+	SourceName           string            `description:"数据来源名称"`
+	Remark               string            `description:"备注字段"`
+	DetailDataCreateTime string            `description:"明细数据入库时间"`
+	DataList             []AddBusinessData `description:"指标数据"`
+}
+
+// AddBusinessData
+// @Description: 外部指标(商家系统)数据
+type AddBusinessData struct {
+	Value float64 `description:"值"`
+	Date  string  `description:"日期"`
+}

+ 14 - 0
routers/xiangyu.go

@@ -18,6 +18,9 @@ func InitXiangyu(r *gin.RouterGroup) {
 
 	// 指标接口
 	initIndexXiangyu(r)
+
+	// crm数据接口
+	initCrmXiangyu(r)
 }
 
 func initAuthXiangyu(r *gin.RouterGroup) {
@@ -36,3 +39,14 @@ func initIndexXiangyu(r *gin.RouterGroup) {
 	group.POST("index/pushClassify", control.PushClassifyDataResp)
 	group.POST("index/pushEdbClassify", control.PushEdbClassifyDataResp)
 }
+
+// initCrmXiangyu
+// @Description: Crm数据接口
+// @author: Roc
+// @datetime 2024-05-14 17:21:22
+// @param r *gin.RouterGroup
+func initCrmXiangyu(r *gin.RouterGroup) {
+	control := new(xiangyu.XiangyuController)
+	group := r.Group("xy/index/").Use(middleware.InternalToken())
+	group.GET("crm/getCrmData", control.GetCrmData)
+}

+ 159 - 18
services/binlog/binlog.go

@@ -9,6 +9,7 @@ import (
 	"github.com/go-mysql-org/go-mysql/mysql"
 	_ "github.com/go-sql-driver/mysql"
 	"math/rand"
+	"strconv"
 	"time"
 )
 
@@ -126,17 +127,76 @@ func ListenMysql() {
 		}
 	}
 
-	var fileName string
-	var position uint32
+	//  获取上一次启动时的binlog文件名称和位置
+	fileName, position, err := getBinlogNamePosition()
+	if err != nil {
+		return
+	}
+	// 修改记录本次启动时的binlog文件名称和位置
+	modifyBinlogNamePosition(fileName, position)
+
+	c, err := canal.NewCanal(cfg)
+	if err != nil {
+		fmt.Println("err:", err)
+		return
+	}
+
+	global.FILE_LOG.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
 
+	binlogHandler := &MyEventHandler{}
+	binlogHandler.SetBinlogFileName(fileName, position)
+	c.SetEventHandler(binlogHandler)
+	//c.Run()
+
+	pos := mysql.Position{
+		Name: fileName,
+		Pos:  position,
+	}
+	err = c.RunFrom(pos)
+
+	// 定时修改binlog文件名称和位置
+	go timingModifyBinlogNamePosition()
+}
+
+// getBinlogNamePosition
+// @Description: 获取当前binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 13:18:19
+// @return fileName string
+// @return position uint32
+// @return err error
+func getBinlogNamePosition() (fileName string, position uint32, err error) {
+	// 优先从redis获取
 	fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
-	position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
-	if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr {
-		panic("mysql binlog position is not found,err:" + tmpErr.Error())
+	position64, err := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
+	if err != nil && err.Error() != utils.RedisNoKeyErr {
+		panic("mysql binlog position is not found,err:" + err.Error())
 		return
 	}
 	position = uint32(position64)
 
+	// 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
+	if fileName == `` || position == 0 {
+
+		// binlog文件名
+		fileNameKey := index.BinlogFileNameKey
+		fileNameLog, tmpErr := index.GetBusinessSysInteractionLogByKey(fileNameKey)
+		if tmpErr == nil {
+			fileName = fileNameLog.InteractionKey
+		}
+
+		// binlog位置
+		positionKey := index.BinlogPositionKey
+		positionLog, tmpErr := index.GetBusinessSysInteractionLogByKey(positionKey)
+		if tmpErr == nil {
+			positionStr := positionLog.InteractionKey
+			positionInt, tmpErr := strconv.Atoi(positionStr)
+			if tmpErr == nil {
+				position = uint32(positionInt)
+			}
+		}
+	}
+
 	// 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
 	if fileName == `` || position == 0 {
 		item, tmpErr := index.GetShowMaster()
@@ -148,22 +208,103 @@ func ListenMysql() {
 		position = item.Position
 	}
 
-	c, err := canal.NewCanal(cfg)
-	if err != nil {
-		fmt.Println("err:", err)
-		return
+	return
+}
+
+// timingModifyBinlogNamePosition
+// @Description: 定时修改binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 13:08:13
+func timingModifyBinlogNamePosition() {
+	for {
+		// 延时30s执行
+		time.Sleep(30 * time.Second)
+
+		// 获取最新的binlog文件名称和位置
+		fileName, position, err := getBinlogNamePosition()
+		if err != nil {
+			return
+		}
+
+		if fileName != `` && position != 0 {
+			// 修改记录本次启动时的binlog文件名称和位置
+			modifyBinlogNamePosition(fileName, position)
+		}
 	}
+}
 
-	global.FILE_LOG.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
+// modifyBinlogNamePosition
+// @Description: 修改记录本次启动时的binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 11:32:32
+// @param fileName string
+// @param position uint32
+// @return err error
+func modifyBinlogNamePosition(fileName string, position uint32) {
+	var err error
+	defer func() {
+		if err != nil {
+			global.FILE_LOG.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
+		}
+	}()
 
-	binlogHandler := &MyEventHandler{}
-	binlogHandler.SetBinlogFileName(fileName, position)
-	c.SetEventHandler(binlogHandler)
-	//c.Run()
+	// fileName 变更
+	fileNameKey := index.BinlogFileNameKey
+	fileNameLog, err := index.GetBusinessSysInteractionLogByKey(fileNameKey)
+	if err != nil {
+		if err != utils.ErrNoRow {
+			return
+		}
+		err = nil
+		fileNameLog = &index.BusinessSysInteractionLog{
+			//ID:             0,
+			InteractionKey: fileNameKey,
+			InteractionVal: fileName,
+			Remark:         "mysql中binlog的filename名称",
+			ModifyTime:     time.Now(),
+			CreateTime:     time.Now(),
+		}
+		err = fileNameLog.Create()
+		if err != nil {
+			return
+		}
+	} else {
+		fileNameLog.InteractionVal = fileName
+		fileNameLog.ModifyTime = time.Now()
+		err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
+		if err != nil {
+			return
+		}
+	}
 
-	pos := mysql.Position{
-		Name: fileName,
-		Pos:  position,
+	// position 变更
+	positionKey := index.BinlogPositionKey
+	positionLog, err := index.GetBusinessSysInteractionLogByKey(positionKey)
+	if err != nil {
+		if err != utils.ErrNoRow {
+			return
+		}
+		err = nil
+		positionLog = &index.BusinessSysInteractionLog{
+			//ID:             0,
+			InteractionKey: positionKey,
+			InteractionVal: fmt.Sprint(position),
+			Remark:         "mysql中binlog的position位置",
+			ModifyTime:     time.Now(),
+			CreateTime:     time.Now(),
+		}
+		err = positionLog.Create()
+		if err != nil {
+			return
+		}
+	} else {
+		positionLog.InteractionVal = fmt.Sprint(position)
+		positionLog.ModifyTime = time.Now()
+		err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
+		if err != nil {
+			return
+		}
 	}
-	err = c.RunFrom(pos)
+
+	return
 }

+ 228 - 0
services/xiangyu/crm.go

@@ -0,0 +1,228 @@
+package xiangyu
+
+import (
+	"encoding/json"
+	"errors"
+	"eta/eta_bridge/global"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strings"
+)
+
+type PostGetIndexDataParamReq struct {
+	PageNum                   int    `json:"pageNum" description:"页码,先取第一页,看一下总条数,然后根据总条数给返回"`
+	PageSize                  int    `json:"pageSize" description:"单页条数,自己填,一页要多少条,最大2000条"`
+	AssetCd                   string `json:"assetCd" description:"资产编码,与资产包编码二选一填写,该类业务数据在数仓中的唯一编码,该编码由数仓提供给项目组"`
+	AssetPkgCd                string `json:"assetPkgCd" description:"资产包编码,与资产编码二选一填写,传入该参数时,将返回该资产包内的所有资产,该编码由数仓提供给项目组"`
+	DataDt                    string `json:"dataDt" description:"数据日期,如2022年5月25日的铝的市价,数据日期则为20220525"`
+	StartDt                   string `json:"startDt" description:"启始时间,格式 YYYYMMDD 如:20211010;不为空时,将过滤出数据日期>=startDate 的数据行"`
+	EndDt                     string `json:"endDt" description:"结束时间,格式 YYYYMMDD 如:20211010;不为空时,将过滤出数据日期<=endDate 的数据行"`
+	Sort                      string `json:"sort" description:"排序字段,默认为0正序;按数据日期字段排序,0为正序 1为倒序"`
+	DataSourceType            string `json:"dataSourceType" description:"内部来源系统参数,可只获取对应数据源数据,不传则默认获取所有数据源信息;参数含义:① CY产研平台;② RPA;③ KSF 金仕达;④CRM参数例子: CY,RPA,KSF,CRM  (参数传递字符串列表,通过逗号分隔)"`
+	InfoLastUpdateStartTime   string `json:"infoLastUpdateStartTime" description:"资产信息数据落到数仓时间,参数:YYYYMMDD HH24:MI:SS 如:20240229 18:00:00 不为空时,将过滤出 资产信息入库时间>=infoLastUpdateStartTime 的数据行"`
+	InfoLastUpdateEndTime     string `json:"infoLastUpdateEndTime" description:"资产信息数据落到数仓时间,参数:YYYYMMDD HH24:MI:SS 如:20240229 18:00:00 不为空时,将过滤出 资产信息入库时间<=infoLastUpdateStartTime 的数据行"`
+	DetailLastUpdateStartTime string `json:"detailLastUpdateStartTime" description:"明细数据落到数仓启始时间,参数:YYYYMMDD HH24:MI:SS 如:20240229 18:00:00 不为空时,将过滤出 资产详细信息入库时间>=detailLastUpdateStartTime 的数据行,建议延迟15分钟抽取"`
+	DetailLastUpdateEndTime   string `json:"detailLastUpdateEndTime" description:"明细数据落到数仓结束时间,参数:YYYYMMDD HH24:MI:SS 如:20240229 18:00:00 不为空时,将过滤出 资产详细信息入库时间<=detailLastUpdateStartTime 的数据行,建议延迟15分钟抽取"`
+
+	//CommonParameters In0 `json:"commonParameters" description:"公共参数"`
+}
+
+func structToURLParams(req interface{}) (string, error) {
+	// 首先,将结构体转换为map[string]interface{}
+	marshaledBytes, err := json.Marshal(req)
+	if err != nil {
+		return "", err
+	}
+
+	var params map[string]interface{}
+	err = json.Unmarshal(marshaledBytes, &params)
+	if err != nil {
+		return "", err
+	}
+
+	// 创建url.Values,它是一个键值对的集合,用于构建URL查询字符串
+	values := url.Values{}
+	for key, value := range params {
+		values.Set(key, fmt.Sprintf("%v", value))
+	}
+
+	// 返回查询字符串
+	return values.Encode(), nil
+}
+
+// CrmBaseResp
+// @Description: Crm基础信息返回
+type CrmBaseResp struct {
+	ErrCode   int         `json:"errCode"`
+	RequestId string      `json:"requestId"`
+	ErrMsg    string      `json:"errMsg"`
+	ApiLog    interface{} `json:"apiLog"`
+}
+
+// GetCrmDataResp
+// @Description: CRM数据接口返回
+type GetCrmDataResp struct {
+	CrmBaseResp
+	Data CrmDataResp `json:"data"`
+}
+
+// CrmDataResp
+// @Description: 实际数据返回
+type CrmDataResp struct {
+	TotalNum int               `json:"totalNum"`
+	PageSize int               `json:"pageSize"`
+	Rows     []CrmDataItemResp `json:"rows"`
+	PageNum  int               `json:"pageNum"`
+}
+
+// CrmDataItemResp
+// @Description: 指标数据返回
+type CrmDataItemResp struct {
+	Price                 float64     `json:"price"`
+	DataDt                string      `json:"datadt"`
+	UpdateTime            string      `json:"updatetime"`
+	AssetCd               string      `json:"assetcd"`
+	MarketName            string      `json:"marketname"`
+	AreaName              string      `json:"areaname"`
+	AssetName             string      `json:"assetname"`
+	Currency              interface{} `json:"currency"`
+	MaterialName          string      `json:"materialname"`
+	SpecName              string      `json:"specname"`
+	BreedName             string      `json:"breedname"`
+	UnitName              string      `json:"unitname"`
+	SourceName            string      `json:"sourcename"`
+	FrequencyName         string      `json:"frequencyname"`
+	CountryName           string      `json:"countryname"`
+	ProvinceName          string      `json:"provincename"`
+	CityName              string      `json:"cityname"`
+	CountyName            string      `json:"countyname"`
+	CompanyName           string      `json:"companyname"`
+	Description           string      `json:"description"`
+	BeginDate             string      `json:"begindate"`
+	EndDate               string      `json:"enddate"`
+	CreateTime            string      `json:"createtime"`
+	Status                interface{} `json:"status"`
+	PublishRuleName       string      `json:"publishrulename"`
+	AuthKind              *string     `json:"authkind"`
+	SmallClassName        interface{} `json:"smallclassname"`
+	DataSource            string      `json:"datasource"`
+	DataSourceType        string      `json:"datasourcetype"`
+	OrginSysSource        interface{} `json:"orginsyssource"`
+	SourceType            interface{} `json:"sourcetype"`
+	DeptCd                string      `json:"deptcd"`
+	DutyDept              string      `json:"dutydept"`
+	DerivativeType        string      `json:"derivativetype"`
+	StkCode               string      `json:"stkcode"`
+	StkName               string      `json:"stkname"`
+	MetricName            string      `json:"metricname"`
+	AssetInfoStockInTime  string      `json:"asset_info_stock_in_time" description:"资产信息数据落到数仓时间,参数:YYYYMMDD HH24:MI:SS;如:20240229 18:00:00"`
+	DetailDataStockInTime string      `json:"detail_data_stock_in_time" description:"明细数据落到数仓时间,参数:YYYYMMDD HH24:MI:SS;如:20240229 18:00:00"`
+}
+
+// PostGetCrmData
+// @Description: 获取CRM数据
+// @author: Roc
+// @datetime 2024-05-14 16:45:40
+// @param req PostGetIndexDataParamReq
+// @return dataResp *CrmDataResp
+// @return err error
+func PostGetCrmData(req PostGetIndexDataParamReq) (dataResp CrmDataResp, err error) {
+	urlPath := `/xmxygtest/dasc/test/mpdata/index/data`
+	//req.CommonParameters = In0{
+	//	PageTotal: "",
+	//	PageNo:    "",
+	//	DocType:   "数仓市价服务",
+	//	Property:  "",
+	//	//DocCode:   getDocCode(),
+	//	Source: global.CONFIG.Xiangyu.SystemCode,
+	//	Target: global.CONFIG.Xiangyu.IndexCrmTarget,
+	//}
+
+	queryString, err := structToURLParams(req)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return
+	}
+	urlPath += "?" + queryString
+
+	postData, err := json.Marshal(req)
+	if err != nil {
+		return
+	}
+	result, err := HttpPostXmxyg(urlPath, string(postData))
+	if err != nil {
+		return
+	}
+
+	var resp *GetCrmDataResp
+	//  解析响应结果
+	err = json.Unmarshal(result, &resp)
+	if err != nil {
+		return
+	}
+
+	if resp.ErrCode != 0 {
+		err = errors.New(fmt.Sprintf("响应代码:%d,错误信息:%s", resp.ErrCode, resp.ErrMsg))
+		return
+	}
+
+	dataResp = resp.Data
+
+	return
+}
+
+// HttpPostXmxyg
+// @Description: post请求
+// @author: Roc
+// @datetime 2024-02-27 18:45:30
+// @param urlPath string
+// @param postData string
+// @return []byte
+// @return error
+func HttpPostXmxyg(urlPath, postData string) ([]byte, error) {
+	if global.CONFIG.Xiangyu.IndexCrmHost == `` {
+		return nil, errors.New("crm数据平台地址为空")
+	}
+	// 请求地址
+	postUrl := global.CONFIG.Xiangyu.IndexCrmHost + urlPath
+
+	body := io.NopCloser(strings.NewReader(postData))
+	client := &http.Client{}
+	req, err := http.NewRequest("POST", postUrl, body)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("content-Type", "application/json; charset=utf-8")
+	req.Header.Set("Accept-Encoding", "application/json; charset=utf-8")
+	req.Header.Set("Accept", "application/json; charset=utf-8")
+
+	// 鉴权
+	req.SetBasicAuth(global.CONFIG.Xiangyu.IndexSyncAuthUserName, global.CONFIG.Xiangyu.IndexSyncAuthPwd)
+
+	// 秘钥
+	if global.CONFIG.Xiangyu.IndexKey != `` {
+		req.Header.Set("deipaaskeyauth", global.CONFIG.Xiangyu.IndexKey)
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	result, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	// 日志记录
+	global.FILE_LOG.Debug("crm数据平台:地址:" + postUrl + ";\n请求参数:" + postData + ";\n返回参数:" + string(result))
+
+	//  解析返回参数,判断是否是json
+	if !json.Valid(result) {
+		err = errors.New("返回参数不是json格式")
+	}
+
+	return result, err
+}