mongodb

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 1 more Imports: 23 Imported by: 0

README

English | 中文

tRPC-Go mongodb plugin

BK Pipelines Status

Base on community mongo, used with trpc.

mongodb client

client:                                            # Backend configuration for client calls.
  service:                                         # Configuration for the backend.
    - name: trpc.mongodb.xxx.xxx         
      target: mongodb://user:passwd@vip:port       # mongodb standard uri:mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
      timeout: 800                                 # The maximum processing time of the current request.
    - name: trpc.mongodb.xxx.xxx1         
      target: mongodb+polaris://user:passwd@polaris_name  # mongodb+polaris means that the host in the mongodb uri will perform Polaris analysis.
      timeout: 800                                        # The maximum processing time of the current request.
package main

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"trpc.group/trpc-go/trpc-database/mongodb"
	"trpc.group/trpc-go/trpc-go/log"
)

// BattleFlow is battle information.
type BattleInfo struct {
	Id    string `bson:"_id,omitempty"`
	Ctime uint32 `bson:"ctime,omitempty" json:"ctime,omitempty"`
}

func (s *server) SayHello(ctx context.Context, req *pb.ReqBody, rsp *pb.RspBody) (err error) {
	proxy := mongodb.NewClientProxy("trpc.mongodb.xxx.xxx") // Your custom service name,used for monitoring, reporting and mapping configuration.

	// mongodb insert
	_, err = proxy.InsertOne(sc, "database", "table", bson.M{"_id": "key2", "value": "v2"})

	// mongodb ReplaceOne
	opts := options.Replace().SetUpsert(true)
	filter := bson.D{{"_id", "key1"}}
	_, err := proxy.ReplaceOne(ctx, "database", "table", filter, &BattleInfo{}, opts)
	if err != nil {
		log.Errorf("err=%v, data=%v", err, *battleInfo)
		return err
	}

	// mongodb FindOne
	rst := proxy.FindOne(ctx, "database", "table", bson.D{{"_id", "key1"}})
	battleInfo = &BattleInfo{}
	err = rst.Decode(battleInfo)
	if err != nil {
		return nil, err
	}

	// mongodb transaction
	err = proxy.Transaction(ctx, func(sc mongo.SessionContext) error {
		// The same proxy instance needs to be used during transaction execution.
		_, tErr := proxy.InsertOne(sc, "database", "table", bson.M{"_id": "key1", "value": "v1"})
		if tErr != nil {
			return tErr
		}
		_, tErr = proxy.InsertOne(sc, "database", "table", bson.M{"_id": "key2", "value": "v2"})
		if tErr != nil {
			return tErr
		}
		return nil
	}, nil)

	// mongodb RunCommand
	cmdDB := bson.D{}
	cmdDB = append(cmdDB, bson.E{Key: "enableSharding", Value: "dbName"})
	err = proxy.RunCommand(ctx, "admin", cmdDB).Err()
	if err != nil {
		return nil, err
	}

	cmdColl := bson.D{}
	cmdColl = append(cmdColl, bson.E{Key: "shardCollection", Value: "dbName.collectionName"})
	cmdColl = append(cmdColl, bson.E{Key: "key", Value: bson.D{{"openId", "hashed"}}})
	cmdColl = append(cmdColl, bson.E{Key: "unique", Value: false})
	cmdColl = append(cmdColl, bson.E{Key: "numInitialChunks", Value: 10})
	err = proxy.RunCommand(ctx, "admin", cmdColl).Err()
	if err != nil {
		return nil, err
	}
	// Business logic.
}

Frequently Asked Questions (FAQs)

  • Q1: How to configure ClientOptions:
  • A1: When creating a Transport, you can use WithOptionInterceptor to configure ClientOptions. You can refer to options_test.go for more information.

Documentation

Overview

Package mongodb encapsulates standard library mongodb.

Index

Constants

View Source
const (
	RetDuplicateKeyErr = 11000 // key conflict error
)

error code, refer: https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml

Variables

View Source
var (
	Find              = "find"
	FindOne           = "findone"
	FindOneAndReplace = "findoneandreplace"
	FindC             = "findc" // Return mongo.Cursor type interface, use cursor.All/Decode to parse to structure.
	DeleteOne         = "deleteone"
	DeleteMany        = "deletemany"
	FindOneAndDelete  = "findoneanddelete"
	FindOneAndUpdate  = "findoneandupdate"
	FindOneAndUpdateS = "findoneandupdates" // Return mongo.SingleResult type interface,
	// use Decode to parse to structure.
	InsertOne  = "insertone"
	InsertMany = "insertmany"
	UpdateOne  = "updateone"
	UpdateMany = "updatemany"
	ReplaceOne = "replaceone"
	Count      = "count"
	Aggregate  = "aggregate"  // Polymerization
	AggregateC = "aggregatec" // Return mongo.Cursor type interface,
	// use cursor.All/Decode to parse to structure.
	Distinct               = "distinct"
	BulkWrite              = "bulkwrite"
	CountDocuments         = "countdocuments"
	EstimatedDocumentCount = "estimateddocumentcount"
	Watch                  = "watch"
	WatchDatabase          = "watchdatabase"
	WatchCollection        = "watchcollection"
	Transaction            = "transaction"
	Disconnect             = "disconnect"
	RunCommand             = "runcommand"      // Execute commands sequentially
	IndexCreateOne         = "indexcreateone"  // Create index
	IndexCreateMany        = "indexcreatemany" // Create indexes in batches
	IndexDropOne           = "indexdropone"    // Delete index
	IndexDropAll           = "indexdropall"    // Delete all indexes
	Indexes                = "indexes"         // Get the original index object
	DatabaseCmd            = "database"        // Get the original database
	CollectionCmd          = "collection"      // Get the original collection
	StartSession           = "startsession"    // Create a new Session and SessionContext
)

mongodb cmd definition

View Source
var DefaultClientTransport = NewMongoTransport()

DefaultClientTransport is a default client mongodb transport.

Functions

func IsDuplicateKeyError

func IsDuplicateKeyError(err error) bool

IsDuplicateKeyError handles whether it is a key conflict error.

func NewClientTransport

func NewClientTransport(opt ...transport.ClientTransportOption) transport.ClientTransport

NewClientTransport creates a mongodb transport. Deprecated,use NewMongoTransport instead.

func NewInsertOneModel

func NewInsertOneModel() *mongo.InsertOneModel

NewInsertOneModel creates a new InsertOneModel. InsertOneModel is used to insert a single document in a BulkWrite operation.

func NewMongoTransport

func NewMongoTransport(opt ...ClientTransportOption) transport.ClientTransport

NewMongoTransport creates a mongodb transport.

func NewReplaceOneModel

func NewReplaceOneModel() *mongo.ReplaceOneModel

NewReplaceOneModel creates a new ReplaceOneModel. ReplaceOneModel is used to replace at most one document in a BulkWrite operation.

func NewSessionContext

func NewSessionContext(ctx context.Context, sess mongo.Session) mongo.SessionContext

NewSessionContext creates a new SessionContext associated with the given Context and Session parameters.

func NewUpdateManyModel

func NewUpdateManyModel() *mongo.UpdateManyModel

NewUpdateManyModel creates a new UpdateManyModel. UpdateManyModel is used to update multiple documents in a BulkWrite operation.

func NewUpdateOneModel

func NewUpdateOneModel() *mongo.UpdateOneModel

NewUpdateOneModel creates a new UpdateOneModel. UpdateOneModel is used to update at most one document in a BulkWrite operation.

Types

type Client

type Client interface {
	BulkWrite(ctx context.Context, database string, coll string, models []mongo.WriteModel,
		opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
	InsertOne(ctx context.Context, database string, coll string, document interface{},
		opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
	InsertMany(ctx context.Context, database string, coll string, documents []interface{},
		opts ...*options.InsertManyOptions) (*mongo.InsertManyResult, error)
	DeleteOne(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
	DeleteMany(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
	UpdateOne(ctx context.Context, database string, coll string, filter interface{}, update interface{},
		opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	UpdateMany(ctx context.Context, database string, coll string, filter interface{}, update interface{},
		opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	ReplaceOne(ctx context.Context, database string, coll string, filter interface{},
		replacement interface{}, opts ...*options.ReplaceOptions) (*mongo.UpdateResult, error)
	Aggregate(ctx context.Context, database string, coll string, pipeline interface{},
		opts ...*options.AggregateOptions) (*mongo.Cursor, error)
	CountDocuments(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.CountOptions) (int64, error)
	EstimatedDocumentCount(ctx context.Context, database string, coll string,
		opts ...*options.EstimatedDocumentCountOptions) (int64, error)
	Distinct(ctx context.Context, database string, coll string, fieldName string, filter interface{},
		opts ...*options.DistinctOptions) ([]interface{}, error)
	Find(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.FindOptions) (*mongo.Cursor, error)
	FindOne(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.FindOneOptions) *mongo.SingleResult
	FindOneAndDelete(ctx context.Context, database string, coll string, filter interface{},
		opts ...*options.FindOneAndDeleteOptions) *mongo.SingleResult
	FindOneAndReplace(ctx context.Context, database string, coll string, filter interface{},
		replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *mongo.SingleResult
	FindOneAndUpdate(ctx context.Context, database string, coll string, filter interface{},
		update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
	Watch(ctx context.Context, pipeline interface{},
		opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
	WatchDatabase(ctx context.Context, database string, pipeline interface{},
		opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
	WatchCollection(ctx context.Context, database string, collection string, pipeline interface{},
		opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
	Transaction(ctx context.Context, sf TxFunc, tOpts []*options.TransactionOptions,
		opts ...*options.SessionOptions) error
	Disconnect(ctx context.Context) error
	RunCommand(ctx context.Context, database string, runCommand interface{},
		opts ...*options.RunCmdOptions) *mongo.SingleResult
	Indexes(ctx context.Context, database string, collection string) (mongo.IndexView, error)
	Database(ctx context.Context, database string) (*mongo.Database, error)
	Collection(ctx context.Context, database string, collection string) (*mongo.Collection, error)
	StartSession(ctx context.Context) (mongo.Session, error)
	//Deprecated
	Do(ctx context.Context, cmd string, db string, coll string, args map[string]interface{}) (interface{}, error)
}

Client is mongodb request interface.

func NewClientProxy

func NewClientProxy(name string, opts ...client.Option) Client

NewClientProxy creates a new mongo backend request proxy. The required parameter mongo service name: trpc.mongo.xxx.xxx.

type ClientCodec

type ClientCodec struct{}

ClientCodec decodes mongodb client request.

func (*ClientCodec) Decode

func (c *ClientCodec) Decode(msg codec.Msg, _ []byte) ([]byte, error)

Decode parses the metadata in the mongodb client return packet.

func (*ClientCodec) Encode

func (c *ClientCodec) Encode(msg codec.Msg, _ []byte) ([]byte, error)

Encode sets the metadata requested by the mongodb client.

type ClientTransport

type ClientTransport struct {
	MaxOpenConns    uint64
	MinOpenConns    uint64
	MaxConnIdleTime time.Duration
	ReadPreference  *readpref.ReadPref
	ServiceNameURIs map[string][]string
	// contains filtered or unexported fields
}

ClientTransport is a client-side mongodb transport.

func (*ClientTransport) GetMgoClient

func (ct *ClientTransport) GetMgoClient(ctx context.Context, dsn string) (*mongo.Client, error)

GetMgoClient obtains mongodb client, cache dsn=>client, save some initialization steps such as reparsing parameters, generating topology server, etc.

func (*ClientTransport) RoundTrip

func (ct *ClientTransport) RoundTrip(ctx context.Context, _ []byte,
	callOpts ...transport.RoundTripOption) (rspBytes []byte,
	err error)

RoundTrip sends and receives mongodb packets, returns the mongodb response and puts it in ctx, there is no need to return rspbuf here.

type ClientTransportOption

type ClientTransportOption func(opt *ClientTransport)

ClientTransportOption sets client transport parameter.

func WithOptionInterceptor

func WithOptionInterceptor(f func(dsn string, opts *options.ClientOptions)) ClientTransportOption

WithOptionInterceptor returns an ClientTransportOption which sets mongo client option interceptor

type Config

type Config struct {
	MinOpen        uint64        `yaml:"min_open"`        // Minimum number of simultaneous online connections
	MaxOpen        uint64        `yaml:"max_open"`        // The maximum number of simultaneous online connections
	MaxIdleTime    time.Duration `yaml:"max_idle_time"`   // Maximum idle time per link
	ReadPreference string        `yaml:"read_preference"` // reference on read
}

Config mongo is a proxy configuration structure declaration.

type IndexViewer

type IndexViewer interface {
	// CreateMany creates the interface definition of the index.
	CreateMany(ctx context.Context, database string, coll string,
		models []mongo.IndexModel, opts ...*options.CreateIndexesOptions) ([]string, error)
	CreateOne(ctx context.Context, database string, coll string,
		model mongo.IndexModel, opts ...*options.CreateIndexesOptions) (string, error)
	DropOne(ctx context.Context, database string, coll string,
		name string, opts ...*options.DropIndexesOptions) (bson.Raw, error)
	DropAll(ctx context.Context, database string, coll string,
		opts ...*options.DropIndexesOptions) (bson.Raw, error)
}

IndexViewer is the interface definition of the index. Refer to the naming of the community open source library, define the index interface separately, and divide the interface according to the function.

type Request

type Request struct {
	Command    string
	Database   string
	Collection string
	Arguments  map[string]interface{}

	DriverProxy bool        //driver transparent transmission
	Filter      interface{} //driver filter
	CommArg     interface{} //general parameters
	Opts        interface{} //option parameter
}

Request mongodb request body

type Response

type Response struct {
	Result interface{}
	// contains filtered or unexported fields
}

Response mongodb response body

type TxFunc

type TxFunc func(sc mongo.SessionContext) error

TxFunc mongo is a transaction logic function, if an error is returned, it will be rolled back.

Directories

Path Synopsis
Package mockmongodb is a generated GoMock package.
Package mockmongodb is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL