Documentation ¶
Overview ¶
Package topology contains types that handles the discovery, monitoring, and selection of servers. This package is designed to expose enough inner workings of service discovery and monitoring to allow low level applications to have fine grained control, while hiding most of the detailed implementation of the algorithms.
Index ¶
- Variables
- type Connection
- func (c *Connection) Address() address.Address
- func (c *Connection) Alive() bool
- func (c *Connection) Close() error
- func (c *Connection) CompressWireMessage(src, dst []byte) ([]byte, error)
- func (c *Connection) Description() description.Server
- func (c *Connection) Expire() error
- func (c *Connection) ID() string
- func (c *Connection) LocalAddress() address.Address
- func (c *Connection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, error)
- func (c *Connection) WriteWireMessage(ctx context.Context, wm []byte) error
- type ConnectionError
- type ConnectionOption
- func WithCompressors(fn func([]string) []string) ConnectionOption
- func WithConnectTimeout(fn func(time.Duration) time.Duration) ConnectionOption
- func WithConnectionAppName(fn func(string) string) ConnectionOption
- func WithDialer(fn func(Dialer) Dialer) ConnectionOption
- func WithDisableOCSPEndpointCheck(fn func(bool) bool) ConnectionOption
- func WithHandshaker(fn func(Handshaker) Handshaker) ConnectionOption
- func WithIdleTimeout(fn func(time.Duration) time.Duration) ConnectionOption
- func WithLifeTimeout(fn func(time.Duration) time.Duration) ConnectionOption
- func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) ConnectionOption
- func WithOCSPCache(fn func(ocsp.Cache) ocsp.Cache) ConnectionOption
- func WithReadTimeout(fn func(time.Duration) time.Duration) ConnectionOption
- func WithTLSConfig(fn func(*tls.Config) *tls.Config) ConnectionOption
- func WithWriteTimeout(fn func(time.Duration) time.Duration) ConnectionOption
- func WithZlibLevel(fn func(*int) *int) ConnectionOption
- func WithZstdLevel(fn func(*int) *int) ConnectionOption
- type Dialer
- type DialerFunc
- type Handshaker
- type MonitorMode
- type Option
- func WithConnString(fn func(connstring.ConnString) connstring.ConnString) Option
- func WithMode(fn func(MonitorMode) MonitorMode) Option
- func WithReplicaSetName(fn func(string) string) Option
- func WithSeedList(fn func(...string) []string) Option
- func WithServerOptions(fn func(...ServerOption) []ServerOption) Option
- func WithServerSelectionTimeout(fn func(time.Duration) time.Duration) Option
- func WithURI(fn func(string) string) Option
- type PoolError
- type SelectedServer
- type Server
- func (s *Server) Connect(updateCallback func(description.Server)) error
- func (s *Server) Connection(ctx context.Context) (driver.Connection, error)
- func (s *Server) Description() description.Server
- func (s *Server) Disconnect(ctx context.Context) error
- func (s *Server) ProcessError(err error)
- func (s *Server) RequestImmediateCheck()
- func (s *Server) SelectedDescription() description.SelectedServer
- func (s *Server) String() string
- func (s *Server) Subscribe() (*ServerSubscription, error)
- type ServerOption
- func WithClock(fn func(clock *session.ClusterClock) *session.ClusterClock) ServerOption
- func WithCompressionOptions(fn func(...string) []string) ServerOption
- func WithConnectionOptions(fn func(...ConnectionOption) []ConnectionOption) ServerOption
- func WithConnectionPoolMaxIdleTime(fn func(time.Duration) time.Duration) ServerOption
- func WithConnectionPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ServerOption
- func WithHeartbeatInterval(fn func(time.Duration) time.Duration) ServerOption
- func WithHeartbeatTimeout(fn func(time.Duration) time.Duration) ServerOption
- func WithMaxConnections(fn func(uint64) uint64) ServerOption
- func WithMinConnections(fn func(uint64) uint64) ServerOption
- func WithRegistry(fn func(*bsoncodec.Registry) *bsoncodec.Registry) ServerOption
- func WithServerAppName(fn func(string) string) ServerOption
- type ServerSubscription
- type Topology
- func (t *Topology) Connect() error
- func (t *Topology) Description() description.Topology
- func (t *Topology) Disconnect(ctx context.Context) error
- func (t *Topology) FindServer(selected description.Server) (*SelectedServer, error)
- func (t *Topology) Kind() description.TopologyKind
- func (t *Topology) RequestImmediateCheck()
- func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelector) (driver.Server, error)
- func (t *Topology) SelectServerLegacy(ctx context.Context, ss description.ServerSelector) (*SelectedServer, error)
- func (t *Topology) String() string
- func (t *Topology) Subscribe() (*driver.Subscription, error)
- func (t *Topology) SupportsRetryWrites() bool
- func (t *Topology) SupportsSessions() bool
- func (t *Topology) Unsubscribe(sub *driver.Subscription) error
Constants ¶
This section is empty.
Variables ¶
var ErrConnectionClosed = ConnectionError{ConnectionID: "<closed>", /* contains filtered or unexported fields */}
ErrConnectionClosed is returned from an attempt to use an already closed connection.
var ErrPoolConnected = PoolError("attempted to Connect to an already connected pool")
ErrPoolConnected is returned from an attempt to connect an already connected pool
var ErrPoolDisconnected = PoolError("attempted to check out a connection from closed connection pool")
ErrPoolDisconnected is returned from an attempt to Close an already disconnected or disconnecting pool.
var ErrServerClosed = errors.New("server is closed")
ErrServerClosed occurs when an attempt to Get a connection is made after the server has been closed.
var ErrServerConnected = errors.New("server is connected")
ErrServerConnected occurs when at attempt to Connect is made after a server has already been connected.
var ErrServerSelectionTimeout = errors.New("server selection timeout")
ErrServerSelectionTimeout is returned from server selection when the server selection process took longer than allowed by the timeout.
var ErrSubscribeAfterClosed = errors.New("cannot subscribe after closeConnection")
ErrSubscribeAfterClosed is returned when a user attempts to subscribe to a closed Server or Topology.
var ErrTopologyClosed = errors.New("topology is closed")
ErrTopologyClosed is returned when a user attempts to call a method on a closed Topology.
var ErrTopologyConnected = errors.New("topology is connected or connecting")
ErrTopologyConnected is returned whena user attempts to Connect to an already connected Topology.
var ErrWaitQueueTimeout = PoolError("timed out while checking out a connection from connection pool")
ErrWaitQueueTimeout is returned when the request to get a connection from the pool timesout when on the wait queue
var ErrWrongPool = PoolError("connection does not belong to this pool")
ErrWrongPool is return when a connection is returned to a pool it doesn't belong to.
Functions ¶
This section is empty.
Types ¶
type Connection ¶ added in v1.1.0
type Connection struct {
// contains filtered or unexported fields
}
Connection implements the driver.Connection interface to allow reading and writing wire messages and the driver.Expirable interface to allow expiring.
func (*Connection) Address ¶ added in v1.1.0
func (c *Connection) Address() address.Address
Address returns the address of this connection.
func (*Connection) Alive ¶ added in v1.1.0
func (c *Connection) Alive() bool
Alive returns if the connection is still alive.
func (*Connection) Close ¶ added in v1.1.0
func (c *Connection) Close() error
Close returns this connection to the connection pool. This method may not closeConnection the underlying socket.
func (*Connection) CompressWireMessage ¶ added in v1.1.0
func (c *Connection) CompressWireMessage(src, dst []byte) ([]byte, error)
CompressWireMessage handles compressing the provided wire message using the underlying connection's compressor. The dst parameter will be overwritten with the new wire message. If there is no compressor set on the underlying connection, then no compression will be performed.
func (*Connection) Description ¶ added in v1.1.0
func (c *Connection) Description() description.Server
Description returns the server description of the server this connection is connected to.
func (*Connection) Expire ¶ added in v1.1.0
func (c *Connection) Expire() error
Expire closes this connection and will closeConnection the underlying socket.
func (*Connection) ID ¶ added in v1.1.0
func (c *Connection) ID() string
ID returns the ID of this connection.
func (*Connection) LocalAddress ¶ added in v1.1.0
func (c *Connection) LocalAddress() address.Address
LocalAddress returns the local address of the connection
func (*Connection) ReadWireMessage ¶ added in v1.1.0
ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter will be overwritten with the new wire message.
func (*Connection) WriteWireMessage ¶ added in v1.1.0
func (c *Connection) WriteWireMessage(ctx context.Context, wm []byte) error
WriteWireMessage handles writing a wire message to the underlying connection.
type ConnectionError ¶ added in v1.1.0
type ConnectionError struct { ConnectionID string Wrapped error // contains filtered or unexported fields }
ConnectionError represents a connection error.
func (ConnectionError) Error ¶ added in v1.1.0
func (e ConnectionError) Error() string
Error implements the error interface.
type ConnectionOption ¶ added in v1.1.0
type ConnectionOption func(*connectionConfig) error
ConnectionOption is used to configure a connection.
func WithCompressors ¶ added in v1.1.0
func WithCompressors(fn func([]string) []string) ConnectionOption
WithCompressors sets the compressors that can be used for communication.
func WithConnectTimeout ¶ added in v1.1.0
func WithConnectTimeout(fn func(time.Duration) time.Duration) ConnectionOption
WithConnectTimeout configures the maximum amount of time a dial will wait for a Connect to complete. The default is 30 seconds.
func WithConnectionAppName ¶ added in v1.2.0
func WithConnectionAppName(fn func(string) string) ConnectionOption
WithConnectionAppName sets the application name which gets sent to MongoDB when it first connects.
func WithDialer ¶ added in v1.1.0
func WithDialer(fn func(Dialer) Dialer) ConnectionOption
WithDialer configures the Dialer to use when making a new connection to MongoDB.
func WithDisableOCSPEndpointCheck ¶ added in v1.4.0
func WithDisableOCSPEndpointCheck(fn func(bool) bool) ConnectionOption
WithDisableOCSPEndpointCheck specifies whether or the driver should perform non-stapled OCSP verification. If set to true, the driver will only check stapled responses and will continue the connection without reaching out to OCSP responders.
func WithHandshaker ¶ added in v1.1.0
func WithHandshaker(fn func(Handshaker) Handshaker) ConnectionOption
WithHandshaker configures the Handshaker that wll be used to initialize newly dialed connections.
func WithIdleTimeout ¶ added in v1.1.0
func WithIdleTimeout(fn func(time.Duration) time.Duration) ConnectionOption
WithIdleTimeout configures the maximum idle time to allow for a connection.
func WithLifeTimeout ¶ added in v1.1.0
func WithLifeTimeout(fn func(time.Duration) time.Duration) ConnectionOption
WithLifeTimeout configures the maximum life of a connection.
func WithMonitor ¶ added in v1.1.0
func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) ConnectionOption
WithMonitor configures a event for command monitoring.
func WithOCSPCache ¶ added in v1.4.0
func WithOCSPCache(fn func(ocsp.Cache) ocsp.Cache) ConnectionOption
WithOCSPCache specifies a cache to use for OCSP verification.
func WithReadTimeout ¶ added in v1.1.0
func WithReadTimeout(fn func(time.Duration) time.Duration) ConnectionOption
WithReadTimeout configures the maximum read time for a connection.
func WithTLSConfig ¶ added in v1.1.0
func WithTLSConfig(fn func(*tls.Config) *tls.Config) ConnectionOption
WithTLSConfig configures the TLS options for a connection.
func WithWriteTimeout ¶ added in v1.1.0
func WithWriteTimeout(fn func(time.Duration) time.Duration) ConnectionOption
WithWriteTimeout configures the maximum write time for a connection.
func WithZlibLevel ¶ added in v1.1.0
func WithZlibLevel(fn func(*int) *int) ConnectionOption
WithZlibLevel sets the zLib compression level.
func WithZstdLevel ¶ added in v1.2.0
func WithZstdLevel(fn func(*int) *int) ConnectionOption
WithZstdLevel sets the zstd compression level.
type Dialer ¶ added in v1.1.0
type Dialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
Dialer is used to make network connections.
DefaultDialer is the Dialer implementation that is used by this package. Changing this will also change the Dialer used for this package. This should only be changed why all of the connections being made need to use a different Dialer. Most of the time, using a WithDialer option is more appropriate than changing this variable.
type DialerFunc ¶ added in v1.1.0
DialerFunc is a type implemented by functions that can be used as a Dialer.
func (DialerFunc) DialContext ¶ added in v1.1.0
DialContext implements the Dialer interface.
type Handshaker ¶ added in v1.1.0
type Handshaker = driver.Handshaker
Handshaker is the interface implemented by types that can perform a MongoDB handshake over a provided driver.Connection. This is used during connection initialization. Implementations must be goroutine safe.
type MonitorMode ¶
type MonitorMode uint8
MonitorMode represents the way in which a server is monitored.
const ( AutomaticMode MonitorMode = iota SingleMode )
These constants are the available monitoring modes.
type Option ¶
type Option func(*config) error
Option is a configuration option for a topology.
func WithConnString ¶
func WithConnString(fn func(connstring.ConnString) connstring.ConnString) Option
WithConnString configures the topology using the connection string.
func WithMode ¶
func WithMode(fn func(MonitorMode) MonitorMode) Option
WithMode configures the topology's monitor mode.
func WithReplicaSetName ¶
WithReplicaSetName configures the topology's default replica set name.
func WithSeedList ¶
WithSeedList configures a topology's seed list.
func WithServerOptions ¶
func WithServerOptions(fn func(...ServerOption) []ServerOption) Option
WithServerOptions configures a topology's server options for when a new server needs to be created.
func WithServerSelectionTimeout ¶
WithServerSelectionTimeout configures a topology's server selection timeout. A server selection timeout of 0 means there is no timeout for server selection.
type PoolError ¶ added in v1.1.0
type PoolError string
PoolError is an error returned from a Pool method.
type SelectedServer ¶
type SelectedServer struct { *Server Kind description.TopologyKind }
SelectedServer represents a specific server that was selected during server selection. It contains the kind of the topology it was selected from.
func (*SelectedServer) Description ¶
func (ss *SelectedServer) Description() description.SelectedServer
Description returns a description of the server as of the last heartbeat.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is a single server within a topology.
func ConnectServer ¶
func ConnectServer(addr address.Address, updateCallback func(description.Server), opts ...ServerOption) (*Server, error)
ConnectServer creates a new Server and then initializes it using the Connect method.
func NewServer ¶
func NewServer(addr address.Address, opts ...ServerOption) (*Server, error)
NewServer creates a new server. The mongodb server at the address will be monitored on an internal monitoring goroutine.
func (*Server) Connect ¶
func (s *Server) Connect(updateCallback func(description.Server)) error
Connect initializes the Server by starting background monitoring goroutines. This method must be called before a Server can be used.
func (*Server) Connection ¶
Connection gets a connection to the server.
func (*Server) Description ¶
func (s *Server) Description() description.Server
Description returns a description of the server as of the last heartbeat.
func (*Server) Disconnect ¶
Disconnect closes sockets to the server referenced by this Server. Subscriptions to this Server will be closed. Disconnect will shutdown any monitoring goroutines, closeConnection the idle connection pool, and will wait until all the in use connections have been returned to the connection pool and are closed before returning. If the context expires via cancellation, deadline, or timeout before the in use connections have been returned, the in use connections will be closed, resulting in the failure of any in flight read or write operations. If this method returns with no errors, all connections associated with this Server have been closed.
func (*Server) ProcessError ¶ added in v1.1.0
ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
func (*Server) RequestImmediateCheck ¶
func (s *Server) RequestImmediateCheck()
RequestImmediateCheck will cause the server to send a heartbeat immediately instead of waiting for the heartbeat timeout.
func (*Server) SelectedDescription ¶
func (s *Server) SelectedDescription() description.SelectedServer
SelectedDescription returns a description.SelectedServer with a Kind of Single. This can be used when performing tasks like monitoring a batch of servers and you want to run one off commands against those servers.
func (*Server) Subscribe ¶
func (s *Server) Subscribe() (*ServerSubscription, error)
Subscribe returns a ServerSubscription which has a channel on which all updated server descriptions will be sent. The channel will have a buffer size of one, and will be pre-populated with the current description.
type ServerOption ¶
type ServerOption func(*serverConfig) error
ServerOption configures a server.
func WithClock ¶
func WithClock(fn func(clock *session.ClusterClock) *session.ClusterClock) ServerOption
WithClock configures the ClusterClock for the server to use.
func WithCompressionOptions ¶
func WithCompressionOptions(fn func(...string) []string) ServerOption
WithCompressionOptions configures the server's compressors.
func WithConnectionOptions ¶
func WithConnectionOptions(fn func(...ConnectionOption) []ConnectionOption) ServerOption
WithConnectionOptions configures the server's connections.
func WithConnectionPoolMaxIdleTime ¶ added in v1.1.0
func WithConnectionPoolMaxIdleTime(fn func(time.Duration) time.Duration) ServerOption
WithConnectionPoolMaxIdleTime configures the maximum time that a connection can remain idle in the connection pool before being removed. If connectionPoolMaxIdleTime is 0, then no idle time is set and connections will not be removed because of their age
func WithConnectionPoolMonitor ¶ added in v1.1.0
func WithConnectionPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ServerOption
WithConnectionPoolMonitor configures the monitor for all connection pool actions
func WithHeartbeatInterval ¶
func WithHeartbeatInterval(fn func(time.Duration) time.Duration) ServerOption
WithHeartbeatInterval configures a server's heartbeat interval.
func WithHeartbeatTimeout ¶
func WithHeartbeatTimeout(fn func(time.Duration) time.Duration) ServerOption
WithHeartbeatTimeout configures how long to wait for a heartbeat socket to connection.
func WithMaxConnections ¶
func WithMaxConnections(fn func(uint64) uint64) ServerOption
WithMaxConnections configures the maximum number of connections to allow for a given server. If max is 0, then the default will be math.MaxInt64.
func WithMinConnections ¶ added in v1.1.0
func WithMinConnections(fn func(uint64) uint64) ServerOption
WithMinConnections configures the minimum number of connections to allow for a given server. If min is 0, then there is no lower limit to the number of connections.
func WithRegistry ¶
func WithRegistry(fn func(*bsoncodec.Registry) *bsoncodec.Registry) ServerOption
WithRegistry configures the registry for the server to use when creating cursors.
func WithServerAppName ¶ added in v1.2.0
func WithServerAppName(fn func(string) string) ServerOption
WithServerAppName configures the server's application name.
type ServerSubscription ¶
type ServerSubscription struct { C <-chan description.Server // contains filtered or unexported fields }
ServerSubscription represents a subscription to the description.Server updates for a specific server.
func (*ServerSubscription) Unsubscribe ¶
func (ss *ServerSubscription) Unsubscribe() error
Unsubscribe unsubscribes this ServerSubscription from updates and closes the subscription channel.
type Topology ¶
type Topology struct {
// contains filtered or unexported fields
}
Topology represents a MongoDB deployment.
func (*Topology) Connect ¶
Connect initializes a Topology and starts the monitoring process. This function must be called to properly monitor the topology.
func (*Topology) Description ¶
func (t *Topology) Description() description.Topology
Description returns a description of the topology.
func (*Topology) Disconnect ¶
Disconnect closes the topology. It stops the monitoring thread and closes all open subscriptions.
func (*Topology) FindServer ¶
func (t *Topology) FindServer(selected description.Server) (*SelectedServer, error)
FindServer will attempt to find a server that fits the given server description. This method will return nil, nil if a matching server could not be found.
func (*Topology) Kind ¶ added in v1.1.0
func (t *Topology) Kind() description.TopologyKind
Kind returns the topology kind of this Topology.
func (*Topology) RequestImmediateCheck ¶
func (t *Topology) RequestImmediateCheck()
RequestImmediateCheck will send heartbeats to all the servers in the topology right away, instead of waiting for the heartbeat timeout.
func (*Topology) SelectServer ¶
func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelector) (driver.Server, error)
SelectServer selects a server with given a selector. SelectServer complies with the server selection spec, and will time out after severSelectionTimeout or when the parent context is done.
func (*Topology) SelectServerLegacy ¶ added in v1.1.0
func (t *Topology) SelectServerLegacy(ctx context.Context, ss description.ServerSelector) (*SelectedServer, error)
SelectServerLegacy selects a server with given a selector. SelectServerLegacy complies with the server selection spec, and will time out after severSelectionTimeout or when the parent context is done.
func (*Topology) Subscribe ¶
func (t *Topology) Subscribe() (*driver.Subscription, error)
Subscribe returns a Subscription on which all updated description.Topologys will be sent. The channel of the subscription will have a buffer size of one, and will be pre-populated with the current description.Topology. Subscribe implements the driver.Subscriber interface.
func (*Topology) SupportsRetryWrites ¶ added in v1.1.0
SupportsRetryWrites returns true if the topology supports retryable writes, which it does if it supports sessions.
func (*Topology) SupportsSessions ¶
SupportsSessions returns true if the topology supports sessions.
func (*Topology) Unsubscribe ¶ added in v1.2.0
func (t *Topology) Unsubscribe(sub *driver.Subscription) error
Unsubscribe unsubscribes the given subscription from the topology and closes the subscription channel. Unsubscribe implements the driver.Subscriber interface.