pools

package
v0.18.3
Published: Mar 14, 2024 License: Apache-2.0 Imports: 13 Imported by: 19

Documentation

Overview

Package pools provides functionality to manage and reuse resources like connections.

Constants

This section is empty.

Variables

var (
	// ErrClosed is returned if ResourcePool is used when it's closed.
	ErrClosed = errors.New("resource pool is closed")

	// ErrTimeout is returned if a resource get times out.
	ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "resource pool timed out")

	// ErrCtxTimeout is returned if a ctx is already expired by the time the resource pool is used
	ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "resource pool context already expired")
)

Functions

This section is empty.

Types

type Factory

type Factory func(context.Context) (Resource, error)

Factory is a function that can be used to create a resource.

type IDPool

type IDPool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

IDPool is used to ensure that the set of IDs in use concurrently never contains any duplicates. The IDs start at 1 and increase without bound, but will never be larger than the peak number of concurrent uses.

IDPool's Get() and Put() methods can be used concurrently.

func NewIDPool

func NewIDPool(initValue uint32) *IDPool

NewIDPool creates and initializes an IDPool.

func (*IDPool) Get

func (pool *IDPool) Get() (id uint32)

Get returns an ID that is unique among currently active users of this pool.

func (*IDPool) Put

func (pool *IDPool) Put(id uint32)

Put recycles an ID back into the pool for others to use. Putting back a value of 0, or a value that is not currently "checked out", will result in a panic because that should never happen except in the case of a programming error.
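The Get/Put contract described above can be illustrated with a small stand-in. This is a sketch, not the Vitess implementation: `idPool`, `newIDPool`, and the fields are hypothetical names, and the sketch assumes that recycled IDs are handed out before new ones are minted, which is one way to keep IDs no larger than the peak number of concurrent uses.

```go
package main

import (
	"fmt"
	"sync"
)

// idPool is a hypothetical stand-in for pools.IDPool, written only to
// illustrate the documented Get/Put contract.
type idPool struct {
	mu      sync.Mutex
	used    map[uint32]bool // IDs currently checked out
	free    []uint32        // recycled IDs waiting to be reused
	maxUsed uint32          // highest ID handed out so far
}

func newIDPool(initValue uint32) *idPool {
	return &idPool{used: make(map[uint32]bool), maxUsed: initValue}
}

// get prefers a recycled ID and mints a new one only when none is free.
func (p *idPool) get() uint32 {
	p.mu.Lock()
	defer p.mu.Unlock()
	var id uint32
	if n := len(p.free); n > 0 {
		id = p.free[n-1]
		p.free = p.free[:n-1]
	} else {
		p.maxUsed++
		id = p.maxUsed
	}
	p.used[id] = true
	return id
}

// put panics on 0 or on an ID that is not currently checked out.
func (p *idPool) put(id uint32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if id == 0 || !p.used[id] {
		panic(fmt.Sprintf("idPool.put(%d): invalid value", id))
	}
	delete(p.used, id)
	p.free = append(p.free, id)
}

func main() {
	p := newIDPool(0)
	a, b := p.get(), p.get()
	p.put(a)
	c := p.get() // reuses the ID that was just returned
	fmt.Println(a, b, c) // prints: 1 2 1
}
```

With two concurrent users the sketch never hands out an ID above 2, no matter how many Get/Put cycles occur.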

type IResourcePool added in v0.15.0

type IResourcePool interface {
	Close()
	Name() string
	Get(ctx context.Context, setting *Setting) (resource Resource, err error)
	Put(resource Resource)
	SetCapacity(capacity int) error
	SetIdleTimeout(idleTimeout time.Duration)
	StatsJSON() string
	Capacity() int64
	Available() int64
	Active() int64
	InUse() int64
	MaxCap() int64
	WaitCount() int64
	WaitTime() time.Duration
	IdleTimeout() time.Duration
	IdleClosed() int64
	MaxLifetimeClosed() int64
	Exhausted() int64
	GetCount() int64
	GetSettingCount() int64
	DiffSettingCount() int64
	ResetSettingCount() int64
}

type Numbered

type Numbered struct {
	// contains filtered or unexported fields
}

Numbered allows you to manage resources by tracking them with numbers. There are no interface restrictions on what you can track.

func NewNumbered

func NewNumbered() *Numbered

NewNumbered creates a new Numbered.

func (*Numbered) Get

func (nu *Numbered) Get(id int64, purpose string) (val any, err error)

Get locks the resource for use. It accepts a purpose as a string. If the resource cannot be found, it returns a "not found" error. If the resource is in use, it returns an "in use: purpose" error.

func (*Numbered) GetAll

func (nu *Numbered) GetAll() (vals []any)

GetAll returns the list of all resources in the pool.

func (*Numbered) GetByFilter added in v0.9.0

func (nu *Numbered) GetByFilter(purpose string, match func(val any) bool) (vals []any)

GetByFilter returns a list of resources that match the filter. It does not return any resources that are already locked.

func (*Numbered) Put

func (nu *Numbered) Put(id int64) bool

Put unlocks a resource for someone else to use.

func (*Numbered) Register

func (nu *Numbered) Register(id int64, val any) error

Register starts tracking a resource by the supplied id. It does not lock the object. It returns an error if the id already exists.

func (*Numbered) Size

func (nu *Numbered) Size() int64

Size returns the current size

func (*Numbered) StatsJSON

func (nu *Numbered) StatsJSON() string

StatsJSON returns stats in JSON format

func (*Numbered) Unregister

func (nu *Numbered) Unregister(id int64, reason string)

Unregister forgets the specified resource. If the resource is not present, it's ignored.

func (*Numbered) WaitForEmpty

func (nu *Numbered) WaitForEmpty()

WaitForEmpty returns as soon as the pool becomes empty
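The Register, Get, Put, and Unregister lifecycle can be sketched with a mutex-guarded map. The code below is a simplified stand-in rather than the Vitess type: `registry`, `entry`, and the exact error strings are assumptions, and the sketch assumes that Put reports whether the id was found, which the documentation above does not state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// entry and registry are hypothetical stand-ins for pools.Numbered.
type entry struct {
	val     any
	inUse   bool
	purpose string
}

type registry struct {
	mu        sync.Mutex
	resources map[int64]*entry
}

func newRegistry() *registry {
	return &registry{resources: make(map[int64]*entry)}
}

// register starts tracking val under id without locking it.
func (r *registry) register(id int64, val any) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.resources[id]; ok {
		return errors.New("already present")
	}
	r.resources[id] = &entry{val: val}
	return nil
}

// get locks the resource for the stated purpose.
func (r *registry) get(id int64, purpose string) (any, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.resources[id]
	if !ok {
		return nil, errors.New("not found")
	}
	if e.inUse {
		return nil, fmt.Errorf("in use: %s", e.purpose)
	}
	e.inUse, e.purpose = true, purpose
	return e.val, nil
}

// put unlocks the resource; the bool result is an assumption of this sketch.
func (r *registry) put(id int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.resources[id]
	if !ok {
		return false
	}
	e.inUse, e.purpose = false, ""
	return true
}

// unregister forgets the resource; a missing id is ignored.
func (r *registry) unregister(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.resources, id)
}

func main() {
	r := newRegistry()
	_ = r.register(1, "conn-1")
	v, err := r.get(1, "query")
	fmt.Println(v, err) // conn-1 <nil>
	_, err = r.get(1, "kill")
	fmt.Println(err) // in use: query
	fmt.Println(r.put(1)) // true
	r.unregister(1)
	_, err = r.get(1, "query")
	fmt.Println(err) // not found
}
```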

type RPCPool added in v0.12.0

type RPCPool struct {
	// contains filtered or unexported fields
}

RPCPool is a specialized version of the ResourcePool, for bounding the number of concurrent RPC calls.

Whether you use this, or a sync2.Semaphore to gate RPCs (or shared access to any shared resource) depends on what timeout semantics you need. A sync2.Semaphore takes a global timeout, which applies to calls to Acquire(); if you need to respect a context's deadline, you can call AcquireContext(context.Context), but that ignores the global timeout. Conversely, an RPCPool provides only one method of acquisition, Acquire(context.Context), which always uses the lower of the pool-global timeout or the context deadline.

func NewRPCPool added in v0.12.0

func NewRPCPool(size int, waitTimeout time.Duration, logWait func(time.Time)) *RPCPool

NewRPCPool returns an RPCPool with the given size and wait timeout. A zero timeout will be ignored on calls to Acquire, and only the context deadline will be used. If a logWait function is provided, it will be called whenever a call to Acquire has to wait for a resource, but only after a successful wait (meaning if we hit a timeout before a resource becomes available, it will not be called).

func (*RPCPool) Acquire added in v0.12.0

func (pool *RPCPool) Acquire(ctx context.Context) error

Acquire acquires one slot in the RPCPool. If a slot is not immediately available, it will block until one becomes available or until a timeout is reached. The lower of the context deadline and the pool's waitTimeout will be used as the timeout, except when the pool's waitTimeout is zero, in which case the context deadline will always serve as the overall timeout.

It returns nil on successful acquisition, and an error if a timeout occurred before a slot became available.

Note: For every successful call to Acquire, the caller must make a corresponding call to Release.

func (*RPCPool) Close added in v0.12.0

func (pool *RPCPool) Close()

Close empties the pool, preventing further Acquire calls from succeeding. It waits for all slots to be freed via Release.

func (*RPCPool) Release added in v0.12.0

func (pool *RPCPool) Release()

Release frees a slot in the pool. It must only be called after a successful call to Acquire.

func (*RPCPool) StatsJSON added in v0.12.0

func (pool *RPCPool) StatsJSON() string

type RefreshCheck added in v0.12.0

type RefreshCheck func() (bool, error)

RefreshCheck is a function used to determine if a resource pool should be refreshed (i.e. closed and reopened)

type Resource

type Resource interface {
	Close()
	Expired(time.Duration) bool
	ApplySetting(ctx context.Context, setting *Setting) error
	IsSettingApplied() bool
	IsSameSetting(setting string) bool
	ResetSetting(ctx context.Context) error
}

Resource defines the interface that every resource must provide. Thread synchronization between Close() and IsClosed() is the responsibility of the caller.

type ResourcePool

type ResourcePool struct {
	// contains filtered or unexported fields
}

ResourcePool allows you to use a pool of resources.

func NewResourcePool

func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, maxLifetime time.Duration, logWait func(time.Time), refreshCheck RefreshCheck, refreshInterval time.Duration) *ResourcePool

NewResourcePool creates a new ResourcePool. capacity is the number of possible resources in the pool: there can be up to 'capacity' of these at a given time. maxCap specifies the extent to which the pool can be resized in the future through the SetCapacity function; you cannot resize the pool beyond maxCap. If a resource is unused beyond idleTimeout, it's replaced with a new one. An idleTimeout of 0 means that there is no idle timeout. A maxLifetime of 0 means that resources have no maximum lifetime. A non-zero value of prefillParallelism causes the pool to be pre-filled. The value specifies how many resources can be opened in parallel. refreshCheck is a function consulted at refreshInterval intervals to determine if the pool should be drained and reopened.

func (*ResourcePool) Active

func (rp *ResourcePool) Active() int64

Active returns the number of active (i.e. non-nil) resources either in the pool or claimed for use

func (*ResourcePool) Available

func (rp *ResourcePool) Available() int64

Available returns the number of currently unused and available resources.

func (*ResourcePool) Capacity

func (rp *ResourcePool) Capacity() int64

Capacity returns the capacity.

func (*ResourcePool) Close

func (rp *ResourcePool) Close()

Close empties the pool calling Close on all its resources. You can call Close while there are outstanding resources. It waits for all resources to be returned (Put). After a Close, Get is not allowed.

func (*ResourcePool) DiffSettingCount added in v0.15.0

func (rp *ResourcePool) DiffSettingCount() int64

DiffSettingCount returns the number of times a different setting was applied on the resource.

func (*ResourcePool) Exhausted

func (rp *ResourcePool) Exhausted() int64

Exhausted returns the number of times Available dropped below 1

func (*ResourcePool) Get

func (rp *ResourcePool) Get(ctx context.Context, setting *Setting) (resource Resource, err error)

Get will return the next available resource. If capacity has not been reached, it will create a new one using the factory. Otherwise, it will wait till the next resource becomes available or a timeout. A timeout of 0 is an indefinite wait.

func (*ResourcePool) GetCount added in v0.15.0

func (rp *ResourcePool) GetCount() int64

GetCount returns the number of times get was called

func (*ResourcePool) GetSettingCount added in v0.15.0

func (rp *ResourcePool) GetSettingCount() int64

GetSettingCount returns the number of times getWithSettings was called

func (*ResourcePool) IdleClosed

func (rp *ResourcePool) IdleClosed() int64

IdleClosed returns the count of resources closed due to idle timeout.

func (*ResourcePool) IdleTimeout

func (rp *ResourcePool) IdleTimeout() time.Duration

IdleTimeout returns the resource idle timeout.

func (*ResourcePool) InUse

func (rp *ResourcePool) InUse() int64

InUse returns the number of claimed resources from the pool

func (*ResourcePool) MaxCap

func (rp *ResourcePool) MaxCap() int64

MaxCap returns the max capacity.

func (*ResourcePool) MaxLifetimeClosed added in v0.16.0

func (rp *ResourcePool) MaxLifetimeClosed() int64

MaxLifetimeClosed returns the count of resources closed due to refresh timeout.

func (*ResourcePool) Name added in v0.15.0

func (rp *ResourcePool) Name() string

func (*ResourcePool) Put

func (rp *ResourcePool) Put(resource Resource)

Put will return a resource to the pool. For every successful Get, a corresponding Put is required. If you no longer need a resource, you will need to call Put(nil) instead of returning the closed resource. This will cause a new resource to be created in its place.

func (*ResourcePool) ResetSettingCount added in v0.15.0

func (rp *ResourcePool) ResetSettingCount() int64

ResetSettingCount returns the number of times settings were reset on the resource.

func (*ResourcePool) SetCapacity

func (rp *ResourcePool) SetCapacity(capacity int) error

SetCapacity changes the capacity of the pool. You can use it to shrink or expand, but not beyond the max capacity. If the change requires the pool to be shrunk, SetCapacity waits till the necessary number of resources are returned to the pool. A SetCapacity of 0 is equivalent to closing the ResourcePool.

func (*ResourcePool) SetIdleTimeout

func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration)

SetIdleTimeout sets the idle timeout. It can only be used if there was an idle timeout set when the pool was created.

func (*ResourcePool) StatsJSON

func (rp *ResourcePool) StatsJSON() string

StatsJSON returns the stats in JSON format.

func (*ResourcePool) WaitCount

func (rp *ResourcePool) WaitCount() int64

WaitCount returns the total number of waits.

func (*ResourcePool) WaitTime

func (rp *ResourcePool) WaitTime() time.Duration

WaitTime returns the total wait time.

type Setting added in v0.15.0

type Setting struct {
	// contains filtered or unexported fields
}

Setting represents a set query and reset query for system settings.

func NewSetting added in v0.15.0

func NewSetting(query, resetQuery string) *Setting

func (*Setting) CachedSize added in v0.15.0

func (cached *Setting) CachedSize(alloc bool) int64

func (*Setting) GetQuery added in v0.15.0

func (s *Setting) GetQuery() string

func (*Setting) GetResetQuery added in v0.15.0

func (s *Setting) GetResetQuery() string
