mongo.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package init_serve
  2. import (
  3. "context"
  4. "github.com/qiniu/qmgo"
  5. "github.com/qiniu/qmgo/options"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/event"
  8. mgoptions "go.mongodb.org/mongo-driver/mongo/options"
  9. "hongze/hongze_yb/global"
  10. "sync"
  11. )
  12. var (
  13. ConnectTimeoutMS = int64(3000)
  14. SocketTimeoutMS = int64(500000)
  15. MaxPoolSize = uint64(50)
  16. MinPoolSize = uint64(0)
  17. )
  18. //func init() {
  19. // fmt.Println("start MgoNewClient")
  20. // MgoNewClient()
  21. // fmt.Println("end MgoNewClient")
  22. //}
  23. // MgoNewClient
  24. // @Description: 创建一个mongodb客户端链接
  25. // @author: Roc
  26. // @datetime 2024-04-25 14:01:14
  27. // @return *qmgo.Client
  28. func MgoNewClient() *qmgo.Client {
  29. // 创建cmdMonitor,用于打印SQL
  30. //startedCommands := make(map[int64]bson.Raw)
  31. mgoConfig := global.CONFIG.Mongo
  32. startedCommands := sync.Map{} // map[int64]bson.Raw
  33. cmdMonitor := &event.CommandMonitor{
  34. Started: func(_ context.Context, evt *event.CommandStartedEvent) {
  35. startedCommands.Store(evt.RequestID, evt.Command)
  36. //startedCommands[evt.RequestID] = evt.Command
  37. },
  38. Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
  39. //log.Printf("Command: %v Reply: %v\n",
  40. // startedCommands[evt.RequestID],
  41. // evt.Reply,
  42. //)
  43. var commands bson.Raw
  44. v, ok := startedCommands.Load(evt.RequestID)
  45. if ok {
  46. commands = v.(bson.Raw)
  47. }
  48. //global.MONGO_LOG.Info("\n【MongoDB】[%.3fms] [%v] %v \n", float64(evt.Duration)/1e6, commands, evt.Reply)
  49. global.MONGO_LOG.Info("\n【MongoDB】[%.3fms] [%v] \n", float64(evt.Duration)/1e6, commands)
  50. },
  51. Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
  52. //log.Printf("Command: %v Failure: %v\n",
  53. // startedCommands[evt.RequestID],
  54. // evt.Failure,
  55. //)
  56. var commands bson.Raw
  57. v, ok := startedCommands.Load(evt.RequestID)
  58. if ok {
  59. commands = v.(bson.Raw)
  60. }
  61. global.MONGO_LOG.Info("\n【MongoDB】[%.3fms] [%v] \n %v \n", float64(evt.Duration)/1e6, commands, evt.Failure)
  62. },
  63. }
  64. // 创建options
  65. ops := options.ClientOptions{ClientOptions: &mgoptions.ClientOptions{}}
  66. ops.SetMonitor(cmdMonitor)
  67. ctx := context.Background()
  68. //mongodb+srv://myDatabaseUser:D1fficultP%40ssw0rd@mongodb0.example.com/?authSource=admin&replicaSet=myRepl
  69. // 创建一个数据库链接
  70. client, err := qmgo.NewClient(ctx, &qmgo.Config{
  71. Uri: mgoConfig.Url,
  72. Auth: &qmgo.Credential{
  73. AuthMechanism: mgoConfig.AuthMechanism,
  74. AuthSource: mgoConfig.AuthSource,
  75. Username: mgoConfig.Username,
  76. Password: mgoConfig.Password,
  77. //PasswordSet: false,
  78. },
  79. Database: mgoConfig.Database,
  80. ConnectTimeoutMS: &ConnectTimeoutMS,
  81. SocketTimeoutMS: &SocketTimeoutMS,
  82. MaxPoolSize: &MaxPoolSize,
  83. MinPoolSize: &MinPoolSize,
  84. }, ops)
  85. if err != nil {
  86. panic("MongoDB连接异常:" + err.Error())
  87. }
  88. return client
  89. }
  90. func MgoGetColl(collName string) (cli *qmgo.QmgoClient, err error) {
  91. ctx := context.Background()
  92. config := &qmgo.Config{
  93. //Uri: MGO_URL,
  94. //Database: MGO_DB,
  95. Coll: collName,
  96. ConnectTimeoutMS: &ConnectTimeoutMS,
  97. SocketTimeoutMS: &SocketTimeoutMS,
  98. MaxPoolSize: &MaxPoolSize,
  99. MinPoolSize: &MinPoolSize,
  100. }
  101. cli, err = qmgo.Open(ctx, config)
  102. return
  103. }
  104. func InitMongo() {
  105. if global.CONFIG.Mongo.Url != `` {
  106. mgoCli := MgoNewClient()
  107. global.MgoDataCli = mgoCli
  108. }
  109. }