grpc_pool

v0.2.0
Published: Dec 18, 2023 License: MIT Imports: 9 Imported by: 0

README

grpc_pool

High-performance gRPC pool for grpc.ClientConn connections. This pool is special: instead of dedicating one connection to each call, it shares a single connection among multiple concurrent calls. This is useful when you don't want to overload your servers with too many gRPC method calls on a single connection.

how

So how does it work? It's pretty simple. The pool uses reflect.Select to select on multiple channels. Two channels represent the context and the acquire timeout, and the rest represent connections. Select can return from the following channels:

  • 0. context is done - return the context error
  • 1. acquire timeout is done - try to create a new connection
  • 2..n connection is ready - return the connection

The whole pool is based on this idea. There are some additional features, but they are minor compared to this core mechanism.
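The selection order described above can be sketched with the standard library alone. This is a minimal illustration, not the package's actual code: selectConn, the ready channels, and errAcquireTimeout are hypothetical names, and the timeout branch returns a sentinel error where a real pool would try to dial a new connection.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"time"
)

// Positions of the two fixed cases; connection cases start at index 2.
const (
	chosenContextDone    = 0
	chosenAcquireTimeout = 1
)

// errAcquireTimeout is a hypothetical sentinel. It marks the point where a
// pool would try to dial a new connection.
var errAcquireTimeout = errors.New("acquire timeout")

// selectConn waits on ctx, an acquire timer, and one "ready" channel per
// connection. It returns the index of the connection whose channel fired.
func selectConn(ctx context.Context, timeout time.Duration, ready []chan struct{}) (int, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)},
	}
	for _, ch := range ready {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}

	chosen, _, _ := reflect.Select(cases)
	switch chosen {
	case chosenContextDone:
		return -1, ctx.Err()
	case chosenAcquireTimeout:
		return -1, errAcquireTimeout
	default:
		// Subtract the two fixed cases to get the connection index.
		return chosen - 2, nil
	}
}

// demo marks only the second of two connections as ready and selects on them.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	ready := []chan struct{}{make(chan struct{}, 1), make(chan struct{}, 1)}
	ready[1] <- struct{}{}
	return selectConn(ctx, 50*time.Millisecond, ready)
}

// demoTimeout selects with no ready connection, so the acquire timer fires.
func demoTimeout() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	ready := []chan struct{}{make(chan struct{}, 1)}
	return selectConn(ctx, 10*time.Millisecond, ready)
}

func main() {
	idx, err := demo()
	fmt.Println(idx, err)
	_, err = demoTimeout()
	fmt.Println(err)
}
```

Because the two fixed cases always sit at indexes 0 and 1, the chosen index alone tells the caller which situation occurred. That is why the order of the cases must not change.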

features

gRPC pool supports the following features:

  • max concurrent calls on single connection
  • max idle connections count
  • max idle connection time
  • max connections count
  • max lifetime of connection

Each feature has a corresponding With... option.

example

Here is an example of using the pool in code.

// create new pool
pool, err := grpc_pool.New(
    // Dial function is used to create new connection
    func(ctx context.Context, stats *grpc_pool.Stats, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
        // add additional dial options
        opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
        // create new connection (always pass options from grpc-pool)
        return grpc.DialContext(ctx, "localhost:50051", opts...)
    },
    // WithMaxConcurrency sets how many concurrent calls can be made on single connection
    grpc_pool.WithMaxConcurrency(1000),
    // WithMaxIdleConnections sets how many idle connections can be kept in pool
    grpc_pool.WithMaxIdleConnections(5),
    // WithMaxIdleTime sets how long a connection can stay unused before it is marked as idle
    grpc_pool.WithMaxIdleTime(time.Second*10),
    // WithMaxConnections sets how many connections can be kept in pool
    grpc_pool.WithMaxConnections(20),
)
if err != nil {
    panic(err)
}

// prepare context with some timeout
ctx, cf := context.WithTimeout(context.Background(), time.Second*10)
defer cf()

// get connection
conn, err := pool.Acquire(ctx)
if err != nil {
    panic(err)
}

// don't forget to return connection back to pool, otherwise you will leak connections, and pool will be confused.
defer pool.Release(conn)

config

gRPC pool provides a config compatible with viper (mapstructure), and it also provides default tags. These tags work with https://github.com/mcuadros/go-defaults . You still need to provide a dial function, so a small amount of work is necessary. Bear in mind that you can squash this config into your own struct and add your additional settings, which makes it easy to extend.

stats

gRPC pool provides stats about the pool, which you can use to monitor it. Stats are safe to use in a concurrent environment. However, please note that the call can be delayed while the pool is dialing a new connection.

stats := pool.Stats()
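One way to use these stats for monitoring is to export them as JSON. The sketch below assumes that is the goal. Stats and ConnStats are local copies of the types documented for this package, so that the block compiles on its own, while encodeStats and sampleStats are hypothetical helpers. The sample values are made up and the timestamps are left at their zero value for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ConnStats is a local copy of the documented type, including its JSON tags.
type ConnStats struct {
	Target       string    `json:"target"`
	Created      time.Time `json:"created"`
	Deadline     time.Time `json:"deadline"`
	LastChange   time.Time `json:"last_change"`
	WorkingConns uint      `json:"working_conns"`
	IdleConns    uint      `json:"idle_conns"`
	Usage        uint64    `json:"used"`
}

// Stats is a local copy of the documented type.
type Stats struct {
	Connections []ConnStats `json:"connections"`
}

// encodeStats renders the stats as a JSON string (hypothetical helper).
func encodeStats(s *Stats) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// sampleStats returns made-up values in place of a real pool.Stats() result.
func sampleStats() *Stats {
	return &Stats{Connections: []ConnStats{
		{Target: "localhost:50051", WorkingConns: 2, IdleConns: 1, Usage: 7},
	}}
}

func main() {
	out, err := encodeStats(sampleStats())
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Note that the Usage field is serialized under the key "used", as declared in its struct tag.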

author

Peter Vrba phonkee@phonkee.eu

Documentation

Constants

const (
	// DefaultAcquireTimeout is the default timeout for acquiring a connection from the pool using reflect.Select
	DefaultAcquireTimeout = 50 * time.Millisecond
	// DefaultCleanupInterval is the default interval for cleaning up idle connections and connections that passed their max lifetime
	DefaultCleanupInterval = 5 * time.Second
	// DefaultMaxConcurrency is the default maximum number of concurrent calls on a single connection
	DefaultMaxConcurrency = 1000
	// DefaultMaxIdleTime is the default time after which an unused connection is marked as idle
	DefaultMaxIdleTime = time.Second * 60
	// DefaultMaxLifetime is the default maximum lifetime of a connection
	DefaultMaxLifetime = 30 * time.Minute
)

If these are changed, please also change the config values in config.go.

const (
	// ChosenContextDeadline is returned from Select when context deadline is reached
	ChosenContextDeadline = 0
	// ChosenAcquireTimeout is returned from Select when acquire timeout is reached
	ChosenAcquireTimeout = 1
)

These chosen constants are in strict order, so we can use them to check which channel was selected. Warning: do not change the order of these constants; it will break the main loop select.

Variables

var (
	ErrInvalidAcquireTimeout  = errors.New("invalid acquire timeout")
	ErrInvalidCleanupInterval = errors.New("invalid cleanup interval")
	ErrInvalidConnection      = errors.New("invalid connection")
	ErrInvalidDialFunc        = errors.New("invalid dial func")
	ErrDialFailed             = errors.New("dial failed")
	ErrInvalidMaxConcurrency  = errors.New("invalid max concurrency")
	ErrInvalidMaxIdleTime     = errors.New("invalid max idle time")
	ErrInvalidMaxLifetime     = errors.New("invalid max lifetime")
	ErrMaxConnectionsReached  = errors.New("max connections reached")
	ErrAlreadyClosed          = errors.New("pool already isClosed")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	AcquireTimeout     time.Duration `mapstructure:"acquire_timeout" default:"50ms"`
	CleanupInterval    time.Duration `mapstructure:"cleanup_interval" default:"5s"`
	MaxConcurrency     uint          `mapstructure:"max_concurrency" default:"1000"`
	MaxConnections     uint          `mapstructure:"max_connections" default:"0"`
	MaxIdleConnections uint          `mapstructure:"max_idle_connections" default:"0"`
	MaxIdleTime        time.Duration `mapstructure:"max_idle_time" default:"60s"`
	MaxLifetime        time.Duration `mapstructure:"max_lifetime" default:"30m"`
}

Config is compatible with viper config and mapstructure

It supports default values in struct tags, so you can use it with https://github.com/mcuadros/go-defaults
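To make the role of the default tags concrete, here is a minimal sketch of how such tags could be applied with reflection. It is not how go-defaults is implemented. applyDefaults is a hypothetical helper that handles only the two field types used here, and Config is a local copy of the documented struct.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"time"
)

// Config is a local copy of the documented struct, including its tags.
type Config struct {
	AcquireTimeout     time.Duration `mapstructure:"acquire_timeout" default:"50ms"`
	CleanupInterval    time.Duration `mapstructure:"cleanup_interval" default:"5s"`
	MaxConcurrency     uint          `mapstructure:"max_concurrency" default:"1000"`
	MaxConnections     uint          `mapstructure:"max_connections" default:"0"`
	MaxIdleConnections uint          `mapstructure:"max_idle_connections" default:"0"`
	MaxIdleTime        time.Duration `mapstructure:"max_idle_time" default:"60s"`
	MaxLifetime        time.Duration `mapstructure:"max_lifetime" default:"30m"`
}

// applyDefaults fills zero-valued fields from their `default` tag.
// Hypothetical helper: it supports only time.Duration and uint fields.
func applyDefaults(ptr any) error {
	v := reflect.ValueOf(ptr)
	if v.Kind() != reflect.Pointer || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("applyDefaults: want pointer to struct, got %T", ptr)
	}
	v = v.Elem()
	t := v.Type()
	durationType := reflect.TypeOf(time.Duration(0))

	for i := 0; i < t.NumField(); i++ {
		tag, ok := t.Field(i).Tag.Lookup("default")
		if !ok {
			continue
		}
		f := v.Field(i)
		if !f.IsZero() {
			continue // keep values that were set explicitly
		}
		switch {
		case f.Type() == durationType:
			d, err := time.ParseDuration(tag)
			if err != nil {
				return fmt.Errorf("field %s: %w", t.Field(i).Name, err)
			}
			f.SetInt(int64(d))
		case f.Kind() == reflect.Uint:
			n, err := strconv.ParseUint(tag, 10, 64)
			if err != nil {
				return fmt.Errorf("field %s: %w", t.Field(i).Name, err)
			}
			f.SetUint(n)
		}
	}
	return nil
}

func main() {
	var c Config
	if err := applyDefaults(&c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```

In real code you would more likely call go-defaults or viper directly and then pass the result of Config.Options() to New together with your dial function.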

func (*Config) Options

func (c *Config) Options() []Option

Options returns the options that correspond to the given config.

type ConnStats

type ConnStats struct {
	Target       string    `json:"target"`
	Created      time.Time `json:"created"`
	Deadline     time.Time `json:"deadline"`
	LastChange   time.Time `json:"last_change"`
	WorkingConns uint      `json:"working_conns"`
	IdleConns    uint      `json:"idle_conns"`
	Usage        uint64    `json:"used"`
}

ConnStats represents pool connection statistics

type DialFunc

type DialFunc func(ctx context.Context, stats *Stats, opts ...grpc.DialOption) (*grpc.ClientConn, error)

DialFunc is a function that dials a gRPC connection. This function is passed as required argument to New.

You need to provide your own implementation of this function.

The stats argument gives the dial function context about already established connections. This is for cases when you need to do more advanced client-side load balancing based on existing connections.
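As an illustration of that load-balancing idea, the sketch below picks the target with the fewest existing connections. It assumes a fixed candidate list and reads only the Target field. pickTarget is a hypothetical helper, and Stats and ConnStats are trimmed local copies of the documented types. A real DialFunc could call such a helper and then dial the returned address with the options it received.

```go
package main

import "fmt"

// ConnStats is a trimmed local copy of the documented type.
type ConnStats struct {
	Target string
}

// Stats is a local copy of the documented type.
type Stats struct {
	Connections []ConnStats
}

// pickTarget returns the candidate with the fewest established connections.
// Ties go to the earliest candidate; an empty candidate list yields "".
func pickTarget(stats *Stats, candidates []string) string {
	counts := make(map[string]int, len(candidates))
	if stats != nil {
		for _, c := range stats.Connections {
			counts[c.Target]++
		}
	}

	best := ""
	bestCount := -1
	for _, target := range candidates {
		if n := counts[target]; bestCount == -1 || n < bestCount {
			best, bestCount = target, n
		}
	}
	return best
}

func main() {
	stats := &Stats{Connections: []ConnStats{
		{Target: "10.0.0.1:50051"},
		{Target: "10.0.0.1:50051"},
		{Target: "10.0.0.2:50051"},
	}}
	candidates := []string{"10.0.0.1:50051", "10.0.0.2:50051", "10.0.0.3:50051"}
	fmt.Println(pickTarget(stats, candidates))
}
```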

func StaticHostDialFunc

func StaticHostDialFunc(address string, options ...grpc.DialOption) DialFunc

StaticHostDialFunc returns DialFunc that always connects to the same host.

type Logger

type Logger interface {
	// Log logs a message, this is useful for debugging purposes
	Log(ctx context.Context, msg string)
}

Logger interface for logging

type LoggerFunc

type LoggerFunc func(ctx context.Context, msg string)

LoggerFunc eases the creation of custom loggers

func (LoggerFunc) Log

func (f LoggerFunc) Log(ctx context.Context, msg string)

Log implements the Logger interface
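The adapter pattern behind LoggerFunc can be sketched as follows. Logger and LoggerFunc are local copies of the documented types so that the block compiles on its own, while newCollector and emit are hypothetical helpers used only for the demonstration.

```go
package main

import (
	"context"
	"fmt"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Log(ctx context.Context, msg string)
}

// LoggerFunc is a local copy of the documented adapter type.
type LoggerFunc func(ctx context.Context, msg string)

// Log implements Logger by calling the function itself.
func (f LoggerFunc) Log(ctx context.Context, msg string) { f(ctx, msg) }

// newCollector returns a Logger that appends every message to sink.
// It is not safe for concurrent use; it only demonstrates the adapter.
func newCollector(sink *[]string) Logger {
	return LoggerFunc(func(_ context.Context, msg string) {
		*sink = append(*sink, msg)
	})
}

// emit logs one message with a background context (hypothetical helper).
func emit(l Logger, msg string) {
	l.Log(context.Background(), msg)
}

func main() {
	var messages []string
	logger := newCollector(&messages)
	emit(logger, "dialing new connection")
	fmt.Println(messages)
}
```

With the real package, a value built this way would be passed to WithLogger.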

type Option

type Option func(*options) error

Option is a function that can be passed to New to configure the pool.
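The signature func(*options) error is the functional options pattern with validation. The sketch below shows how such options could be applied. The options fields, the lower-case constructors, the apply helper, and the rule that a zero concurrency is rejected are all assumptions made for illustration, not the package's actual internals.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a hypothetical stand-in for the pool's unexported settings.
type options struct {
	maxConcurrency uint
	maxConnections uint
}

// Option mirrors the documented signature.
type Option func(*options) error

var errInvalidMaxConcurrency = errors.New("invalid max concurrency")

// withMaxConcurrency rejects zero (an assumed validation rule).
func withMaxConcurrency(max uint) Option {
	return func(o *options) error {
		if max == 0 {
			return errInvalidMaxConcurrency
		}
		o.maxConcurrency = max
		return nil
	}
}

// withMaxConnections accepts any value; zero is treated here as "no limit".
func withMaxConnections(max uint) Option {
	return func(o *options) error {
		o.maxConnections = max
		return nil
	}
}

// apply starts from defaults and stops at the first option that fails.
func apply(opts ...Option) (*options, error) {
	o := &options{maxConcurrency: 1000}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := apply(withMaxConcurrency(100), withMaxConnections(20))
	fmt.Println(o, err)

	_, err = apply(withMaxConcurrency(0))
	fmt.Println(err)
}
```

Returning an error from each option lets the constructor report invalid settings at construction time instead of failing later at runtime.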

func WithAcquireTimeout

func WithAcquireTimeout(timeout time.Duration) Option

WithAcquireTimeout sets the timeout for acquiring a connection from the pool before retrying. Warning! This is a very sensitive value, so please be careful when changing it. It should be very low, preferably under 100ms. Set it to a higher value only if you know what you are doing!

func WithCleanupInterval

func WithCleanupInterval(interval time.Duration) Option

WithCleanupInterval sets the interval for cleaning up idle connections.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger for the pool.

func WithMaxConcurrency

func WithMaxConcurrency(max uint) Option

WithMaxConcurrency sets the maximum number of concurrent method calls on single connection.

func WithMaxConnections

func WithMaxConnections(max uint) Option

WithMaxConnections sets the maximum number of connections. This value is optional.

func WithMaxIdleConnections

func WithMaxIdleConnections(max uint) Option

WithMaxIdleConnections sets the maximum number of idle connections. This value is optional.

func WithMaxIdleTime

func WithMaxIdleTime(max time.Duration) Option

WithMaxIdleTime sets the maximum idle time of a connection. It is necessary to set this option.

func WithMaxLifetime

func WithMaxLifetime(max time.Duration) Option

WithMaxLifetime sets the maximum lifetime of a connection. It is necessary to set this option to a value lower than zero.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool implementation

func New

func New(dialFunc DialFunc, opts ...Option) (*Pool, error)

New creates a new pool of gRPC connections.

DialFunc is required because the pool cannot work without it. Options can be passed to configure the pool.

func (*Pool) Acquire

func (p *Pool) Acquire(ctx context.Context) (*grpc.ClientConn, error)

Acquire acquires single connection from the pool. It checks if there is any connection available, and if not, it will dial new connection.

It uses unsafe, but uses it in a very safe way. It is safe to use it in concurrent environment.

Do not forget to Release the connection when you are done with it. Otherwise, the pool will leak connections.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close closes the pool, connections and other background resources.

After pool is closed, you cannot do anything with it.

func (*Pool) Forget

func (p *Pool) Forget(cc *grpc.ClientConn) error

Forget directly removes connection from the pool.

Warning! This method should only be used when you want the connection to be closed as soon as possible. For usual use cases, use the Release method.

After calling this method, you don't need to call Release.

func (*Pool) Release

func (p *Pool) Release(conn *grpc.ClientConn) error

Release returns a connection to the pool.

It also updates necessary information about the connection (stats, last used time, etc.).

func (*Pool) Stats

func (p *Pool) Stats() *Stats

Stats returns stats of the pool. It's safe to call this method from multiple goroutines. There is a corner case in which this method can take some time to return: when the pool is dialing a new connection.

type Stats

type Stats struct {
	Connections []ConnStats `json:"connections"`
}

Stats represents pool statistics
