command

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2024 License: Apache-2.0 Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
// Metric names, limiter sizes and adaptive rate limiter defaults exported by this package.
const (
	// Names of the command limiter metrics (queued count and work timer).
	MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count"
	MetricCommandLimiterWorkTimer          = "command.limiter.work_timer"

	// Default and minimum limiter sizes.
	// NOTE(review): presumably these apply to RateLimiterConfig.QueueSize — confirm against the implementation.
	DefaultLimiterSize = 100
	MinLimiterSize     = 10

	// Defaults for the adaptive rate limiter.
	// NOTE(review): presumably these populate AdaptiveRateLimiterConfig (Enabled, MinSize, MaxSize, Timeout),
	// e.g. via SetDefaults — confirm against the implementation.
	DefaultAdaptiveRateLimiterEnabled       = true
	DefaultAdaptiveRateLimiterMinWindowSize = 5
	DefaultAdaptiveRateLimiterMaxWindowSize = 250
	DefaultAdaptiveRateLimiterTimeout       = 30 * time.Second
)

Variables

This section is empty.

Functions

func LoadAdaptiveRateLimiterConfig added in v0.33.0

func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error

func NewAdaptiveRateLimitTracker added in v0.33.0

func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimitTracker

func NewAdaptiveRateLimiter added in v0.32.0

func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimiter

func NewRateLimiter added in v0.31.0

func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.RateLimiter

func WasRateLimited added in v0.32.2

func WasRateLimited(err error) bool

Types

type AdaptiveRateLimiterConfig added in v0.32.0

type AdaptiveRateLimiterConfig struct {
	// Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting
	Enabled bool

	// MaxSize - the maximum window size to allow
	MaxSize uint32

	// MinSize - the smallest window size to allow
	MinSize uint32

	// WorkTimerMetric - the name of the timer metric for timing how long operations take to execute
	WorkTimerMetric string

	// QueueSizeMetric - the name of the gauge metric showing the current number of operations queued
	QueueSizeMetric string

	// WindowSizeMetric - the name of the metric showing the current window size
	WindowSizeMetric string

	// Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to
	//           have failed if it hasn't been marked completed yet, so that work slots aren't lost
	Timeout time.Duration
}

AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter

func (*AdaptiveRateLimiterConfig) SetDefaults added in v0.33.0

func (self *AdaptiveRateLimiterConfig) SetDefaults()

type Command

type Command interface {
	// Apply runs the command using the given mutate context and returns any resulting error
	Apply(ctx boltz.MutateContext) error

	// GetChangeContext returns the change context associated with the command
	GetChangeContext() *change.Context

	// Encode returns a serialized representation of the command
	Encode() ([]byte, error)
}

Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller to another for RAFT coordination

type CreateEntityCommand

// CreateEntityCommand carries an entity of type T together with the EntityCreator for that type.
// NOTE(review): presumably Apply delegates to Creator.ApplyCreate — confirm against the implementation.
type CreateEntityCommand[T models.Entity] struct {
	// Context - the change context for this command; presumably what GetChangeContext returns
	Context *change.Context
	// Creator - the EntityCreator for entities of type T
	Creator EntityCreator[T]
	// Entity - the entity this command carries; presumably the entity to be created
	Entity T
	// PostCreateHook - callback taking the mutate context and an entity.
	// NOTE(review): presumably invoked after the entity has been created; confirm timing and whether nil is allowed
	PostCreateHook func(ctx boltz.MutateContext, entity T) error
	// Flags - NOTE(review): the meaning of the individual bits is not visible here — confirm
	Flags uint32
}

func (*CreateEntityCommand[T]) Apply

func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*CreateEntityCommand[T]) Encode

func (self *CreateEntityCommand[T]) Encode() ([]byte, error)

func (*CreateEntityCommand[T]) GetChangeContext

func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context

type Decoder

type Decoder interface {
	// Decode turns the encoded data for the given command type into a Command, or returns an error
	Decode(commandType int32, data []byte) (Command, error)
}

Decoder instances know how to decode encoded commands

type DecoderF

// DecoderF has the same signature as Decoder.Decode; the Decode method declared on this type
// lets a plain function be used wherever a Decoder is expected.
type DecoderF func(commandType int32, data []byte) (Command, error)

DecoderF is a function version of the Decoder interface

func (DecoderF) Decode

func (self DecoderF) Decode(commandType int32, data []byte) (Command, error)

type Decoders

// Decoders holds Decoder instances addressed by an int32 id.
// NOTE(review): the semantics below are inferred from the method signatures — confirm against the implementation.
type Decoders interface {
	// Register associates the given Decoder with the id
	Register(id int32, decoder Decoder)
	// RegisterF associates the given DecoderF with the id
	RegisterF(id int32, decoder DecoderF)
	// Decode turns encoded data into a Command; presumably the decoder is selected
	// from a command type id carried in data — TODO confirm
	Decode(data []byte) (Command, error)
	// GetDecoder returns the Decoder for the id; behavior for an unknown id is not visible here
	GetDecoder(id int32) Decoder
	// Clear presumably removes all registered decoders — TODO confirm
	Clear()
}

func GetDefaultDecoders

func GetDefaultDecoders() Decoders

func NewDecoders

func NewDecoders() Decoders

type DefaultRateLimiter added in v0.31.0

// DefaultRateLimiter exposes GetQueueFillPct and RunRateLimited; all of its fields are unexported.
// NOTE(review): presumably the rate.RateLimiter returned by NewRateLimiter when rate limiting is enabled — confirm.
type DefaultRateLimiter struct {
	// contains filtered or unexported fields
}

func (*DefaultRateLimiter) GetQueueFillPct added in v0.33.0

func (self *DefaultRateLimiter) GetQueueFillPct() float64

func (*DefaultRateLimiter) RunRateLimited added in v0.31.0

func (self *DefaultRateLimiter) RunRateLimited(f func() error) error

type DeleteEntityCommand

// DeleteEntityCommand pairs an entity id with the EntityDeleter for its type.
// NOTE(review): presumably Apply delegates to Deleter.ApplyDelete — confirm against the implementation.
type DeleteEntityCommand struct {
	// Context - the change context for this command; presumably what GetChangeContext returns
	Context *change.Context
	// Deleter - the EntityDeleter used for the deletion
	Deleter EntityDeleter
	// Id - presumably the id of the entity to delete
	Id string
}

func (*DeleteEntityCommand) Apply

func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error

func (*DeleteEntityCommand) Encode

func (self *DeleteEntityCommand) Encode() ([]byte, error)

func (*DeleteEntityCommand) GetChangeContext

func (self *DeleteEntityCommand) GetChangeContext() *change.Context

type Dispatcher

type Dispatcher interface {
	// Dispatch takes a command and either sends it to the leader to be applied or,
	// if the current system is the leader, applies it locally
	Dispatch(command Command) error
	// IsLeaderOrLeaderless - presumably true when either IsLeader or IsLeaderless would be true; confirm
	IsLeaderOrLeaderless() bool
	// IsLeaderless - presumably reports that there is currently no leader; confirm
	IsLeaderless() bool
	// IsLeader - presumably reports whether the current system is the leader; confirm
	IsLeader() bool
	// GetPeers returns channels keyed by string; NOTE(review): the meaning of the key (peer id or address) is not visible here
	GetPeers() map[string]channel.Channel
	// GetRateLimiter returns the rate limiter associated with this dispatcher
	GetRateLimiter() rate.RateLimiter
	// Bootstrap - NOTE(review): what is bootstrapped is not visible here; confirm against the implementations
	Bootstrap() error
}

Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally

type EntityCreator

type EntityCreator[T models.Entity] interface {
	// EntityMarshaller supplies the entity type id plus marshalling and unmarshalling for T
	EntityMarshaller[T]

	// ApplyCreate creates the entity described by the given command
	ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityCreator instances can apply a create entity command to create entities of a given type

type EntityDeleter

type EntityDeleter interface {
	// GetEntityTypeId returns the entity type id; presumably the same notion of type id
	// as EntityMarshaller.GetEntityTypeId
	GetEntityTypeId() string

	// ApplyDelete deletes the entity described by the given command
	ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error
}

EntityDeleter instances can apply a delete entity command to delete entities of a given type

type EntityManager

// EntityManager is the union of the create, update and delete interfaces for entities of type T.
type EntityManager[T models.Entity] interface {
	// EntityCreator contributes ApplyCreate (and EntityMarshaller)
	EntityCreator[T]
	// EntityUpdater contributes ApplyUpdate (and EntityMarshaller)
	EntityUpdater[T]
	// EntityDeleter contributes ApplyDelete
	EntityDeleter
}

EntityManager instances can handle creating, updating and deleting entities of a specific type

type EntityMarshaller

type EntityMarshaller[T any] interface {
	// GetEntityTypeId returns the entity type id. This is distinct from the Store entity id
	// which may be shared by types, such as service and router. The entity type is unique
	// for each type
	GetEntityTypeId() string

	// Marshall marshals the entity to a byte-encoded format
	Marshall(entity T) ([]byte, error)

	// Unmarshall unmarshals the bytes back into an entity
	Unmarshall(bytes []byte) (T, error)
}

EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type

type EntityUpdater

type EntityUpdater[T models.Entity] interface {
	// EntityMarshaller supplies the entity type id plus marshalling and unmarshalling for T
	EntityMarshaller[T]

	// ApplyUpdate updates the entity described by the given command
	ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityUpdater instances can apply an update entity command to update entities of a given type

type LocalDispatcher

type LocalDispatcher struct {
	// EncodeDecodeCommands - NOTE(review): presumably, when true, commands are encoded and decoded
	// again before being applied; confirm against Dispatch
	EncodeDecodeCommands bool
	// Limiter - the rate limiter for this dispatcher; presumably what GetRateLimiter returns
	Limiter rate.RateLimiter
}

LocalDispatcher should be used when running a non-clustered system

func (*LocalDispatcher) Bootstrap added in v1.2.0

func (self *LocalDispatcher) Bootstrap() error

func (*LocalDispatcher) Dispatch

func (self *LocalDispatcher) Dispatch(command Command) error

func (*LocalDispatcher) GetPeers

func (self *LocalDispatcher) GetPeers() map[string]channel.Channel

func (*LocalDispatcher) GetRateLimiter added in v0.33.0

func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter

func (*LocalDispatcher) IsLeader added in v1.2.1

func (self *LocalDispatcher) IsLeader() bool

func (*LocalDispatcher) IsLeaderOrLeaderless

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool

func (*LocalDispatcher) IsLeaderless added in v1.1.8

func (self *LocalDispatcher) IsLeaderless() bool

type NoOpAdaptiveRateLimitTracker added in v0.33.0

// NoOpAdaptiveRateLimitTracker is an empty struct with IsRateLimited, RunRateLimited and RunRateLimitedF methods.
// NOTE(review): presumably the tracker used when AdaptiveRateLimiterConfig.Enabled is false, enforcing no limits — confirm.
type NoOpAdaptiveRateLimitTracker struct{}

func (NoOpAdaptiveRateLimitTracker) IsRateLimited added in v0.33.0

func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool

func (NoOpAdaptiveRateLimitTracker) RunRateLimited added in v0.33.0

func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF added in v0.33.0

func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control rate.RateLimitControl) error) error

type NoOpAdaptiveRateLimiter added in v0.32.0

// NoOpAdaptiveRateLimiter is an empty struct with a RunRateLimited method.
// NOTE(review): presumably the limiter used when AdaptiveRateLimiterConfig.Enabled is false, enforcing no limits — confirm.
type NoOpAdaptiveRateLimiter struct{}

func (NoOpAdaptiveRateLimiter) RunRateLimited added in v0.32.0

func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (rate.RateLimitControl, error)

type NoOpRateLimiter added in v0.31.0

// NoOpRateLimiter is an empty struct with GetQueueFillPct and RunRateLimited methods.
// NOTE(review): presumably the limiter used when RateLimiterConfig.Enabled is false, enforcing no limits — confirm.
type NoOpRateLimiter struct{}

func (NoOpRateLimiter) GetQueueFillPct added in v0.33.0

func (self NoOpRateLimiter) GetQueueFillPct() float64

func (NoOpRateLimiter) RunRateLimited added in v0.31.0

func (self NoOpRateLimiter) RunRateLimited(f func() error) error

type RateLimiterConfig added in v0.31.0

// RateLimiterConfig is the configuration type accepted by NewRateLimiter.
type RateLimiterConfig struct {
	// Enabled - NOTE(review): presumably, if false, a no-op rate limiter is created — confirm
	Enabled bool
	// QueueSize - NOTE(review): presumably the maximum number of queued operations; see also
	// DefaultLimiterSize and MinLimiterSize — confirm
	QueueSize uint32
}

type SyncSnapshotCommand

// SyncSnapshotCommand carries a snapshot, its id and a sink callback.
// NOTE(review): field semantics below are inferred from names and types — confirm against Apply.
type SyncSnapshotCommand struct {
	// SnapshotId - identifier of the snapshot
	SnapshotId string
	// Snapshot - the snapshot contents as raw bytes; encoding is not visible here
	Snapshot []byte
	// SnapshotSink - callback receiving this command; presumably invoked by Apply to consume the snapshot
	SnapshotSink func(cmd *SyncSnapshotCommand) error
}

func (*SyncSnapshotCommand) Apply

func (*SyncSnapshotCommand) Encode

func (self *SyncSnapshotCommand) Encode() ([]byte, error)

func (*SyncSnapshotCommand) GetChangeContext

func (self *SyncSnapshotCommand) GetChangeContext() *change.Context

type UpdateEntityCommand

// UpdateEntityCommand carries an entity of type T together with the EntityUpdater for that type.
// NOTE(review): presumably Apply delegates to Updater.ApplyUpdate — confirm against the implementation.
type UpdateEntityCommand[T models.Entity] struct {
	// Context - the change context for this command; presumably what GetChangeContext returns
	Context *change.Context
	// Updater - the EntityUpdater for entities of type T
	Updater EntityUpdater[T]
	// Entity - the entity this command carries; presumably holds the updated values
	Entity T
	// UpdatedFields - presumably restricts the update to the listed fields; confirm semantics when unset
	UpdatedFields fields.UpdatedFields
	// Flags - NOTE(review): the meaning of the individual bits is not visible here — confirm
	Flags uint32
}

func (*UpdateEntityCommand[T]) Apply

func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*UpdateEntityCommand[T]) Encode

func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)

func (*UpdateEntityCommand[T]) GetChangeContext

func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context

type Validatable

type Validatable interface {
	// Validate returns an error; presumably a nil error means the instance is valid
	Validate() error
}

Validatable instances can be validated. Command instances which implement Validatable will be validated before Command.Apply is called

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL