command

package
v0.33.1
This package is not in the latest version of its module.

Published: Mar 13, 2024 License: Apache-2.0 Imports: 22 Imported by: 1

Documentation

Constants

const (
	MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count"
	MetricCommandLimiterWorkTimer          = "command.limiter.work_timer"

	DefaultLimiterSize = 100
	MinLimiterSize     = 10

	DefaultAdaptiveRateLimiterEnabled       = true
	DefaultAdaptiveRateLimiterMinWindowSize = 5
	DefaultAdaptiveRateLimiterMaxWindowSize = 250
	DefaultAdaptiveRateLimiterTimeout       = 30 * time.Second
)

Variables

This section is empty.

Functions

func LoadAdaptiveRateLimiterConfig added in v0.33.0

func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error

func WasRateLimited added in v0.32.2

func WasRateLimited(err error) bool

Types

type AdaptiveRateLimitTracker added in v0.33.0

type AdaptiveRateLimitTracker interface {
	RunRateLimited(label string) (RateLimitControl, error)
	RunRateLimitedF(label string, f func(control RateLimitControl) error) error
	IsRateLimited() bool
}

An AdaptiveRateLimitTracker works similarly to an AdaptiveRateLimiter, except that it only manages the rate limiting and does not run the work itself. Because it doesn't run the work, it has to account for the possibility that some work may never be reported as complete or failed. It therefore has a configurable timeout, after which outstanding work is marked as failed.
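
The timeout behaviour described above can be sketched with a small, self-contained model. This is not the package's implementation: `toyTracker`, its methods, and the fixed slot count are hypothetical names and simplifications introduced only for illustration, and the clock is passed in explicitly so the example does not need to sleep. The 30-second timeout mirrors DefaultAdaptiveRateLimiterTimeout.

```go
package main

import (
	"fmt"
	"time"
)

// toyTracker is a hypothetical, single-goroutine model of the idea behind
// AdaptiveRateLimitTracker: it hands out work slots and reclaims any slot
// whose work has not reported back before the timeout.
type toyTracker struct {
	timeout     time.Duration
	maxSlots    int
	outstanding map[int]time.Time // slot id -> time the work started
	nextID      int
}

func newToyTracker(maxSlots int, timeout time.Duration) *toyTracker {
	return &toyTracker{
		timeout:     timeout,
		maxSlots:    maxSlots,
		outstanding: map[int]time.Time{},
	}
}

// start reserves a slot, or reports false if every slot is in use.
func (t *toyTracker) start(now time.Time) (int, bool) {
	if len(t.outstanding) >= t.maxSlots {
		return 0, false
	}
	t.nextID++
	t.outstanding[t.nextID] = now
	return t.nextID, true
}

// done releases a slot when the caller reports that the work finished.
func (t *toyTracker) done(id int) { delete(t.outstanding, id) }

// expire treats work older than the timeout as failed and frees its slot.
// It returns the number of slots reclaimed.
func (t *toyTracker) expire(now time.Time) int {
	reclaimed := 0
	for id, started := range t.outstanding {
		if now.Sub(started) >= t.timeout {
			delete(t.outstanding, id)
			reclaimed++
		}
	}
	return reclaimed
}

// demoTracker fills both slots, completes one, and lets the other time out.
func demoTracker() (thirdAccepted bool, reclaimed int) {
	start := time.Unix(0, 0)
	tr := newToyTracker(2, 30*time.Second)
	first, _ := tr.start(start)
	_, _ = tr.start(start) // this work never reports back
	_, thirdAccepted = tr.start(start)
	tr.done(first)
	reclaimed = tr.expire(start.Add(31 * time.Second))
	return thirdAccepted, reclaimed
}

func main() {
	accepted, reclaimed := demoTracker()
	fmt.Println("third accepted:", accepted) // third accepted: false
	fmt.Println("reclaimed:", reclaimed)     // reclaimed: 1
}
```

Without the expiry step, the slot held by the work that never reported back would be lost permanently, which is the situation the Timeout setting is meant to prevent.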

func NewAdaptiveRateLimitTracker added in v0.33.0

func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) AdaptiveRateLimitTracker

type AdaptiveRateLimiter added in v0.32.0

type AdaptiveRateLimiter interface {
	RunRateLimited(f func() error) (RateLimitControl, error)
}

An AdaptiveRateLimiter allows running arbitrary, sequential operations with a limiter, so that only N operations can be queued to run at any given time. If the system is too busy, the rate limiter will return an ApiError indicating that the server is too busy.

The rate limiter returns a RateLimitControl, allowing the calling code to indicate whether the operation finished in time. If operations are timing out before the results are available, the rate limiter should allow fewer operations in, as they will likely time out before the results can be used.

The rate limiter doesn't have a set queue size. Instead, it has a window which can grow and shrink. When a timeout is signaled using the RateLimitControl, it shrinks the window based on the queue position of the timed-out operation. For example, if an operation was queued at position 200 but then times out, we assume that we need to limit the queue size to something less than 200 for now.

The limiter will also reject already queued operations if the window size changes and the operation was queued at a position larger than the current window size.

The window size will slowly grow back towards the max as successes are noted in the RateLimitControl.
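
The shrink-and-grow behaviour described above can be modelled in a few lines. This is a simplified sketch, not the package's algorithm: the `window` type and its methods are hypothetical, and both the shrink target (one below the timed-out position) and the growth step (one slot per success) are assumptions chosen for illustration. The bounds reuse DefaultAdaptiveRateLimiterMinWindowSize and DefaultAdaptiveRateLimiterMaxWindowSize.

```go
package main

import "fmt"

// window is a hypothetical model of the adaptive window.
type window struct {
	min, max, size uint32
}

// backoff shrinks the window to just below the queue position of the
// operation that timed out, but never below min.
func (w *window) backoff(queuePos uint32) {
	if queuePos > w.size {
		return // the window is already smaller than this position
	}
	next := w.min
	if queuePos > w.min {
		next = queuePos - 1
	}
	w.size = next
}

// success grows the window by one slot, up to max (assumed growth step).
func (w *window) success() {
	if w.size < w.max {
		w.size++
	}
}

// admits reports whether an operation queued at queuePos still fits in the
// current window; operations beyond it would be rejected.
func (w *window) admits(queuePos uint32) bool {
	return queuePos <= w.size
}

func main() {
	w := window{min: 5, max: 250, size: 250}

	w.backoff(200)              // an operation queued at position 200 timed out
	fmt.Println(w.size)         // 199
	fmt.Println(w.admits(220))  // false: queued beyond the shrunken window

	for i := 0; i < 3; i++ {
		w.success()
	}
	fmt.Println(w.size) // 202
}
```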

func NewAdaptiveRateLimiter added in v0.32.0

func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) AdaptiveRateLimiter

type AdaptiveRateLimiterConfig added in v0.32.0

type AdaptiveRateLimiterConfig struct {
	// Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting
	Enabled bool

	// MaxSize - the maximum window size to allow
	MaxSize uint32

	// MinSize - the smallest window size to allow
	MinSize uint32

	// WorkTimerMetric - the name of the timer metric for timing how long operations take to execute
	WorkTimerMetric string

	// QueueSizeMetric - the name of the gauge metric showing the current number of operations queued
	QueueSizeMetric string

	// WindowSizeMetric - the name of the metric showing the current window size
	WindowSizeMetric string

	// Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to
	//           have failed if it hasn't been marked completed yet, so that work slots aren't lost
	Timeout time.Duration
}

AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter

func (*AdaptiveRateLimiterConfig) SetDefaults added in v0.33.0

func (self *AdaptiveRateLimiterConfig) SetDefaults()

type Command

type Command interface {
	// Apply runs the command
	Apply(ctx boltz.MutateContext) error

	// GetChangeContext returns the change context associated with the command
	GetChangeContext() *change.Context

	// Encode returns a serialized representation of the command
	Encode() ([]byte, error)
}

Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller to another for Raft coordination.

type CreateEntityCommand

type CreateEntityCommand[T models.Entity] struct {
	Context        *change.Context
	Creator        EntityCreator[T]
	Entity         T
	PostCreateHook func(ctx boltz.MutateContext, entity T) error
	Flags          uint32
}

func (*CreateEntityCommand[T]) Apply

func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*CreateEntityCommand[T]) Encode

func (self *CreateEntityCommand[T]) Encode() ([]byte, error)

func (*CreateEntityCommand[T]) GetChangeContext

func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context

type Decoder

type Decoder interface {
	Decode(commandType int32, data []byte) (Command, error)
}

Decoder instances know how to decode encoded commands

type DecoderF

type DecoderF func(commandType int32, data []byte) (Command, error)

DecoderF is a function version of the Decoder interface

func (DecoderF) Decode

func (self DecoderF) Decode(commandType int32, data []byte) (Command, error)
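
DecoderF follows the same function-adapter pattern as `http.HandlerFunc` in the standard library: a plain function is converted to a type whose method satisfies the interface. The sketch below redeclares trimmed-down local copies of the types so that it compiles on its own. The body of `Decode`, the `echoCommand` type, and the `echoType` id are assumptions made for illustration, not taken from the package source.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a trimmed-down stand-in for the package's Command interface.
type Command interface {
	Encode() ([]byte, error)
}

// Decoder and DecoderF mirror the declarations documented above.
type Decoder interface {
	Decode(commandType int32, data []byte) (Command, error)
}

type DecoderF func(commandType int32, data []byte) (Command, error)

// Decode calls the underlying function (assumed implementation).
func (f DecoderF) Decode(commandType int32, data []byte) (Command, error) {
	return f(commandType, data)
}

// echoCommand is a hypothetical command that just carries its payload.
type echoCommand struct{ payload []byte }

func (c echoCommand) Encode() ([]byte, error) { return c.payload, nil }

const echoType int32 = 1 // hypothetical command type id

// decodeEcho is an ordinary function with the DecoderF signature.
func decodeEcho(commandType int32, data []byte) (Command, error) {
	if commandType != echoType {
		return nil, errors.New("unexpected command type")
	}
	return echoCommand{payload: data}, nil
}

func main() {
	// The conversion DecoderF(decodeEcho) turns the function into a Decoder.
	var d Decoder = DecoderF(decodeEcho)

	cmd, err := d.Decode(echoType, []byte("hi"))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	data, _ := cmd.Encode()
	fmt.Printf("%s\n", data) // hi
}
```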

type Decoders

type Decoders interface {
	Register(id int32, decoder Decoder)
	RegisterF(id int32, decoder DecoderF)
	Decode(data []byte) (Command, error)
	GetDecoder(id int32) Decoder
	Clear()
}

func GetDefaultDecoders

func GetDefaultDecoders() Decoders

func NewDecoders

func NewDecoders() Decoders

type DefaultRateLimiter added in v0.31.0

type DefaultRateLimiter struct {
	// contains filtered or unexported fields
}

func (*DefaultRateLimiter) GetQueueFillPct added in v0.33.0

func (self *DefaultRateLimiter) GetQueueFillPct() float64

func (*DefaultRateLimiter) RunRateLimited added in v0.31.0

func (self *DefaultRateLimiter) RunRateLimited(f func() error) error

type DeleteEntityCommand

type DeleteEntityCommand struct {
	Context *change.Context
	Deleter EntityDeleter
	Id      string
}

func (*DeleteEntityCommand) Apply

func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error

func (*DeleteEntityCommand) Encode

func (self *DeleteEntityCommand) Encode() ([]byte, error)

func (*DeleteEntityCommand) GetChangeContext

func (self *DeleteEntityCommand) GetChangeContext() *change.Context

type Dispatcher

type Dispatcher interface {
	Dispatch(command Command) error
	IsLeaderOrLeaderless() bool
	GetPeers() map[string]channel.Channel
	GetRateLimiter() RateLimiter
}

Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally
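
The routing decision described above can be sketched as follows. This is an illustrative model only: `toyDispatcher`, the `forwardToLeader` hook, and the simplified `command` interface (whose Apply takes no context) are hypothetical and do not reflect how the package transports commands between controllers.

```go
package main

import "fmt"

// command is a simplified stand-in for the package's Command interface.
type command interface {
	Apply() error
}

// toyDispatcher applies commands locally when it is the leader and forwards
// them otherwise.
type toyDispatcher struct {
	leader          bool
	forwardToLeader func(cmd command) error // hypothetical transport hook
}

func (d *toyDispatcher) Dispatch(cmd command) error {
	if d.leader {
		return cmd.Apply()
	}
	return d.forwardToLeader(cmd)
}

// counter is a hypothetical command that records how often it was applied.
type counter struct{ applied int }

func (c *counter) Apply() error {
	c.applied++
	return nil
}

// demoDispatch dispatches the same command through a leader and a follower.
func demoDispatch() (localApplied, forwarded int) {
	cmd := &counter{}

	leader := &toyDispatcher{leader: true}
	_ = leader.Dispatch(cmd)
	localApplied = cmd.applied

	follower := &toyDispatcher{
		forwardToLeader: func(command) error {
			forwarded++
			return nil
		},
	}
	_ = follower.Dispatch(cmd)
	return localApplied, forwarded
}

func main() {
	local, forwarded := demoDispatch()
	fmt.Println("applied locally:", local)   // applied locally: 1
	fmt.Println("forwarded:", forwarded)     // forwarded: 1
}
```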

type EntityCreator

type EntityCreator[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyCreate creates the entity described by the given command
	ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityCreator instances can apply a create entity command to create entities of a given type

type EntityDeleter

type EntityDeleter interface {
	GetEntityTypeId() string

	// ApplyDelete deletes the entity described by the given command
	ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error
}

EntityDeleter instances can apply a delete entity command to delete entities of a given type

type EntityManager

type EntityManager[T models.Entity] interface {
	EntityCreator[T]
	EntityUpdater[T]
	EntityDeleter
}

EntityManager instances can create, update, and delete entities of a specific type

type EntityMarshaller

type EntityMarshaller[T any] interface {
	// GetEntityTypeId returns the entity type id. This is distinct from the Store entity id
	// which may be shared by types, such as service and router. The entity type is unique
	// for each type
	GetEntityTypeId() string

	// Marshall marshals the entity to a byte-encoded format
	Marshall(entity T) ([]byte, error)

	// Unmarshall unmarshals the bytes back into an entity
	Unmarshall(bytes []byte) (T, error)
}

EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type

type EntityUpdater

type EntityUpdater[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyUpdate updates the entity described by the given command
	ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityUpdater instances can apply an update entity command to update entities of a given type

type LocalDispatcher

type LocalDispatcher struct {
	EncodeDecodeCommands bool
	Limiter              RateLimiter
}

LocalDispatcher should be used when running a non-clustered system

func (*LocalDispatcher) Dispatch

func (self *LocalDispatcher) Dispatch(command Command) error

func (*LocalDispatcher) GetPeers

func (self *LocalDispatcher) GetPeers() map[string]channel.Channel

func (*LocalDispatcher) GetRateLimiter added in v0.33.0

func (self *LocalDispatcher) GetRateLimiter() RateLimiter

func (*LocalDispatcher) IsLeaderOrLeaderless

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool

type NoOpAdaptiveRateLimitTracker added in v0.33.0

type NoOpAdaptiveRateLimitTracker struct{}

func (NoOpAdaptiveRateLimitTracker) IsRateLimited added in v0.33.0

func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool

func (NoOpAdaptiveRateLimitTracker) RunRateLimited added in v0.33.0

func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF added in v0.33.0

func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control RateLimitControl) error) error

type NoOpAdaptiveRateLimiter added in v0.32.0

type NoOpAdaptiveRateLimiter struct{}

func (NoOpAdaptiveRateLimiter) RunRateLimited added in v0.32.0

func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (RateLimitControl, error)

type NoOpRateLimiter added in v0.31.0

type NoOpRateLimiter struct{}

func (NoOpRateLimiter) GetQueueFillPct added in v0.33.0

func (self NoOpRateLimiter) GetQueueFillPct() float64

func (NoOpRateLimiter) RunRateLimited added in v0.31.0

func (self NoOpRateLimiter) RunRateLimited(f func() error) error

type RateLimitControl added in v0.32.0

type RateLimitControl interface {
	// Success indicates the operation was a success
	Success()

	// Backoff indicates that we need to back off
	Backoff()

	// Failed indicates the operation was not a success, but a backoff isn't required
	Failed()
}

func NoOpRateLimitControl added in v0.33.0

func NoOpRateLimitControl() RateLimitControl

type RateLimiter added in v0.31.0

type RateLimiter interface {
	RunRateLimited(func() error) error
	GetQueueFillPct() float64
}

A RateLimiter allows running arbitrary, sequential operations with a limiter, so that only N operations can be queued to run at any given time. If the system is too busy, the rate limiter will return an ApiError indicating that the server is too busy
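
A bounded queue with a single worker is one way to get the behaviour described above. The sketch below is an assumption-laden illustration rather than the package's implementation: `toyLimiter`, `job`, and `errTooBusy` (standing in for the ApiError) are hypothetical names, and the worker goroutine is never shut down because the example omits the closeNotify channel.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// errTooBusy stands in for the ApiError returned when the queue is full.
var errTooBusy = errors.New("server too busy")

type job struct {
	f      func() error
	result chan error
}

// toyLimiter runs queued operations one at a time on a single worker.
type toyLimiter struct {
	queue chan job
}

func newToyLimiter(size int) *toyLimiter {
	l := &toyLimiter{queue: make(chan job, size)}
	go func() {
		for j := range l.queue {
			j.result <- j.f() // operations run sequentially
		}
	}()
	return l
}

// RunRateLimited queues f and waits for its result, or fails immediately
// when the queue has no free slot.
func (l *toyLimiter) RunRateLimited(f func() error) error {
	j := job{f: f, result: make(chan error, 1)}
	select {
	case l.queue <- j:
		return <-j.result
	default:
		return errTooBusy
	}
}

func (l *toyLimiter) GetQueueFillPct() float64 {
	return float64(len(l.queue)) / float64(cap(l.queue))
}

// demoLimiter blocks the worker, fills the one queue slot, then submits a
// third operation, which should be rejected.
func demoLimiter() (rejected bool, fill float64) {
	l := newToyLimiter(1)
	started := make(chan struct{})
	release := make(chan struct{})
	done := make(chan error, 2)

	go func() {
		done <- l.RunRateLimited(func() error { close(started); <-release; return nil })
	}()
	<-started // the worker is now busy with the first operation

	go func() { done <- l.RunRateLimited(func() error { return nil }) }()
	for l.GetQueueFillPct() < 1 {
		runtime.Gosched() // wait until the second operation is queued
	}

	fill = l.GetQueueFillPct()
	rejected = errors.Is(l.RunRateLimited(func() error { return nil }), errTooBusy)

	close(release)
	<-done
	<-done
	return rejected, fill
}

func main() {
	rejected, fill := demoLimiter()
	fmt.Println("rejected:", rejected) // rejected: true
	fmt.Println("fill:", fill)         // fill: 1
}
```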

func NewRateLimiter added in v0.31.0

func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) RateLimiter

type RateLimiterConfig added in v0.31.0

type RateLimiterConfig struct {
	Enabled   bool
	QueueSize uint32
}

type SyncSnapshotCommand

type SyncSnapshotCommand struct {
	SnapshotId   string
	Snapshot     []byte
	SnapshotSink func(cmd *SyncSnapshotCommand) error
}

func (*SyncSnapshotCommand) Apply

func (*SyncSnapshotCommand) Encode

func (self *SyncSnapshotCommand) Encode() ([]byte, error)

func (*SyncSnapshotCommand) GetChangeContext

func (self *SyncSnapshotCommand) GetChangeContext() *change.Context

type UpdateEntityCommand

type UpdateEntityCommand[T models.Entity] struct {
	Context       *change.Context
	Updater       EntityUpdater[T]
	Entity        T
	UpdatedFields fields.UpdatedFields
	Flags         uint32
}

func (*UpdateEntityCommand[T]) Apply

func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*UpdateEntityCommand[T]) Encode

func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)

func (*UpdateEntityCommand[T]) GetChangeContext

func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context

type Validatable

type Validatable interface {
	Validate() error
}

Validatable instances can be validated. Command instances which implement Validatable will be validated before Command.Apply is called
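
The validate-before-apply check can be expressed with a type assertion. In this sketch, `applier` is a simplified stand-in for Command (the real Apply takes a boltz.MutateContext), and `validateThenApply` and `renameCommand` are hypothetical names used only for illustration. Where the package actually performs this check is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Validatable mirrors the interface documented above.
type Validatable interface {
	Validate() error
}

// applier is a simplified stand-in for the package's Command interface.
type applier interface {
	Apply() error
}

// validateThenApply validates the command if it opts in by implementing
// Validatable, and only applies it when validation passes.
func validateThenApply(cmd applier) error {
	if v, ok := cmd.(Validatable); ok {
		if err := v.Validate(); err != nil {
			return err
		}
	}
	return cmd.Apply()
}

// renameCommand is a hypothetical command that requires a non-empty name.
type renameCommand struct {
	name    string
	applied bool
}

func (c *renameCommand) Validate() error {
	if c.name == "" {
		return errors.New("name is required")
	}
	return nil
}

func (c *renameCommand) Apply() error {
	c.applied = true
	return nil
}

func main() {
	bad := &renameCommand{}
	fmt.Println(validateThenApply(bad), bad.applied) // name is required false

	good := &renameCommand{name: "edge-router-1"}
	fmt.Println(validateThenApply(good), good.applied) // <nil> true
}
```

Commands that do not implement Validatable skip the check and are applied directly.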
