topology

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2022 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package topology contains types that handles the discovery, monitoring, and selection of servers. This package is designed to expose enough inner workings of service discovery and monitoring to allow low level applications to have fine grained control, while hiding most of the detailed implementation of the algorithms.

Index

Constants

This section is empty.

Variables

View Source
var ErrConnectionClosed = ConnectionError{ConnectionID: "<closed>", /* contains filtered or unexported fields */}

ErrConnectionClosed is returned from an attempt to use an already closed connection.

View Source
var ErrPoolConnected = PoolError("attempted to Connect to an already connected pool")

ErrPoolConnected is returned from an attempt to connect an already connected pool

View Source
var ErrPoolDisconnected = PoolError("attempted to check out a connection from closed connection pool")

ErrPoolDisconnected is returned from an attempt to Close an already disconnected or disconnecting pool.

View Source
var ErrServerClosed = errors.New("server is closed")

ErrServerClosed occurs when an attempt to Get a connection is made after the server has been closed.

View Source
var ErrServerConnected = errors.New("server is connected")

ErrServerConnected occurs when at attempt to Connect is made after a server has already been connected.

View Source
var ErrServerSelectionTimeout = errors.New("server selection timeout")

ErrServerSelectionTimeout is returned from server selection when the server selection process took longer than allowed by the timeout.

View Source
var ErrSubscribeAfterClosed = errors.New("cannot subscribe after closeConnection")

ErrSubscribeAfterClosed is returned when a user attempts to subscribe to a closed Server or Topology.

View Source
var ErrTopologyClosed = errors.New("topology is closed")

ErrTopologyClosed is returned when a user attempts to call a method on a closed Topology.

View Source
var ErrTopologyConnected = errors.New("topology is connected or connecting")

ErrTopologyConnected is returned whena user attempts to Connect to an already connected Topology.

View Source
var ErrWaitQueueTimeout = PoolError("timed out while checking out a connection from connection pool")

ErrWaitQueueTimeout is returned when the request to get a connection from the pool timesout when on the wait queue

View Source
var ErrWrongPool = PoolError("connection does not belong to this pool")

ErrWrongPool is return when a connection is returned to a pool it doesn't belong to.

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection implements the driver.Connection interface to allow reading and writing wire messages and the driver.Expirable interface to allow expiring.

func (*Connection) Address

func (c *Connection) Address() address.Address

Address returns the address of this connection.

func (*Connection) Alive

func (c *Connection) Alive() bool

Alive returns if the connection is still alive.

func (*Connection) Close

func (c *Connection) Close() error

Close returns this connection to the connection pool. This method may not closeConnection the underlying socket.

func (*Connection) CompressWireMessage

func (c *Connection) CompressWireMessage(src, dst []byte) ([]byte, error)

CompressWireMessage handles compressing the provided wire message using the underlying connection's compressor. The dst parameter will be overwritten with the new wire message. If there is no compressor set on the underlying connection, then no compression will be performed.

func (*Connection) Description

func (c *Connection) Description() description.Server

Description returns the server description of the server this connection is connected to.

func (*Connection) Expire

func (c *Connection) Expire() error

Expire closes this connection and will closeConnection the underlying socket.

func (*Connection) ID

func (c *Connection) ID() string

ID returns the ID of this connection.

func (*Connection) LocalAddress

func (c *Connection) LocalAddress() address.Address

LocalAddress returns the local address of the connection

func (*Connection) ReadWireMessage

func (c *Connection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, error)

ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter will be overwritten with the new wire message.

func (*Connection) WriteWireMessage

func (c *Connection) WriteWireMessage(ctx context.Context, wm []byte) error

WriteWireMessage handles writing a wire message to the underlying connection.

type ConnectionError

type ConnectionError struct {
	ConnectionID string
	Wrapped      error
	// contains filtered or unexported fields
}

ConnectionError represents a connection error.

func (ConnectionError) Error

func (e ConnectionError) Error() string

Error implements the error interface.

type ConnectionOption

type ConnectionOption func(*connectionConfig) error

ConnectionOption is used to configure a connection.

func WithAppName

func WithAppName(fn func(string) string) ConnectionOption

WithAppName sets the application name which gets sent to MongoDB when it first connects.

func WithCompressors

func WithCompressors(fn func([]string) []string) ConnectionOption

WithCompressors sets the compressors that can be used for communication.

func WithConnectTimeout

func WithConnectTimeout(fn func(time.Duration) time.Duration) ConnectionOption

WithConnectTimeout configures the maximum amount of time a dial will wait for a Connect to complete. The default is 30 seconds.

func WithDialer

func WithDialer(fn func(Dialer) Dialer) ConnectionOption

WithDialer configures the Dialer to use when making a new connection to MongoDB.

func WithHandshaker

func WithHandshaker(fn func(Handshaker) Handshaker) ConnectionOption

WithHandshaker configures the Handshaker that wll be used to initialize newly dialed connections.

func WithIdleTimeout

func WithIdleTimeout(fn func(time.Duration) time.Duration) ConnectionOption

WithIdleTimeout configures the maximum idle time to allow for a connection.

func WithLifeTimeout

func WithLifeTimeout(fn func(time.Duration) time.Duration) ConnectionOption

WithLifeTimeout configures the maximum life of a connection.

func WithMonitor

WithMonitor configures a event for command monitoring.

func WithReadTimeout

func WithReadTimeout(fn func(time.Duration) time.Duration) ConnectionOption

WithReadTimeout configures the maximum read time for a connection.

func WithTLSConfig

func WithTLSConfig(fn func(*tls.Config) *tls.Config) ConnectionOption

WithTLSConfig configures the TLS options for a connection.

func WithWriteTimeout

func WithWriteTimeout(fn func(time.Duration) time.Duration) ConnectionOption

WithWriteTimeout configures the maximum write time for a connection.

func WithZlibLevel

func WithZlibLevel(fn func(*int) *int) ConnectionOption

WithZlibLevel sets the zLib compression level.

type Dialer

type Dialer interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

Dialer is used to make network connections.

var DefaultDialer Dialer = &net.Dialer{}

DefaultDialer is the Dialer implementation that is used by this package. Changing this will also change the Dialer used for this package. This should only be changed why all of the connections being made need to use a different Dialer. Most of the time, using a WithDialer option is more appropriate than changing this variable.

type DialerFunc

type DialerFunc func(ctx context.Context, network, address string) (net.Conn, error)

DialerFunc is a type implemented by functions that can be used as a Dialer.

func (DialerFunc) DialContext

func (df DialerFunc) DialContext(ctx context.Context, network, address string) (net.Conn, error)

DialContext implements the Dialer interface.

type Handshaker

type Handshaker = driver.Handshaker

Handshaker is the interface implemented by types that can perform a MongoDB handshake over a provided driver.Connection. This is used during connection initialization. Implementations must be goroutine safe.

type MonitorMode

type MonitorMode uint8

MonitorMode represents the way in which a server is monitored.

const (
	AutomaticMode MonitorMode = iota
	SingleMode
)

These constants are the available monitoring modes.

type Option

type Option func(*config) error

Option is a configuration option for a topology.

func WithConnString

func WithConnString(fn func(connstring.ConnString) connstring.ConnString) Option

WithConnString configures the topology using the connection string.

func WithMode

func WithMode(fn func(MonitorMode) MonitorMode) Option

WithMode configures the topology's monitor mode.

func WithReplicaSetName

func WithReplicaSetName(fn func(string) string) Option

WithReplicaSetName configures the topology's default replica set name.

func WithSeedList

func WithSeedList(fn func(...string) []string) Option

WithSeedList configures a topology's seed list.

func WithServerOptions

func WithServerOptions(fn func(...ServerOption) []ServerOption) Option

WithServerOptions configures a topology's server options for when a new server needs to be created.

func WithServerSelectionTimeout

func WithServerSelectionTimeout(fn func(time.Duration) time.Duration) Option

WithServerSelectionTimeout configures a topology's server selection timeout. A server selection timeout of 0 means there is no timeout for server selection.

type PoolError

type PoolError string

PoolError is an error returned from a Pool method.

func (PoolError) Error

func (pe PoolError) Error() string

type SelectedServer

type SelectedServer struct {
	*Server

	Kind description.TopologyKind
}

SelectedServer represents a specific server that was selected during server selection. It contains the kind of the topology it was selected from.

func (*SelectedServer) Description

func (ss *SelectedServer) Description() description.SelectedServer

Description returns a description of the server as of the last heartbeat.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a single server within a topology.

func ConnectServer

func ConnectServer(addr address.Address, updateCallback func(description.Server), opts ...ServerOption) (*Server, error)

ConnectServer creates a new Server and then initializes it using the Connect method.

func NewServer

func NewServer(addr address.Address, opts ...ServerOption) (*Server, error)

NewServer creates a new server. The mongodb server at the address will be monitored on an internal monitoring goroutine.

func (*Server) Connect

func (s *Server) Connect(updateCallback func(description.Server)) error

Connect initializes the Server by starting background monitoring goroutines. This method must be called before a Server can be used.

func (*Server) Connection

func (s *Server) Connection(ctx context.Context) (driver.Connection, error)

Connection gets a connection to the server.

func (*Server) Description

func (s *Server) Description() description.Server

Description returns a description of the server as of the last heartbeat.

func (*Server) Disconnect

func (s *Server) Disconnect(ctx context.Context) error

Disconnect closes sockets to the server referenced by this Server. Subscriptions to this Server will be closed. Disconnect will shutdown any monitoring goroutines, closeConnection the idle connection pool, and will wait until all the in use connections have been returned to the connection pool and are closed before returning. If the context expires via cancellation, deadline, or timeout before the in use connections have been returned, the in use connections will be closed, resulting in the failure of any in flight read or write operations. If this method returns with no errors, all connections associated with this Server have been closed.

func (*Server) ProcessError

func (s *Server) ProcessError(err error)

ProcessError handles SDAM error handling and implements driver.ErrorProcessor.

func (*Server) RequestImmediateCheck

func (s *Server) RequestImmediateCheck()

RequestImmediateCheck will cause the server to send a heartbeat immediately instead of waiting for the heartbeat timeout.

func (*Server) SelectedDescription

func (s *Server) SelectedDescription() description.SelectedServer

SelectedDescription returns a description.SelectedServer with a Kind of Single. This can be used when performing tasks like monitoring a batch of servers and you want to run one off commands against those servers.

func (*Server) String

func (s *Server) String() string

String implements the Stringer interface.

func (*Server) Subscribe

func (s *Server) Subscribe() (*ServerSubscription, error)

Subscribe returns a ServerSubscription which has a channel on which all updated server descriptions will be sent. The channel will have a buffer size of one, and will be pre-populated with the current description.

type ServerOption

type ServerOption func(*serverConfig) error

ServerOption configures a server.

func WithClock

func WithClock(fn func(clock *session.ClusterClock) *session.ClusterClock) ServerOption

WithClock configures the ClusterClock for the server to use.

func WithCompressionOptions

func WithCompressionOptions(fn func(...string) []string) ServerOption

WithCompressionOptions configures the server's compressors.

func WithConnectionOptions

func WithConnectionOptions(fn func(...ConnectionOption) []ConnectionOption) ServerOption

WithConnectionOptions configures the server's connections.

func WithConnectionPoolMaxIdleTime

func WithConnectionPoolMaxIdleTime(fn func(time.Duration) time.Duration) ServerOption

WithConnectionPoolMaxIdleTime configures the maximum time that a connection can remain idle in the connection pool before being removed. If connectionPoolMaxIdleTime is 0, then no idle time is set and connections will not be removed because of their age

func WithConnectionPoolMonitor

func WithConnectionPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ServerOption

WithConnectionPoolMonitor configures the monitor for all connection pool actions

func WithHeartbeatInterval

func WithHeartbeatInterval(fn func(time.Duration) time.Duration) ServerOption

WithHeartbeatInterval configures a server's heartbeat interval.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(fn func(time.Duration) time.Duration) ServerOption

WithHeartbeatTimeout configures how long to wait for a heartbeat socket to connection.

func WithMaxConnections

func WithMaxConnections(fn func(uint64) uint64) ServerOption

WithMaxConnections configures the maximum number of connections to allow for a given server. If max is 0, then the default will be 100

func WithMinConnections

func WithMinConnections(fn func(uint64) uint64) ServerOption

WithMinConnections configures the minimum number of connections to allow for a given server. If min is 0, then there is no lower limit to the number of connections.

func WithRegistry

func WithRegistry(fn func(*bsoncodec.Registry) *bsoncodec.Registry) ServerOption

WithRegistry configures the registry for the server to use when creating cursors.

type ServerSubscription

type ServerSubscription struct {
	C <-chan description.Server
	// contains filtered or unexported fields
}

ServerSubscription represents a subscription to the description.Server updates for a specific server.

func (*ServerSubscription) Unsubscribe

func (ss *ServerSubscription) Unsubscribe() error

Unsubscribe unsubscribes this ServerSubscription from updates and closes the subscription channel.

type Subscription

type Subscription struct {
	C <-chan description.Topology
	// contains filtered or unexported fields
}

Subscription is a subscription to updates to the description of the Topology that created this Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe unsubscribes this Subscription from updates and closes the subscription channel.

type Topology

type Topology struct {
	SessionPool *session.Pool
	// contains filtered or unexported fields
}

Topology represents a MongoDB deployment.

func New

func New(opts ...Option) (*Topology, error)

New creates a new topology.

func (*Topology) Connect

func (t *Topology) Connect() error

Connect initializes a Topology and starts the monitoring process. This function must be called to properly monitor the topology.

func (*Topology) Description

func (t *Topology) Description() description.Topology

Description returns a description of the topology.

func (*Topology) Disconnect

func (t *Topology) Disconnect(ctx context.Context) error

Disconnect closes the topology. It stops the monitoring thread and closes all open subscriptions.

func (*Topology) FindServer

func (t *Topology) FindServer(selected description.Server) (*SelectedServer, error)

FindServer will attempt to find a server that fits the given server description. This method will return nil, nil if a matching server could not be found.

func (*Topology) Kind

func (t *Topology) Kind() description.TopologyKind

Kind returns the topology kind of this Topology.

func (*Topology) RequestImmediateCheck

func (t *Topology) RequestImmediateCheck()

RequestImmediateCheck will send heartbeats to all the servers in the topology right away, instead of waiting for the heartbeat timeout.

func (*Topology) SelectServer

func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelector) (driver.Server, error)

SelectServer selects a server with given a selector. SelectServer complies with the server selection spec, and will time out after severSelectionTimeout or when the parent context is done.

func (*Topology) SelectServerLegacy

func (t *Topology) SelectServerLegacy(ctx context.Context, ss description.ServerSelector) (*SelectedServer, error)

SelectServerLegacy selects a server with given a selector. SelectServerLegacy complies with the server selection spec, and will time out after severSelectionTimeout or when the parent context is done.

func (*Topology) String

func (t *Topology) String() string

String implements the Stringer interface

func (*Topology) Subscribe

func (t *Topology) Subscribe() (*Subscription, error)

Subscribe returns a Subscription on which all updated description.Topologys will be sent. The channel of the subscription will have a buffer size of one, and will be pre-populated with the current description.Topology.

func (*Topology) SupportsRetryWrites

func (t *Topology) SupportsRetryWrites() bool

SupportsRetryWrites returns true if the topology supports retryable writes, which it does if it supports sessions.

func (*Topology) SupportsSessions

func (t *Topology) SupportsSessions() bool

SupportsSessions returns true if the topology supports sessions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL